Skip to content

Commit b3f62c1

Browse files
committed
feat: Add non-entity retrieval support for ClickHouse offline store
Enable get_historical_features() to be called with entity_df=None by passing start_date/end_date kwargs instead. When entity_df is None, a synthetic single-row DataFrame stamped with end_date is created. If the dates are omitted, end_date defaults to the current time, and start_date defaults to end_date minus the largest feature-view TTL (or minus 30 days when no feature view defines a TTL). This brings ClickHouse to parity with the PostgreSQL offline store's non-entity retrieval mode.

Fixes #5835

Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com>
1 parent f5f5e50 commit b3f62c1

File tree

3 files changed

+242
-3
lines changed

3 files changed

+242
-3
lines changed

.secrets.baseline

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1451,7 +1451,7 @@
14511451
"filename": "sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py",
14521452
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
14531453
"is_verified": false,
1454-
"line_number": 20
1454+
"line_number": 21
14551455
}
14561456
],
14571457
"sdk/python/tests/unit/infra/offline_stores/test_offline_store.py": [

sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import contextlib
22
import re
33
from dataclasses import asdict
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
from typing import Iterator, List, Literal, Optional, Union, cast
66

77
import numpy as np
@@ -31,6 +31,7 @@
3131
from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig
3232
from feast.infra.utils.clickhouse.connection_utils import get_client
3333
from feast.saved_dataset import SavedDatasetStorage
34+
from feast.utils import _utc_now, make_tzaware
3435

3536

3637
class ClickhouseOfflineStoreConfig(ClickhouseConfig):
@@ -43,15 +44,44 @@ def get_historical_features(
4344
config: RepoConfig,
4445
feature_views: List[FeatureView],
4546
feature_refs: List[str],
46-
entity_df: Union[pd.DataFrame, str],
47+
entity_df: Optional[Union[pd.DataFrame, str]],
4748
registry: BaseRegistry,
4849
project: str,
4950
full_feature_names: bool = False,
51+
**kwargs,
5052
) -> RetrievalJob:
5153
assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig)
5254
for fv in feature_views:
5355
assert isinstance(fv.batch_source, ClickhouseSource)
5456

57+
start_date: Optional[datetime] = kwargs.get("start_date", None)
58+
end_date: Optional[datetime] = kwargs.get("end_date", None)
59+
60+
# Handle non-entity retrieval mode
61+
if entity_df is None:
62+
if end_date is None:
63+
end_date = _utc_now()
64+
else:
65+
end_date = make_tzaware(end_date)
66+
67+
if start_date is None:
68+
max_ttl_seconds = 0
69+
for fv in feature_views:
70+
if fv.ttl and isinstance(fv.ttl, timedelta):
71+
ttl_seconds = int(fv.ttl.total_seconds())
72+
max_ttl_seconds = max(max_ttl_seconds, ttl_seconds)
73+
74+
if max_ttl_seconds > 0:
75+
start_date = end_date - timedelta(seconds=max_ttl_seconds)
76+
else:
77+
start_date = end_date - timedelta(days=30)
78+
else:
79+
start_date = make_tzaware(start_date)
80+
81+
entity_df = pd.DataFrame(
82+
{"event_timestamp": [end_date]}
83+
)
84+
5585
entity_schema = _get_entity_schema(entity_df, config)
5686

5787
entity_df_event_timestamp_col = (

sdk/python/tests/unit/infra/offline_stores/test_clickhouse.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import threading
3+
from datetime import datetime, timedelta, timezone
34
from unittest.mock import MagicMock, patch
45

56
import pytest
@@ -133,3 +134,211 @@ def test_clickhouse_config_handles_none_additional_client_args():
133134
config = ClickhouseConfig(**raw_config)
134135

135136
assert config.additional_client_args is None
137+
138+
139+
def _mock_clickhouse_offline_store_config():
    """Return a ``ClickhouseOfflineStoreConfig`` populated with fixed test values.

    The config class is imported inside the helper rather than at module
    import time.
    """
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
        ClickhouseOfflineStoreConfig,
    )

    # Placeholder connection settings; every value is a literal test fixture.
    connection_settings = {
        "type": "clickhouse",
        "host": "localhost",
        "port": 9000,
        "database": "test_db",
        "user": "default",
        "password": "password",
    }
    return ClickhouseOfflineStoreConfig(**connection_settings)
152+
153+
154+
def _mock_clickhouse_feature_view(name: str, ttl: timedelta | None = None):
    """Build a ``FeatureView`` backed by a ``ClickhouseSource`` for tests.

    Args:
        name: Feature view name; also used to derive the source name
            (``"<name>_source"``) and the table name (``"<name>_table"``).
        ttl: Time-to-live passed straight through to ``FeatureView``.
            ``None`` (the default) leaves the view without a TTL.

    Returns:
        A ``FeatureView`` with a single ``driver_id`` entity and two
        ``Float32`` fields, ``feature1`` and ``feature2``.
    """
    from feast.entity import Entity
    from feast.feature_view import FeatureView, Field
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import (
        ClickhouseSource,
    )
    from feast.types import Float32

    driver = Entity(name="driver_id", join_keys=["driver_id"])
    batch_source = ClickhouseSource(
        name=f"{name}_source",
        table=f"{name}_table",
        timestamp_field="event_timestamp",
    )

    return FeatureView(
        name=name,
        entities=[driver],
        ttl=ttl,
        source=batch_source,
        schema=[
            Field(name="feature1", dtype=Float32),
            Field(name="feature2", dtype=Float32),
        ],
    )
176+
177+
178+
class TestNonEntityRetrieval:
    """Test suite for non-entity retrieval functionality (entity_df=None) for ClickHouse."""

    _PATCH_PREFIX = (
        "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse"
    )

    def _run_non_entity_retrieval(
        self, feature_views, feature_refs, timestamp_range, **date_kwargs
    ):
        """Call ``get_historical_features(entity_df=None)`` with ClickHouse access patched out.

        Args:
            feature_views: Feature views handed to the offline store.
            feature_refs: Feature references handed to the offline store.
            timestamp_range: ``(start, end)`` tuple returned by the patched
                ``_get_entity_df_event_timestamp_range``.
            **date_kwargs: Optional ``start_date`` / ``end_date`` forwarded
                unchanged to ``get_historical_features``.

        Returns:
            A ``(retrieval_job, entity_df)`` tuple, where ``entity_df`` is the
            first positional argument the store passed to the patched
            ``_get_entity_schema``.
        """
        from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
            ClickhouseOfflineStore,
        )
        from feast.repo_config import RepoConfig

        repo_config = RepoConfig(
            project="test_project",
            registry="test_registry",
            provider="local",
            offline_store=_mock_clickhouse_offline_store_config(),
        )
        # Kept as a local so the entity_df it receives can be inspected afterwards.
        entity_schema_mock = MagicMock(return_value={"event_timestamp": "timestamp"})
        utils_prefix = f"{self._PATCH_PREFIX}.offline_utils"

        with (
            patch.multiple(
                self._PATCH_PREFIX,
                _upload_entity_df=MagicMock(),
                _get_entity_schema=entity_schema_mock,
                _get_entity_df_event_timestamp_range=MagicMock(
                    return_value=timestamp_range
                ),
            ),
            patch(f"{utils_prefix}.get_expected_join_keys", return_value=[]),
            patch(f"{utils_prefix}.assert_expected_columns_in_entity_df"),
            patch(f"{utils_prefix}.get_feature_view_query_context", return_value=[]),
        ):
            retrieval_job = ClickhouseOfflineStore.get_historical_features(
                config=repo_config,
                feature_views=feature_views,
                feature_refs=feature_refs,
                entity_df=None,
                registry=MagicMock(),
                project="test_project",
                **date_kwargs,
            )

        entity_schema_mock.assert_called()
        entity_df = entity_schema_mock.call_args_list[0].args[0]
        return retrieval_job, entity_df

    def test_non_entity_mode_with_both_dates(self):
        """Test non-entity retrieval API accepts both start_date and end_date"""
        from feast.infra.offline_stores.offline_store import RetrievalJob

        start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
        end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)

        retrieval_job, entity_df = self._run_non_entity_retrieval(
            feature_views=[_mock_clickhouse_feature_view("test_fv", ttl=None)],
            feature_refs=["test_fv:feature1"],
            timestamp_range=(start_date, end_date),
            start_date=start_date,
            end_date=end_date,
        )

        assert isinstance(retrieval_job, RetrievalJob)
        # The synthetic entity_df is a single row stamped with end_date.
        assert list(entity_df["event_timestamp"]) == [end_date]

    def test_non_entity_mode_with_end_date_only(self):
        """Test non-entity retrieval accepts end_date alone, with TTL-bearing feature views"""
        from feast.infra.offline_stores.offline_store import RetrievalJob

        feature_views = [
            _mock_clickhouse_feature_view("user_fv", ttl=timedelta(hours=1)),
            _mock_clickhouse_feature_view("transaction_fv", ttl=timedelta(days=1)),
        ]
        end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)

        retrieval_job, entity_df = self._run_non_entity_retrieval(
            feature_views=feature_views,
            feature_refs=["user_fv:feature1", "transaction_fv:feature1"],
            timestamp_range=(datetime(2023, 1, 6, tzinfo=timezone.utc), end_date),
            end_date=end_date,
        )

        assert isinstance(retrieval_job, RetrievalJob)
        assert list(entity_df["event_timestamp"]) == [end_date]

    @patch("feast.utils.datetime")
    def test_no_dates_provided_defaults_to_current_time(self, mock_datetime):
        """Test that when no dates are provided, end_date defaults to current time"""
        from feast.infra.offline_stores.offline_store import RetrievalJob

        fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc)
        mock_datetime.now.return_value = fixed_now

        retrieval_job, entity_df = self._run_non_entity_retrieval(
            feature_views=[
                _mock_clickhouse_feature_view("test_fv", ttl=timedelta(days=1))
            ],
            feature_refs=["test_fv:feature1"],
            timestamp_range=(
                datetime(2023, 1, 6, 12, 0, 0, tzinfo=timezone.utc),
                fixed_now,
            ),
        )

        mock_datetime.now.assert_called_with(tz=timezone.utc)
        assert isinstance(retrieval_job, RetrievalJob)
        # With no end_date supplied, the mocked "now" must be the row timestamp.
        assert list(entity_df["event_timestamp"]) == [fixed_now]

0 commit comments

Comments
 (0)