diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py index 5e8cf3d9053..28999746f7a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py @@ -1,4 +1,5 @@ import contextlib +import logging import re from dataclasses import asdict from datetime import datetime @@ -32,6 +33,8 @@ from feast.infra.utils.clickhouse.connection_utils import get_client from feast.saved_dataset import SavedDatasetStorage +logger = logging.getLogger(__name__) + class ClickhouseOfflineStoreConfig(ClickhouseConfig): type: Literal["clickhouse"] = "clickhouse" @@ -205,6 +208,34 @@ def pull_all_from_table_or_query( assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) assert isinstance(data_source, ClickhouseSource) + if start_date is None or end_date is None: + raise ValueError( + "Start_date and end_date must be provided. Pulling without filtering is not supported in ClickHouseOfflineStore." + ) + + # https://github.com/feast-dev/feast/issues/5707 + # Not an ideal solution, but the least invasive change to the existing codebase + if config.offline_store.deduplicate_pushdown: + logger.info( + """ + deduplicate_pushdown optimization is set to True in ClickhouseOfflineStoreConfig. + Calling pull_latest_from_table_or_query instead of pull_all_from_table_or_query. + This results in multiple times more efficient materialization jobs for large datasets. + However, it comes with a caveat - can't compute historical features (just latest values available) with Feast compute engines + Use with caution and ensure your use case aligns with this behavior.
+ """ + ) + return ClickhouseOfflineStore.pull_latest_from_table_or_query( + config=config, + data_source=data_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + from_expression = data_source.get_table_query_string() timestamp_fields = [timestamp_field] diff --git a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py index 1f163e0a81b..b7958d53d98 100644 --- a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py +++ b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py @@ -11,4 +11,9 @@ class ClickhouseConfig(FeastConfigBaseModel): password: StrictStr use_temporary_tables_for_entity_df: bool = True + # https://github.com/feast-dev/feast/issues/5707 + # We observed that for large materialization jobs, it's multiple times more efficient + # pushdown deduplication to ClickHouse side rather than doing it in Feast (no matter compute engine used) + deduplicate_pushdown: bool = False + model_config = ConfigDict(frozen=True)