From 3e99ab994ae11c15200d989466b93ca0d82ad697 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 22:43:40 -0700 Subject: [PATCH 1/7] feat: Feast/IKV online store contrib plugin integration Signed-off-by: Pushkar Gupta --- .../contrib/ikv_online_store/__init__.py | 0 .../contrib/ikv_online_store/ikv.py | 274 ++++++++++++++++++ .../feature_repos/repo_configuration.py | 13 + setup.py | 7 +- 4 files changed, 293 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py new file mode 100644 index 00000000000..72641021b2c --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -0,0 +1,274 @@ +from datetime import datetime +from typing import ( + Any, + Callable, + Dict, + Iterator, + List, + Literal, + Optional, + Sequence, + Tuple, +) + +from ikvpy.client import IKVReader, IKVWriter +from ikvpy.clientoptions import ClientOptions, ClientOptionsBuilder +from ikvpy.document import IKVDocument, IKVDocumentBuilder +from ikvpy.factory import create_new_reader, create_new_writer +from pydantic import StrictStr + +from feast import Entity, FeatureView, utils +from feast.infra.online_stores.helpers import compute_entity_id +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage + +PRIMARY_KEY_FIELD_NAME: str = "_entity_key" +EVENT_CREATION_TIMESTAMP_FIELD_NAME: str = "_event_timestamp" +CREATION_TIMESTAMP_FIELD_NAME: str = "_created_timestamp" + +class IKVOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for IKV store""" + + type: Literal["ikv"] = "ikv" + """Online store type selector""" + + account_id: StrictStr + """(Required) IKV account id""" + + account_passkey: StrictStr + """(Required) IKV account passkey""" + + store_name: StrictStr + """(Required) IKV store name""" + + mount_directory: Optional[StrictStr] = None + """(Required only for reader) IKV mount point i.e. directory for storing IKV data locally.""" + + +class IKVOnlineStore(OnlineStore): + """ + IKV (inlined.io key value) store implementation of the online store interface. + """ + + # lazy initialization + _reader: Optional[IKVReader] = None + _writer: Optional[IKVWriter] = None + + @log_exceptions_and_usage(online_store="ikv") + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Writes a batch of feature rows to the online store. + + If a tz-naive timestamp is passed to this method, it is assumed to be UTC. + + Args: + config: The config for the current feature store. + table: Feature view to which these feature rows correspond. + data: A list of quadruplets containing feature data. Each quadruplet contains an entity + key, a dict containing feature values, an event timestamp for the row, and the created + timestamp for the row if it exists. + progress: Function to be called once a batch of rows is written to the online store, used + to show progress. + """ + # update should have been called before + if self._writer is None: + return + + for entity_key, features, event_timestamp, _ in data: + entity_id: str = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + document: IKVDocument = IKVOnlineStore._create_document(entity_id, table, features, event_timestamp) + self._writer.upsert_fields(document) + if progress: + progress(1) + + @log_exceptions_and_usage(online_store="ikv") + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Reads features values for the given entity keys. + + Args: + config: The config for the current feature store. + table: The feature view whose feature values should be read. + entity_keys: The list of entity keys for which feature values should be read. + requested_features: The list of features that should be read. + + Returns: + A list of the same length as entity_keys. Each item in the list is a tuple where the first + item is the event timestamp for the row, and the second item is a dict mapping feature names + to values, which are returned in proto format. + """ + if not len(entity_keys): + return [] + + # create IKV primary keys + primary_keys = [compute_entity_id(ek, config.entity_key_serialization_version) for ek in entity_keys] + + # create IKV field names + if requested_features is None: + requested_features = [] + + field_names = [None] * (1 + len(requested_features)) + field_names[0] = EVENT_CREATION_TIMESTAMP_FIELD_NAME + for i, fn in enumerate(requested_features): + field_names[i + 1] = IKVOnlineStore._create_ikv_field_name(table, fn) + + assert self._reader is not None + value_iter = self._reader.multiget_bytes_values( + bytes_primary_keys=[], str_primary_keys=primary_keys, field_names=field_names) + + # decode results + return [IKVOnlineStore._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] + + @staticmethod + def _decode_fields_for_primary_key(requested_features: List[str], + value_iter: Iterator[Optional[bytes]]) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: + + # decode timestamp + dt: Optional[datetime] = None + dt_bytes = next(value_iter) + if dt_bytes: + dt = datetime.fromisoformat(str(dt_bytes, "utf-8")) + + # decode other features + features = {} + for requested_feature in requested_features: + value_proto_bytes: Optional[bytes] = next(value_iter) + if value_proto_bytes: + value_proto = ValueProto() + value_proto.ParseFromString(value_proto_bytes) + features[requested_feature] = value_proto + + return dt, features + + # called before any read/write requests are issued + @log_exceptions_and_usage(online_store="ikv") + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Reconciles cloud resources with the specified set of Feast objects. + + Args: + config: The config for the current feature store. + tables_to_delete: Feature views whose corresponding infrastructure should be deleted. + tables_to_keep: Feature views whose corresponding infrastructure should not be deleted, and + may need to be updated. + entities_to_delete: Entities whose corresponding infrastructure should be deleted. + entities_to_keep: Entities whose corresponding infrastructure should not be deleted, and + may need to be updated. + partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so + infrastructure corresponding to other feature views should be not be touched. + """ + self._init_clients(config=config) + assert self._writer is not None + + # note: we assume tables_to_keep does not overlap with tables_to_delete + + for feature_view in tables_to_delete: + # each field in an IKV document is prefixed by the feature-view's name + self._writer.drop_fields_by_name_prefix([feature_view.name]) + + @log_exceptions_and_usage(online_store="ikv") + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Tears down all cloud resources for the specified set of Feast objects. + + Args: + config: The config for the current feature store. + tables: Feature views whose corresponding infrastructure should be deleted. + entities: Entities whose corresponding infrastructure should be deleted. + """ + self._init_clients(config=config) + assert self._writer is not None + + # drop fields corresponding to this feature-view + for feature_view in tables: + self._writer.drop_fields_by_name_prefix([feature_view.name]) + + # shutdown clients + self._writer.shutdown() + self._writer = None + + if self._reader is not None: + self._reader.shutdown() + self._reader = None + + @staticmethod + def _create_ikv_field_name(feature_view: FeatureView, feature_name: str) -> str: + return "{}_{}".format(feature_view.name, feature_name) + + @staticmethod + def _create_document(entity_id: str, feature_view: FeatureView, \ + values: Dict[str, ValueProto], event_timestamp: datetime) -> IKVDocument: + """ Converts feast key-value pairs into an IKV document. """ + + # initialie builder by inserting primary key and row creation timestamp + event_timestamp_str: str = utils.make_tzaware(event_timestamp).isoformat() + builder = IKVDocumentBuilder()\ + .put_string_field(PRIMARY_KEY_FIELD_NAME, entity_id)\ + .put_bytes_field(EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp_str.encode('utf-8')) + + for feature_name, feature_value in values.items(): + field_name = IKVOnlineStore._create_ikv_field_name(feature_view, feature_name) + builder.put_bytes_field(field_name, feature_value.SerializeToString()) + + return builder.build() + + def _init_clients(self, config: RepoConfig): + """ Initializes (if required) reader/writer ikv clients. """ + online_config = config.online_store + assert isinstance(online_config, IKVOnlineStoreConfig) + client_options = IKVOnlineStore._config_to_client_options(online_config) + + # initialize writer + if self._writer is None: + self._writer = create_new_writer(client_options) + + # initialize reader, iff mount_dir is specified + if self._reader is None: + if online_config.mount_directory and len(online_config.mount_directory) > 0: + self._reader = create_new_reader(client_options) + + @staticmethod + def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: + """ Utility for IKVOnlineStoreConfig to IKV ClientOptions conversion. """ + builder = ClientOptionsBuilder()\ + .with_account_id(config.account_id)\ + .with_account_passkey(config.account_passkey)\ + .with_store_name(config.store_name) + + if config.mount_directory and len(config.mount_directory) > 0: + builder = builder.with_mount_directory(config.mount_directory) + + return builder.build() \ No newline at end of file diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index f745bafa132..320ea69e2b5 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -99,6 +99,14 @@ "host": os.getenv("ROCKSET_APISERVER", "api.rs2.usw2.rockset.com"), } +IKV_CONFIG = { + "type": "ikv", + "account_id": os.getenv("IKV_ACCOUNT_ID", ""), + "account_passkey": os.getenv("IKV_ACCOUNT_PASSKEY", ""), + "store_name": os.getenv("IKV_STORE_NAME", ""), + "mount_directory": os.getenv("IKV_MOUNT_DIR", ""), +} + OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, Tuple[str, Type[DataSourceCreator]]] = { "file": ("local", FileDataSourceCreator), "bigquery": ("gcp", BigQueryDataSourceCreator), @@ -137,6 +145,11 @@ # containerized version of Rockset. # AVAILABLE_ONLINE_STORES["rockset"] = (ROCKSET_CONFIG, None) + # Uncomment to test using private IKV account. Currently not enabled as + # there is no dedicated IKV instance for CI testing and there is no + # containerized version of IKV. + # AVAILABLE_OFFLINE_STORES["ikv"] = (IKV_CONFIG, None) + full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) if full_repo_configs_module is not None: diff --git a/setup.py b/setup.py index 2d7bf637789..82a2446607d 100644 --- a/setup.py +++ b/setup.py @@ -131,6 +131,10 @@ "rockset>=1.0.3", ] +IKV_REQUIRED = [ + "ikvpy>=0.0.23", +] + HAZELCAST_REQUIRED = [ "hazelcast-python-client>=5.1", ] @@ -376,7 +380,8 @@ def run(self): "grpcio": GRPCIO_REQUIRED, "rockset": ROCKSET_REQUIRED, "ibis": IBIS_REQUIRED, - "duckdb": DUCKDB_REQUIRED + "duckdb": DUCKDB_REQUIRED, + "ikv": IKV_REQUIRED }, include_package_data=True, license="Apache", From 14a0b3d5e5e65e12b1aae66566ad066fbf0080e5 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 22:54:28 -0700 Subject: [PATCH 2/7] wiring in cli Signed-off-by: Pushkar Gupta --- sdk/python/feast/cli.py | 1 + sdk/python/feast/repo_config.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 7673eee20db..0dac09f2e5e 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -596,6 +596,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List "cassandra", "rockset", "hazelcast", + "ikv", ], case_sensitive=False, ), diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 5708754622b..cda3d301658 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -63,6 +63,7 @@ "mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore", "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore", "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", + "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { @@ -214,6 +215,8 @@ def __init__(self, **data: Any): self.online_config = "dynamodb" elif data["provider"] == "rockset": self.online_config = "rockset" + elif data["provider"] == "ikv": + self.online_config = "ikv" self._batch_engine = None if "batch_engine" in data: From a66ad6dd2cc3c753f3a6414fb3976a879e6d6b09 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 23:01:11 -0700 Subject: [PATCH 3/7] typo Signed-off-by: Pushkar Gupta --- .../tests/integration/feature_repos/repo_configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 320ea69e2b5..ed2d7789e4f 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -148,7 +148,7 @@ # Uncomment to test using private IKV account. Currently not enabled as # there is no dedicated IKV instance for CI testing and there is no # containerized version of IKV. - # AVAILABLE_OFFLINE_STORES["ikv"] = (IKV_CONFIG, None) + # AVAILABLE_ONLINE_STORES["ikv"] = (IKV_CONFIG, None) full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) From b83db1d93f12ab2d6c46ecc9540e260c17588ac6 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Wed, 3 Apr 2024 09:12:15 -0700 Subject: [PATCH 4/7] remove conditional on provider Signed-off-by: Pushkar Gupta --- sdk/python/feast/repo_config.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index cda3d301658..648103c9b64 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -215,8 +215,6 @@ def __init__(self, **data: Any): self.online_config = "dynamodb" elif data["provider"] == "rockset": self.online_config = "rockset" - elif data["provider"] == "ikv": - self.online_config = "ikv" self._batch_engine = None if "batch_engine" in data: From c457de3f90402ef533fcac705e533ff2a3d1dbe9 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Wed, 3 Apr 2024 17:23:52 -0700 Subject: [PATCH 5/7] linting annotations Signed-off-by: Pushkar Gupta --- .../feast/infra/online_stores/contrib/ikv_online_store/ikv.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 72641021b2c..65fccf440c2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -127,7 +127,7 @@ def online_read( if requested_features is None: requested_features = [] - field_names = [None] * (1 + len(requested_features)) + field_names: List[Optional[str]] = [None] * (1 + len(requested_features)) field_names[0] = EVENT_CREATION_TIMESTAMP_FIELD_NAME for i, fn in enumerate(requested_features): field_names[i + 1] = IKVOnlineStore._create_ikv_field_name(table, fn) From 330ecd369905e14e5317b75771654483cf1121fa Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Thu, 4 Apr 2024 09:20:04 -0700 Subject: [PATCH 6/7] whitespaces Signed-off-by: Pushkar Gupta --- .../contrib/ikv_online_store/ikv.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 65fccf440c2..d54c3f6acb4 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -92,7 +92,7 @@ def online_write_batch( ) document: IKVDocument = IKVOnlineStore._create_document(entity_id, table, features, event_timestamp) self._writer.upsert_fields(document) - if progress: + if progress: progress(1) @log_exceptions_and_usage(online_store="ikv") @@ -124,9 +124,9 @@ def online_read( primary_keys = [compute_entity_id(ek, config.entity_key_serialization_version) for ek in entity_keys] # create IKV field names - if requested_features is None: + if requested_features is None: requested_features = [] - + field_names: List[Optional[str]] = [None] * (1 + len(requested_features)) field_names[0] = EVENT_CREATION_TIMESTAMP_FIELD_NAME for i, fn in enumerate(requested_features): @@ -140,13 +140,13 @@ def online_read( return [IKVOnlineStore._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] @staticmethod - def _decode_fields_for_primary_key(requested_features: List[str], + def _decode_fields_for_primary_key(requested_features: List[str], value_iter: Iterator[Optional[bytes]]) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: # decode timestamp dt: Optional[datetime] = None dt_bytes = next(value_iter) - if dt_bytes: + if dt_bytes: dt = datetime.fromisoformat(str(dt_bytes, "utf-8")) # decode other features @@ -157,7 +157,7 @@ def _decode_fields_for_primary_key(requested_features: List[str], value_proto = ValueProto() value_proto.ParseFromString(value_proto_bytes) features[requested_feature] = value_proto - + return dt, features # called before any read/write requests are issued @@ -189,7 +189,7 @@ def update( assert self._writer is not None # note: we assume tables_to_keep does not overlap with tables_to_delete - + for feature_view in tables_to_delete: # each field in an IKV document is prefixed by the feature-view's name self._writer.drop_fields_by_name_prefix([feature_view.name]) @@ -215,7 +215,7 @@ def teardown( # drop fields corresponding to this feature-view for feature_view in tables: self._writer.drop_fields_by_name_prefix([feature_view.name]) - + # shutdown clients self._writer.shutdown() self._writer = None @@ -242,7 +242,7 @@ def _create_document(entity_id: str, feature_view: FeatureView, \ for feature_name, feature_value in values.items(): field_name = IKVOnlineStore._create_ikv_field_name(feature_view, feature_name) builder.put_bytes_field(field_name, feature_value.SerializeToString()) - + return builder.build() def _init_clients(self, config: RepoConfig): @@ -254,7 +254,7 @@ def _init_clients(self, config: RepoConfig): # initialize writer if self._writer is None: self._writer = create_new_writer(client_options) - + # initialize reader, iff mount_dir is specified if self._reader is None: if online_config.mount_directory and len(online_config.mount_directory) > 0: @@ -267,8 +267,8 @@ def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: .with_account_id(config.account_id)\ .with_account_passkey(config.account_passkey)\ .with_store_name(config.store_name) - + if config.mount_directory and len(config.mount_directory) > 0: builder = builder.with_mount_directory(config.mount_directory) - return builder.build() \ No newline at end of file + return builder.build() From 1ba0ca04203b81ffa05f7adf77a479c7740de9d9 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Thu, 4 Apr 2024 09:25:28 -0700 Subject: [PATCH 7/7] make format-python Signed-off-by: Pushkar Gupta --- .../contrib/ikv_online_store/ikv.py | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index d54c3f6acb4..9d888aad3d8 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -29,6 +29,7 @@ EVENT_CREATION_TIMESTAMP_FIELD_NAME: str = "_event_timestamp" CREATION_TIMESTAMP_FIELD_NAME: str = "_created_timestamp" + class IKVOnlineStoreConfig(FeastConfigBaseModel): """Online store config for IKV store""" @@ -90,7 +91,9 @@ def online_write_batch( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ) - document: IKVDocument = IKVOnlineStore._create_document(entity_id, table, features, event_timestamp) + document: IKVDocument = IKVOnlineStore._create_document( + entity_id, table, features, event_timestamp + ) self._writer.upsert_fields(document) if progress: progress(1) @@ -121,7 +124,10 @@ def online_read( return [] # create IKV primary keys - primary_keys = [compute_entity_id(ek, config.entity_key_serialization_version) for ek in entity_keys] + primary_keys = [ + compute_entity_id(ek, config.entity_key_serialization_version) + for ek in entity_keys + ] # create IKV field names if requested_features is None: @@ -134,15 +140,23 @@ def online_read( assert self._reader is not None value_iter = self._reader.multiget_bytes_values( - bytes_primary_keys=[], str_primary_keys=primary_keys, field_names=field_names) + bytes_primary_keys=[], + str_primary_keys=primary_keys, + field_names=field_names, + ) # decode results - return [IKVOnlineStore._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] + return [ + IKVOnlineStore._decode_fields_for_primary_key( + requested_features, value_iter + ) + for _ in range(0, len(primary_keys)) + ] @staticmethod - def _decode_fields_for_primary_key(requested_features: List[str], - value_iter: Iterator[Optional[bytes]]) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: - + def _decode_fields_for_primary_key( + requested_features: List[str], value_iter: Iterator[Optional[bytes]] + ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: # decode timestamp dt: Optional[datetime] = None dt_bytes = next(value_iter) @@ -229,24 +243,34 @@ def _create_ikv_field_name(feature_view: FeatureView, feature_name: str) -> str: return "{}_{}".format(feature_view.name, feature_name) @staticmethod - def _create_document(entity_id: str, feature_view: FeatureView, \ - values: Dict[str, ValueProto], event_timestamp: datetime) -> IKVDocument: - """ Converts feast key-value pairs into an IKV document. """ + def _create_document( + entity_id: str, + feature_view: FeatureView, + values: Dict[str, ValueProto], + event_timestamp: datetime, + ) -> IKVDocument: + """Converts feast key-value pairs into an IKV document.""" # initialie builder by inserting primary key and row creation timestamp event_timestamp_str: str = utils.make_tzaware(event_timestamp).isoformat() - builder = IKVDocumentBuilder()\ - .put_string_field(PRIMARY_KEY_FIELD_NAME, entity_id)\ - .put_bytes_field(EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp_str.encode('utf-8')) + builder = ( + IKVDocumentBuilder() + .put_string_field(PRIMARY_KEY_FIELD_NAME, entity_id) + .put_bytes_field( + EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp_str.encode("utf-8") + ) + ) for feature_name, feature_value in values.items(): - field_name = IKVOnlineStore._create_ikv_field_name(feature_view, feature_name) + field_name = IKVOnlineStore._create_ikv_field_name( + feature_view, feature_name + ) builder.put_bytes_field(field_name, feature_value.SerializeToString()) return builder.build() def _init_clients(self, config: RepoConfig): - """ Initializes (if required) reader/writer ikv clients. """ + """Initializes (if required) reader/writer ikv clients.""" online_config = config.online_store assert isinstance(online_config, IKVOnlineStoreConfig) client_options = IKVOnlineStore._config_to_client_options(online_config) @@ -262,11 +286,13 @@ def _init_clients(self, config: RepoConfig): @staticmethod def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: - """ Utility for IKVOnlineStoreConfig to IKV ClientOptions conversion. """ - builder = ClientOptionsBuilder()\ - .with_account_id(config.account_id)\ - .with_account_passkey(config.account_passkey)\ + """Utility for IKVOnlineStoreConfig to IKV ClientOptions conversion.""" + builder = ( + ClientOptionsBuilder() + .with_account_id(config.account_id) + .with_account_passkey(config.account_passkey) .with_store_name(config.store_name) + ) if config.mount_directory and len(config.mount_directory) > 0: builder = builder.with_mount_directory(config.mount_directory)