@@ -1,4 +1,5 @@
import contextlib
import logging
import re
from dataclasses import asdict
from datetime import datetime
@@ -32,6 +33,8 @@
from feast.infra.utils.clickhouse.connection_utils import get_client
from feast.saved_dataset import SavedDatasetStorage

logger = logging.getLogger(__name__)


class ClickhouseOfflineStoreConfig(ClickhouseConfig):
type: Literal["clickhouse"] = "clickhouse"
@@ -205,6 +208,34 @@ def pull_all_from_table_or_query(
assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig)
assert isinstance(data_source, ClickhouseSource)

if start_date is None or end_date is None:
raise ValueError(
"Start_date and end_date must be provided. Pulling without filtering is not supported in ClickHouseOfflineStore."
)

# https://github.com/feast-dev/feast/issues/5707
# Not an ideal solution, but the least invasive change to the existing codebase
if config.offline_store.deduplicate_pushdown:
logger.info(
"""
deduplicate_pushdown optimization is set to True in ClickhouseOfflineStoreConfig.
Calling pull_latest_from_table_or_query instead of pull_all_from_table_or_query.
This makes materialization jobs several times more efficient for large datasets.
However, it comes with a caveat: historical features cannot be computed with Feast compute engines (only the latest values are available).
Use with caution and ensure your use case aligns with this behavior.
"""
)
return ClickhouseOfflineStore.pull_latest_from_table_or_query(
config=config,
data_source=data_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

from_expression = data_source.get_table_query_string()

timestamp_fields = [timestamp_field]
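For context on why the delegation above is faster: a pull_latest-style read lets ClickHouse keep only the newest row per entity on the server, so far less data crosses the wire to the compute engine. Below is a rough, illustrative sketch of that kind of deduplication query; the table, column, and join-key names are placeholders, and it is not the exact SQL Feast generates.

    # Illustrative only: the kind of "latest row per entity" query that a
    # pull_latest-style read can push down to ClickHouse. Table and column
    # names are placeholders, not the SQL Feast actually emits.
    join_keys = ["driver_id"]
    timestamp_field = "event_timestamp"
    start_date = "2024-01-01 00:00:00"
    end_date = "2024-01-02 00:00:00"

    partition_cols = ", ".join(join_keys)
    dedup_query = f"""
    SELECT driver_id, conv_rate, {timestamp_field}
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY {partition_cols}
                ORDER BY {timestamp_field} DESC
            ) AS feast_row_
        FROM driver_hourly_stats
        WHERE {timestamp_field} BETWEEN '{start_date}' AND '{end_date}'
    ) AS deduped
    WHERE feast_row_ = 1
    """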
@@ -11,4 +11,9 @@ class ClickhouseConfig(FeastConfigBaseModel):
password: StrictStr
use_temporary_tables_for_entity_df: bool = True

# https://github.com/feast-dev/feast/issues/5707
# We observed that for large materialization jobs it is several times more efficient to push
# deduplication down to the ClickHouse side rather than doing it in Feast (regardless of the compute engine used)
deduplicate_pushdown: bool = False

model_config = ConfigDict(frozen=True)
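As a usage note, here is a minimal sketch of opting into the new flag when configuring the offline store programmatically. The connection fields and the import path are assumptions based on a typical ClickHouse setup and the contrib module layout, not part of this diff; deduplicate_pushdown defaults to False.

    # Minimal sketch: enabling the new flag. Connection values and the import
    # path are assumptions for illustration, not taken from this diff.
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
        ClickhouseOfflineStoreConfig,
    )

    offline_store_config = ClickhouseOfflineStoreConfig(
        host="localhost",
        port=9000,
        database="feast",
        user="default",
        password="changeme",
        deduplicate_pushdown=True,  # opt in to server-side deduplication
    )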