|
1 | 1 | import logging |
2 | 2 | import threading |
| 3 | +from datetime import datetime, timedelta, timezone |
3 | 4 | from unittest.mock import MagicMock, patch |
4 | 5 |
|
5 | 6 | import pytest |
@@ -133,3 +134,211 @@ def test_clickhouse_config_handles_none_additional_client_args(): |
133 | 134 | config = ClickhouseConfig(**raw_config) |
134 | 135 |
|
135 | 136 | assert config.additional_client_args is None |
| 137 | + |
| 138 | + |
def _mock_clickhouse_offline_store_config():
    """Return a ClickhouseOfflineStoreConfig filled with fixed test settings.

    The import is done lazily inside the function, mirroring the other
    helpers in this module.
    """
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
        ClickhouseOfflineStoreConfig,
    )

    # Hard-coded connection settings; the tests that use this config patch
    # out the store's helper functions before calling into it.
    connection_settings = {
        "type": "clickhouse",
        "host": "localhost",
        "port": 9000,
        "database": "test_db",
        "user": "default",
        "password": "password",
    }
    return ClickhouseOfflineStoreConfig(**connection_settings)
| 152 | + |
| 153 | + |
def _mock_clickhouse_feature_view(name: str, ttl: timedelta | None = None):
    """Build a FeatureView backed by a ClickhouseSource for use in tests.

    Args:
        name: Name of the feature view. The source is named
            ``"<name>_source"`` and reads from table ``"<name>_table"``.
        ttl: Value forwarded to ``FeatureView(ttl=...)``; ``None`` is passed
            through unchanged.

    Returns:
        A FeatureView with a single ``driver_id`` entity and two Float32
        fields, ``feature1`` and ``feature2``.
    """
    from feast.entity import Entity
    from feast.feature_view import FeatureView, Field
    from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import (
        ClickhouseSource,
    )
    from feast.types import Float32

    driver_entity = Entity(name="driver_id", join_keys=["driver_id"])
    clickhouse_source = ClickhouseSource(
        name=f"{name}_source",
        table=f"{name}_table",
        timestamp_field="event_timestamp",
    )
    feature_schema = [
        Field(name="feature1", dtype=Float32),
        Field(name="feature2", dtype=Float32),
    ]

    return FeatureView(
        name=name,
        entities=[driver_entity],
        ttl=ttl,
        source=clickhouse_source,
        schema=feature_schema,
    )
| 176 | + |
| 177 | + |
class TestNonEntityRetrieval:
    """Test suite for non-entity retrieval functionality (entity_df=None) for ClickHouse."""

    # Dotted path of the module whose helpers are patched in every test.
    _PATCH_PREFIX = (
        "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse"
    )

    def _retrieve_without_entity_df(
        self, feature_views, feature_refs, timestamp_range, **date_kwargs
    ):
        """Call ``get_historical_features`` with ``entity_df=None`` under patches.

        Shared driver for the tests in this class. It builds the test
        RepoConfig, replaces the ClickHouse module's entity-df helpers and the
        ``offline_utils`` functions it uses with mocks, and invokes the
        offline store.

        Args:
            feature_views: Feature views passed to the offline store.
            feature_refs: Feature references passed to the offline store.
            timestamp_range: ``(start, end)`` tuple returned by the mocked
                ``_get_entity_df_event_timestamp_range``.
            **date_kwargs: Optional ``start_date`` / ``end_date`` keyword
                arguments forwarded verbatim; omitted keys are not passed.

        Returns:
            Whatever ``ClickhouseOfflineStore.get_historical_features`` returns.
        """
        from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import (
            ClickhouseOfflineStore,
        )
        from feast.repo_config import RepoConfig

        test_repo_config = RepoConfig(
            project="test_project",
            registry="test_registry",
            provider="local",
            offline_store=_mock_clickhouse_offline_store_config(),
        )

        offline_utils_path = f"{self._PATCH_PREFIX}.offline_utils"
        with (
            patch.multiple(
                self._PATCH_PREFIX,
                _upload_entity_df=MagicMock(),
                _get_entity_schema=MagicMock(
                    return_value={"event_timestamp": "timestamp"}
                ),
                _get_entity_df_event_timestamp_range=MagicMock(
                    return_value=timestamp_range
                ),
            ),
            patch(
                f"{offline_utils_path}.get_expected_join_keys",
                return_value=[],
            ),
            patch(f"{offline_utils_path}.assert_expected_columns_in_entity_df"),
            patch(
                f"{offline_utils_path}.get_feature_view_query_context",
                return_value=[],
            ),
        ):
            return ClickhouseOfflineStore.get_historical_features(
                config=test_repo_config,
                feature_views=feature_views,
                feature_refs=feature_refs,
                entity_df=None,
                registry=MagicMock(),
                project="test_project",
                **date_kwargs,
            )

    def test_non_entity_mode_with_both_dates(self):
        """Test non-entity retrieval API accepts both start_date and end_date"""
        from feast.infra.offline_stores.offline_store import RetrievalJob

        feature_view = _mock_clickhouse_feature_view("test_fv", ttl=None)
        start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
        end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)

        retrieval_job = self._retrieve_without_entity_df(
            feature_views=[feature_view],
            feature_refs=["test_fv:feature1"],
            timestamp_range=(start_date, end_date),
            start_date=start_date,
            end_date=end_date,
        )

        assert isinstance(retrieval_job, RetrievalJob)

    def test_non_entity_mode_with_end_date_only(self):
        """Test non-entity retrieval API accepts end_date without start_date

        Only the returned object's type is asserted; the start_date the store
        derives for the two feature views (TTLs of one hour and one day) is
        not inspected here.
        """
        from feast.infra.offline_stores.offline_store import RetrievalJob

        feature_views = [
            _mock_clickhouse_feature_view("user_fv", ttl=timedelta(hours=1)),
            _mock_clickhouse_feature_view("transaction_fv", ttl=timedelta(days=1)),
        ]
        end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)

        retrieval_job = self._retrieve_without_entity_df(
            feature_views=feature_views,
            feature_refs=["user_fv:feature1", "transaction_fv:feature1"],
            timestamp_range=(datetime(2023, 1, 6, tzinfo=timezone.utc), end_date),
            end_date=end_date,
        )

        assert isinstance(retrieval_job, RetrievalJob)

    @patch("feast.utils.datetime")
    def test_no_dates_provided_defaults_to_current_time(self, mock_datetime):
        """Test that when no dates are provided, end_date defaults to current time"""
        from feast.infra.offline_stores.offline_store import RetrievalJob

        # ``feast.utils.datetime`` is replaced by the decorator, so ``now()``
        # inside that module yields this fixed instant.
        fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc)
        mock_datetime.now.return_value = fixed_now

        feature_view = _mock_clickhouse_feature_view("test_fv", ttl=timedelta(days=1))

        retrieval_job = self._retrieve_without_entity_df(
            feature_views=[feature_view],
            feature_refs=["test_fv:feature1"],
            timestamp_range=(
                datetime(2023, 1, 6, 12, 0, 0, tzinfo=timezone.utc),
                fixed_now,
            ),
        )

        mock_datetime.now.assert_called_with(tz=timezone.utc)
        assert isinstance(retrieval_job, RetrievalJob)
0 commit comments