From 1752a4fc72f72ac5e7c059f028940e656db5b50b Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Wed, 15 Oct 2025 14:48:36 +0300 Subject: [PATCH 1/2] add pull_all_from_table_or_query for clickhouse, to align with new materialization logic (calling it) Signed-off-by: lukas.valatka --- .../clickhouse_offline_store/clickhouse.py | 37 +++++++++++++++++++ .../registration/test_universal_types.py | 1 - 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py index bca6339fb15..5e8cf3d9053 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py @@ -191,6 +191,43 @@ def pull_latest_from_table_or_query( on_demand_feature_views=None, ) + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) + assert isinstance(data_source, ClickhouseSource) + + from_expression = data_source.get_table_query_string() + + timestamp_fields = [timestamp_field] + + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) + + field_string = ", ".join( + join_key_columns + feature_name_columns + timestamp_fields + ) + + query = f""" + SELECT {field_string} + FROM {from_expression} + WHERE {timestamp_field} BETWEEN parseDateTimeBestEffort('{start_date}') AND parseDateTimeBestEffort('{end_date}') + """ + + return ClickhouseRetrievalJob( + query=query, + config=config, + full_feature_names=False, + ) + class ClickhouseRetrievalJob(PostgreSQLRetrievalJob): def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 5ba99b9d7f1..b464cf2f766 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -343,7 +343,6 @@ def offline_types_test_fixtures(request, environment): if ( environment.data_source_creator.__class__.__name__ == "ClickhouseDataSourceCreator" - and config.feature_dtype in {"float", "datetime", "bool"} and config.feature_is_list and not config.has_empty_list ): From 460ecdd2fc94972603c4b40e2ea5e7ccd3799947 Mon Sep 17 00:00:00 2001 From: "lukas.valatka" Date: Tue, 11 Nov 2025 17:35:52 +0200 Subject: [PATCH 2/2] fix - ensure on demand feature views can be checked too Signed-off-by: lukas.valatka --- sdk/python/feast/feature_server.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index fee7e56e9c1..e88b1eb5c28 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -344,18 +344,27 @@ async def push(request: PushFeaturesRequest) -> None: async def _get_feast_object( feature_view_name: str, allow_registry_cache: bool ) -> FeastObject: + # FIXME: this logic repeated at least 3 times in the codebase - should be centralized + # in logging, in server and in feature_store (Python SDK) try: - return await run_in_threadpool( - store.get_stream_feature_view, - feature_view_name, - allow_registry_cache=allow_registry_cache, - ) - except FeatureViewNotFoundException: return await run_in_threadpool( store.get_feature_view, feature_view_name, allow_registry_cache=allow_registry_cache, ) + except FeatureViewNotFoundException: + try: + return await run_in_threadpool( + store.get_on_demand_feature_view, + feature_view_name, + allow_registry_cache=allow_registry_cache, + ) + except FeatureViewNotFoundException: + return await run_in_threadpool( + store.get_stream_feature_view, + feature_view_name, + allow_registry_cache=allow_registry_cache, + ) @app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)]) async def write_to_online_store(request: WriteToFeatureStoreRequest) -> None: