From eb39e0142bda5f35a3e87f02c46a72551b7631d7 Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Mon, 13 May 2024 21:14:13 -0700 Subject: [PATCH 1/2] feat: Feast/IKV upgrade client version Signed-off-by: Pushkar Gupta --- .../contrib/ikv_online_store/ikv.py | 35 +++++++++++-------- setup.py | 2 +- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 9d888aad3d8..73bde8a566e 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -82,9 +82,7 @@ def online_write_batch( progress: Function to be called once a batch of rows is written to the online store, used to show progress. """ - # update should have been called before - if self._writer is None: - return + self._init_writer(config=config) for entity_key, features, event_timestamp, _ in data: entity_id: str = compute_entity_id( @@ -120,6 +118,8 @@ def online_read( item is the event timestamp for the row, and the second item is a dict mapping feature names to values, which are returned in proto format. """ + self._init_reader(config=config) + if not len(entity_keys): return [] @@ -174,7 +174,6 @@ def _decode_fields_for_primary_key( return dt, features - # called before any read/write requests are issued @log_exceptions_and_usage(online_store="ikv") def update( self, @@ -199,8 +198,7 @@ def update( partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so infrastructure corresponding to other feature views should be not be touched. """ - self._init_clients(config=config) - assert self._writer is not None + self._init_writer(config=config) # note: we assume tables_to_keep does not overlap with tables_to_delete @@ -223,8 +221,7 @@ def teardown( tables: Feature views whose corresponding infrastructure should be deleted. entities: Entities whose corresponding infrastructure should be deleted. """ - self._init_clients(config=config) - assert self._writer is not None + self._init_writer(config=config) # drop fields corresponding to this feature-view for feature_view in tables: @@ -269,20 +266,28 @@ def _create_document( return builder.build() - def _init_clients(self, config: RepoConfig): - """Initializes (if required) reader/writer ikv clients.""" - online_config = config.online_store - assert isinstance(online_config, IKVOnlineStoreConfig) - client_options = IKVOnlineStore._config_to_client_options(online_config) - + def _init_writer(self, config: RepoConfig): + """Initializes ikv writer client.""" # initialize writer if self._writer is None: + online_config = config.online_store + assert isinstance(online_config, IKVOnlineStoreConfig) + client_options = IKVOnlineStore._config_to_client_options(online_config) + self._writer = create_new_writer(client_options) + self._writer.startup() # blocking operation - # initialize reader, iff mount_dir is specified + def _init_reader(self, config: RepoConfig): + """Initializes ikv reader client.""" + # initialize reader if self._reader is None: + online_config = config.online_store + assert isinstance(online_config, IKVOnlineStoreConfig) + client_options = IKVOnlineStore._config_to_client_options(online_config) + if online_config.mount_directory and len(online_config.mount_directory) > 0: self._reader = create_new_reader(client_options) + self._reader.startup() # blocking operation @staticmethod def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: diff --git a/setup.py b/setup.py index 9181e64c2f6..cdab69b6848 100644 --- a/setup.py +++ b/setup.py @@ -127,7 +127,7 @@ ] IKV_REQUIRED = [ - "ikvpy>=0.0.23", + "ikvpy>=0.0.36", ] HAZELCAST_REQUIRED = [ From 718a1ffa7eeaa9304f4964ed6da4dccb58dfbbdf Mon Sep 17 00:00:00 2001 From: Pushkar Gupta Date: Mon, 13 May 2024 21:20:56 -0700 Subject: [PATCH 2/2] linting errors Signed-off-by: Pushkar Gupta --- .../infra/online_stores/contrib/ikv_online_store/ikv.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 73bde8a566e..90df7f46860 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -83,6 +83,7 @@ def online_write_batch( to show progress. """ self._init_writer(config=config) + assert self._writer is not None for entity_key, features, event_timestamp, _ in data: entity_id: str = compute_entity_id( @@ -199,6 +200,7 @@ def update( infrastructure corresponding to other feature views should be not be touched. """ self._init_writer(config=config) + assert self._writer is not None # note: we assume tables_to_keep does not overlap with tables_to_delete @@ -222,6 +224,7 @@ def teardown( entities: Entities whose corresponding infrastructure should be deleted. """ self._init_writer(config=config) + assert self._writer is not None # drop fields corresponding to this feature-view for feature_view in tables: @@ -275,7 +278,7 @@ def _init_writer(self, config: RepoConfig): client_options = IKVOnlineStore._config_to_client_options(online_config) self._writer = create_new_writer(client_options) - self._writer.startup() # blocking operation + self._writer.startup() # blocking operation def _init_reader(self, config: RepoConfig): """Initializes ikv reader client.""" @@ -287,7 +290,7 @@ def _init_reader(self, config: RepoConfig): if online_config.mount_directory and len(online_config.mount_directory) > 0: self._reader = create_new_reader(client_options) - self._reader.startup() # blocking operation + self._reader.startup() # blocking operation @staticmethod def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions: