From 553d5b4c09d238f86e155bf633287bf7df9fd3e9 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 26 Mar 2024 22:11:48 -0700 Subject: [PATCH 01/12] Feast/IKV online store contrib plugin integration --- .../contrib/ikv_online_store/__init__.py | 0 .../contrib/ikv_online_store/ikv.py | 253 ++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py new file mode 100644 index 00000000000..56c3c7c1c4c --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -0,0 +1,253 @@ +from datetime import datetime +from pydantic import StrictStr, Sequence +from typing import Any, Callable, Dict, Iterator, Literal, List, Optional, Sequence, Tuple + +from feast import Entity, FeatureView, utils +from feast.infra.online_stores.helpers import compute_entity_id +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage + +from ikvpy.client import IKVReader, IKVWriter +from ikvpy.document import IKVDocument, IKVDocumentBuilder +from ikvpy.factory import create_new_reader, create_new_writer +from ikvpy.clientoptions import ClientOptions, ClientOptionsBuilder + +PRIMARY_KEY_FIELD_NAME: str = "_entity_key" +EVENT_CREATION_TIMESTAMP_FIELD_NAME: str = "_event_timestamp" +CREATION_TIMESTAMP_FIELD_NAME: str = "_created_timestamp" + +class IKVOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for IKV store""" + + type: Literal["ikv"] = "ikv" + """Online store type selector""" + + account_id: StrictStr + """(Required) IKV account id""" + + account_passkey: StrictStr + """(Required) IKV account passkey""" + + store_name: StrictStr + """(Required) IKV store name""" + + mount_directory: Optional[StrictStr] = None + """(Required only for reader) IKV mount point i.e. directory for storing IKV data locally.""" + + +class IKVOnlineStore(OnlineStore): + """ + IKV (inlined.io key value) store implementation of the online store interface. + """ + + # lazy initialization + _reader: Optional[IKVReader] = None + _writer: Optional[IKVWriter] = None + + @log_exceptions_and_usage(online_store="ikv") + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Writes a batch of feature rows to the online store. + + If a tz-naive timestamp is passed to this method, it is assumed to be UTC. + + Args: + config: The config for the current feature store. + table: Feature view to which these feature rows correspond. + data: A list of quadruplets containing feature data. Each quadruplet contains an entity + key, a dict containing feature values, an event timestamp for the row, and the created + timestamp for the row if it exists. + progress: Function to be called once a batch of rows is written to the online store, used + to show progress. + """ + for entity_key, features, event_timestamp, _ in data: + entity_id: str = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + document: IKVDocument = IKVOnlineStore._create_document(entity_id, table, features, event_timestamp) + self._writer.upsert_fields(document) + if progress: + progress(1) + + @log_exceptions_and_usage(online_store="ikv") + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Reads features values for the given entity keys. + + Args: + config: The config for the current feature store. + table: The feature view whose feature values should be read. + entity_keys: The list of entity keys for which feature values should be read. + requested_features: The list of features that should be read. + + Returns: + A list of the same length as entity_keys. Each item in the list is a tuple where the first + item is the event timestamp for the row, and the second item is a dict mapping feature names + to values, which are returned in proto format. + """ + if not len(entity_keys): + return [] + + # create IKV primary keys + primary_keys = [compute_entity_id(ek, config.entity_key_serialization_version) for ek in entity_keys] + + # create IKV field names + if requested_features is None: + requested_features = [] + + field_names = [None] * (1 + len(requested_features)) + field_names[0] = EVENT_CREATION_TIMESTAMP_FIELD_NAME + for i, fn in enumerate(requested_features): + field_names[i + 1] = IKVOnlineStore._create_ikv_field_name(table, fn) + + value_iter = self._reader.multiget_bytes_values( + bytes_primary_keys=[], str_primary_keys=primary_keys, field_names=field_names) + + # decode results + return [self._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] + + def _decode_fields_for_primary_key(requested_features: List[str], + value_iter: Iterator[Optional[bytes]]) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: + + # decode timestamp + dt: Optional[datetime] = None + dt_bytes = next(value_iter) + if dt_bytes: + dt = datetime.fromisoformat(str(dt_bytes, "utf-8")) + + # decode other features + features = {} + for requested_feature in requested_features: + value_proto_bytes: Optional[bytes] = next(value_iter) + if value_proto_bytes: + value_proto = ValueProto() + value_proto.ParseFromString(value_proto_bytes) + features[requested_feature] = value_proto + + return dt, features + + # called before any read/write requests are issued + @log_exceptions_and_usage(online_store="ikv") + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Reconciles cloud resources with the specified set of Feast objects. + + Args: + config: The config for the current feature store. + tables_to_delete: Feature views whose corresponding infrastructure should be deleted. + tables_to_keep: Feature views whose corresponding infrastructure should not be deleted, and + may need to be updated. + entities_to_delete: Entities whose corresponding infrastructure should be deleted. + entities_to_keep: Entities whose corresponding infrastructure should not be deleted, and + may need to be updated. + partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so + infrastructure corresponding to other feature views should be not be touched. + """ + self._init_clients(config=config) + + # note: we assume tables_to_keep does not overlap with tables_to_delete + + for feature_view in tables_to_delete: + # each field in an IKV document is prefixed by the feature-view's name + self._writer.drop_fields_by_name_prefix([feature_view.name]) + + @log_exceptions_and_usage(online_store="ikv") + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Tears down all cloud resources for the specified set of Feast objects. + + Args: + config: The config for the current feature store. + tables: Feature views whose corresponding infrastructure should be deleted. + entities: Entities whose corresponding infrastructure should be deleted. + """ + self._init_clients(config=config) + + # drop fields corresponding to this feature-view + for feature_view in tables: + self._writer.drop_fields_by_name_prefix([feature_view.name]) + + # shutdown clients + if self._writer is not None: + self._writer.shutdown() + self._writer = None + if self._reader is not None: + self._reader.shutdown() + self._reader = None + + def _create_ikv_field_name(feature_view: FeatureView, feature_name: str) -> str: + return "{}_{}".format(feature_view.name, feature_name) + + def _create_document(entity_id: str, feature_view: FeatureView, \ + values: Dict[str, ValueProto], event_timestamp: datetime) -> IKVDocument: + """ Converts feast key-value pairs into an IKV document. """ + + # initialie builder by inserting primary key and row creation timestamp + event_timestamp: str = utils.make_tzaware(event_timestamp).isoformat() + builder = IKVDocumentBuilder()\ + .put_string_field(PRIMARY_KEY_FIELD_NAME, entity_id)\ + .put_bytes_field(EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp.encode('utf-8')) + + for feature_name, feature_value in values.items(): + field_name = IKVOnlineStore._create_ikv_field_name(feature_view, feature_name) + builder.put_bytes_field(field_name, feature_value.SerializeToString()) + + return builder.build() + + def _init_clients(self, config: RepoConfig): + """ Initializes (if required) reader/writer ikv clients. """ + online_config = config.online_store + assert isinstance(online_config, IKVOnlineStoreConfig) + client_options = IKVOnlineStore._config_to_client_options(online_config) + + # initialize writer + if self._writer is None: + self._writer = create_new_writer(client_options) + + # initialize reader, iff mount_dir is specified + if self._reader is None: + if config.mount_directory and len(config.mount_directory) > 0: + self._reader = create_new_reader(client_options) + + def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: + """ Utility for IKVOnlineStoreConfig to IKV ClientOptions conversion. """ + builder = ClientOptionsBuilder()\ + .with_account_id(config.account_id)\ + .with_account_passkey(config.account_passkey)\ + .with_store_name(config.store_name) + + if config.mount_directory and len(config.mount_directory) > 0: + builder = builder.with_mount_directory(config.mount_directory) + + return builder.build() \ No newline at end of file From 2483c7c3ace7233b43aaab39bf0f18320723920e Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Sun, 31 Mar 2024 16:43:27 -0700 Subject: [PATCH 02/12] testing and dependency --- .../integration/feature_repos/repo_configuration.py | 13 +++++++++++++ setup.py | 8 +++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index f745bafa132..320ea69e2b5 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -99,6 +99,14 @@ "host": os.getenv("ROCKSET_APISERVER", "api.rs2.usw2.rockset.com"), } +IKV_CONFIG = { + "type": "ikv", + "account_id": os.getenv("IKV_ACCOUNT_ID", ""), + "account_passkey": os.getenv("IKV_ACCOUNT_PASSKEY", ""), + "store_name": os.getenv("IKV_STORE_NAME", ""), + "mount_directory": os.getenv("IKV_MOUNT_DIR", ""), +} + OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, Tuple[str, Type[DataSourceCreator]]] = { "file": ("local", FileDataSourceCreator), "bigquery": ("gcp", BigQueryDataSourceCreator), @@ -137,6 +145,11 @@ # containerized version of Rockset. # AVAILABLE_ONLINE_STORES["rockset"] = (ROCKSET_CONFIG, None) + # Uncomment to test using private IKV account. Currently not enabled as + # there is no dedicated IKV instance for CI testing and there is no + # containerized version of IKV. + # AVAILABLE_OFFLINE_STORES["ikv"] = (IKV_CONFIG, None) + full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) if full_repo_configs_module is not None: diff --git a/setup.py b/setup.py index 2d7bf637789..939eaa0da37 100644 --- a/setup.py +++ b/setup.py @@ -131,6 +131,10 @@ "rockset>=1.0.3", ] +IKV_REQUIRED = [ + "ikv>=0.0.23", +] + HAZELCAST_REQUIRED = [ "hazelcast-python-client>=5.1", ] @@ -209,6 +213,7 @@ + CASSANDRA_REQUIRED + AZURE_REQUIRED + ROCKSET_REQUIRED + + IKV_REQUIRED + HAZELCAST_REQUIRED + IBIS_REQUIRED + GRPCIO_REQUIRED @@ -376,7 +381,8 @@ def run(self): "grpcio": GRPCIO_REQUIRED, "rockset": ROCKSET_REQUIRED, "ibis": IBIS_REQUIRED, - "duckdb": DUCKDB_REQUIRED + "duckdb": DUCKDB_REQUIRED, + "ikv": IKV_REQUIRED }, include_package_data=True, license="Apache", From d0aff21a23dc45b383e335e39b3f4dac6d4ec5ca Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Sun, 31 Mar 2024 16:47:01 -0700 Subject: [PATCH 03/12] sign off Signed-off-by: Pushkar Gupta From 790c0e5dc97234c40a1cbc953a9846cde80e061a Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Sun, 31 Mar 2024 16:48:07 -0700 Subject: [PATCH 04/12] sign off Signed-off-by: Pushkar Gupta From 75b9609fb5d6a82d2a57eb09867374ed7497a972 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Sun, 31 Mar 2024 16:57:47 -0700 Subject: [PATCH 05/12] correct pkg --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 939eaa0da37..f2725f21f7c 100644 --- a/setup.py +++ b/setup.py @@ -132,7 +132,7 @@ ] IKV_REQUIRED = [ - "ikv>=0.0.23", + "ikvpy>=0.0.23", ] HAZELCAST_REQUIRED = [ From da7705c62f8152147f8e14a34a8ecbf891fcf14b Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Sun, 31 Mar 2024 16:57:51 -0700 Subject: [PATCH 06/12] edit pkg name Signed-off-by: Pushkar Gupta From 7c95550ad619e135e285ef34e5a8c15ecf21f855 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 18:13:59 -0700 Subject: [PATCH 07/12] linting errors --- .../feast/infra/online_stores/contrib/ikv_online_store/ikv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 56c3c7c1c4c..9488a46f14b 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -1,5 +1,5 @@ from datetime import datetime -from pydantic import StrictStr, Sequence +from pydantic import StrictStr from typing import Any, Callable, Dict, Iterator, Literal, List, Optional, Sequence, Tuple from feast import Entity, FeatureView, utils @@ -122,7 +122,7 @@ def online_read( bytes_primary_keys=[], str_primary_keys=primary_keys, field_names=field_names) # decode results - return [self._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] + return [IKVOnlineStore._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] def _decode_fields_for_primary_key(requested_features: List[str], value_iter: Iterator[Optional[bytes]]) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: From d87695ad8739e00df5fd714812ca75fdadee5d9e Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 18:17:41 -0700 Subject: [PATCH 08/12] sign off Signed-off-by: Pushkar Gupta From a17cce27880fdbe488f16b572c29e5f677f40691 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 22:10:10 -0700 Subject: [PATCH 09/12] linting Signed-off-by: Pushkar Gupta --- .../online_stores/contrib/ikv_online_store/ikv.py | 10 +++++++--- setup.py | 1 - 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 9488a46f14b..1d7023bc2eb 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -124,6 +124,7 @@ def online_read( # decode results return [IKVOnlineStore._decode_fields_for_primary_key(requested_features, value_iter) for _ in range(0, len(primary_keys))] + @staticmethod def _decode_fields_for_primary_key(requested_features: List[str], value_iter: Iterator[Optional[bytes]]) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: @@ -206,18 +207,20 @@ def teardown( self._reader.shutdown() self._reader = None + @staticmethod def _create_ikv_field_name(feature_view: FeatureView, feature_name: str) -> str: return "{}_{}".format(feature_view.name, feature_name) + @staticmethod def _create_document(entity_id: str, feature_view: FeatureView, \ values: Dict[str, ValueProto], event_timestamp: datetime) -> IKVDocument: """ Converts feast key-value pairs into an IKV document. """ # initialie builder by inserting primary key and row creation timestamp - event_timestamp: str = utils.make_tzaware(event_timestamp).isoformat() + event_timestamp_str: str = utils.make_tzaware(event_timestamp).isoformat() builder = IKVDocumentBuilder()\ .put_string_field(PRIMARY_KEY_FIELD_NAME, entity_id)\ - .put_bytes_field(EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp.encode('utf-8')) + .put_bytes_field(EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp_str.encode('utf-8')) for feature_name, feature_value in values.items(): field_name = IKVOnlineStore._create_ikv_field_name(feature_view, feature_name) @@ -237,9 +240,10 @@ def _init_clients(self, config: RepoConfig): # initialize reader, iff mount_dir is specified if self._reader is None: - if config.mount_directory and len(config.mount_directory) > 0: + if online_config.mount_directory and len(online_config.mount_directory) > 0: self._reader = create_new_reader(client_options) + @staticmethod def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: """ Utility for IKVOnlineStoreConfig to IKV ClientOptions conversion. """ builder = ClientOptionsBuilder()\ diff --git a/setup.py b/setup.py index f2725f21f7c..82a2446607d 100644 --- a/setup.py +++ b/setup.py @@ -213,7 +213,6 @@ + CASSANDRA_REQUIRED + AZURE_REQUIRED + ROCKSET_REQUIRED - + IKV_REQUIRED + HAZELCAST_REQUIRED + IBIS_REQUIRED + GRPCIO_REQUIRED From ce44ff878fee601d92ba38f82c9a96deea3cbd80 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 22:17:45 -0700 Subject: [PATCH 10/12] linting Signed-off-by: Pushkar Gupta --- .../online_stores/contrib/ikv_online_store/ikv.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 1d7023bc2eb..56c2d28c830 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -71,6 +71,10 @@ def online_write_batch( progress: Function to be called once a batch of rows is written to the online store, used to show progress. """ + # update should have been called before + if self._writer is None: + return + for entity_key, features, event_timestamp, _ in data: entity_id: str = compute_entity_id( entity_key, @@ -118,6 +122,7 @@ def online_read( for i, fn in enumerate(requested_features): field_names[i + 1] = IKVOnlineStore._create_ikv_field_name(table, fn) + assert self._reader is not None value_iter = self._reader.multiget_bytes_values( bytes_primary_keys=[], str_primary_keys=primary_keys, field_names=field_names) @@ -171,6 +176,7 @@ def update( infrastructure corresponding to other feature views should be not be touched. """ self._init_clients(config=config) + assert self._writer is not None # note: we assume tables_to_keep does not overlap with tables_to_delete @@ -194,15 +200,16 @@ def teardown( entities: Entities whose corresponding infrastructure should be deleted. """ self._init_clients(config=config) + assert self._writer is not None # drop fields corresponding to this feature-view for feature_view in tables: self._writer.drop_fields_by_name_prefix([feature_view.name]) # shutdown clients - if self._writer is not None: - self._writer.shutdown() - self._writer = None + self._writer.shutdown() + self._writer = None + if self._reader is not None: self._reader.shutdown() self._reader = None From 0bd3becf85f95ad580681dbd5ffb74f6239ce160 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 22:18:16 -0700 Subject: [PATCH 11/12] linting Signed-off-by: Pushkar Gupta --- .../feast/infra/online_stores/contrib/ikv_online_store/ikv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 56c2d28c830..ff4ccd184e4 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -74,7 +74,7 @@ def online_write_batch( # update should have been called before if self._writer is None: return - + for entity_key, features, event_timestamp, _ in data: entity_id: str = compute_entity_id( entity_key, @@ -209,7 +209,7 @@ def teardown( # shutdown clients self._writer.shutdown() self._writer = None - + if self._reader is not None: self._reader.shutdown() self._reader = None From 872be0b172bb35b4049a54931b6acebe16659c69 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Tue, 2 Apr 2024 22:20:09 -0700 Subject: [PATCH 12/12] make format Signed-off-by: Pushkar Gupta --- .../contrib/ikv_online_store/ikv.py | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index ff4ccd184e4..72641021b2c 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -1,6 +1,21 @@ from datetime import datetime +from typing import ( + Any, + Callable, + Dict, + Iterator, + List, + Literal, + Optional, + Sequence, + Tuple, +) + +from ikvpy.client import IKVReader, IKVWriter +from ikvpy.clientoptions import ClientOptions, ClientOptionsBuilder +from ikvpy.document import IKVDocument, IKVDocumentBuilder +from ikvpy.factory import create_new_reader, create_new_writer from pydantic import StrictStr -from typing import Any, Callable, Dict, Iterator, Literal, List, Optional, Sequence, Tuple from feast import Entity, FeatureView, utils from feast.infra.online_stores.helpers import compute_entity_id @@ -10,11 +25,6 @@ from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.usage import log_exceptions_and_usage -from ikvpy.client import IKVReader, IKVWriter -from ikvpy.document import IKVDocument, IKVDocumentBuilder -from ikvpy.factory import create_new_reader, create_new_writer -from ikvpy.clientoptions import ClientOptions, ClientOptionsBuilder - PRIMARY_KEY_FIELD_NAME: str = "_entity_key" EVENT_CREATION_TIMESTAMP_FIELD_NAME: str = "_event_timestamp" CREATION_TIMESTAMP_FIELD_NAME: str = "_created_timestamp"