diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5cc232d5fca..8632d619fec 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -656,7 +656,7 @@ def _make_inferences( def _get_feature_views_to_materialize( self, feature_views: Optional[List[str]], - ) -> List[FeatureView]: + ) -> List[Union[FeatureView, OnDemandFeatureView]]: """ Returns the list of feature views that should be materialized. @@ -669,34 +669,53 @@ def _get_feature_views_to_materialize( FeatureViewNotFoundException: One of the specified feature views could not be found. ValueError: One of the specified feature views is not configured for materialization. """ - feature_views_to_materialize: List[FeatureView] = [] + feature_views_to_materialize: List[Union[FeatureView, OnDemandFeatureView]] = [] if feature_views is None: - feature_views_to_materialize = utils._list_feature_views( + regular_feature_views = utils._list_feature_views( self._registry, self.project, hide_dummy_entity=False ) - feature_views_to_materialize = [ - fv for fv in feature_views_to_materialize if fv.online - ] + feature_views_to_materialize.extend( + [fv for fv in regular_feature_views if fv.online] + ) stream_feature_views_to_materialize = self._list_stream_feature_views( hide_dummy_entity=False ) - feature_views_to_materialize += [ - sfv for sfv in stream_feature_views_to_materialize if sfv.online - ] + feature_views_to_materialize.extend( + [sfv for sfv in stream_feature_views_to_materialize if sfv.online] + ) + on_demand_feature_views_to_materialize = self.list_on_demand_feature_views() + feature_views_to_materialize.extend( + [ + odfv + for odfv in on_demand_feature_views_to_materialize + if odfv.write_to_online_store + ] + ) else: for name in feature_views: + feature_view: Union[FeatureView, OnDemandFeatureView] try: feature_view = self._get_feature_view(name, hide_dummy_entity=False) except FeatureViewNotFoundException: - feature_view = self._get_stream_feature_view( - name, hide_dummy_entity=False - ) + try: + feature_view = self._get_stream_feature_view( + name, hide_dummy_entity=False + ) + except FeatureViewNotFoundException: + feature_view = self.get_on_demand_feature_view(name) - if not feature_view.online: + if hasattr(feature_view, "online") and not feature_view.online: raise ValueError( f"FeatureView {feature_view.name} is not configured to be served online." ) + elif ( + hasattr(feature_view, "write_to_online_store") + and not feature_view.write_to_online_store + ): + raise ValueError( + f"OnDemandFeatureView {feature_view.name} is not configured for write_to_online_store." + ) feature_views_to_materialize.append(feature_view) return feature_views_to_materialize @@ -1312,6 +1331,8 @@ def materialize_incremental( ) # TODO paging large loads for feature_view in feature_views_to_materialize: + if isinstance(feature_view, OnDemandFeatureView): + continue start_date = feature_view.most_recent_end_time if start_date is None: if feature_view.ttl is None: @@ -1340,7 +1361,7 @@ def tqdm_builder(length): return tqdm(total=length, ncols=100) start_date = utils.make_tzaware(start_date) - end_date = utils.make_tzaware(end_date) + end_date = utils.make_tzaware(end_date) or _utc_now() provider.materialize_single_feature_view( config=self.config, @@ -1351,13 +1372,13 @@ def tqdm_builder(length): project=self.project, tqdm_builder=tqdm_builder, ) - - self._registry.apply_materialization( - feature_view, - self.project, - start_date, - end_date, - ) + if not isinstance(feature_view, OnDemandFeatureView): + self._registry.apply_materialization( + feature_view, + self.project, + start_date, + end_date, + ) def materialize( self, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index b532ac563d4..d4b586f5c93 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -420,17 +420,24 @@ def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table) def materialize_single_feature_view( self, config: RepoConfig, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], start_date: datetime, end_date: datetime, registry: BaseRegistry, project: str, tqdm_builder: Callable[[int], tqdm], ) -> None: + if isinstance(feature_view, OnDemandFeatureView): + if not feature_view.write_to_online_store: + raise ValueError( + f"OnDemandFeatureView {feature_view.name} does not have write_to_online_store enabled" + ) + return assert ( isinstance(feature_view, BatchFeatureView) or isinstance(feature_view, StreamFeatureView) or isinstance(feature_view, FeatureView) + or isinstance(feature_view, OnDemandFeatureView) ), f"Unexpected type for {feature_view.name}: {type(feature_view)}" task = MaterializationTask( project=project, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 15917420af0..4f7b0d4b5c1 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -217,7 +217,7 @@ def ingest_df_to_offline_store( def materialize_single_feature_view( self, config: RepoConfig, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], start_date: datetime, end_date: datetime, registry: BaseRegistry, diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index f2374edf1b2..85810f1fbc1 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -16,7 +16,7 @@ from abc import ABC, abstractmethod from collections import defaultdict from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from google.protobuf.json_format import MessageToJson from google.protobuf.message import Message @@ -432,7 +432,7 @@ def list_all_feature_views( @abstractmethod def apply_materialization( self, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], project: str, start_date: datetime, end_date: datetime, diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 62a21d5c433..0cfbc77b24e 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -16,7 +16,7 @@ from enum import Enum from pathlib import Path from threading import Lock -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from urllib.parse import urlparse from google.protobuf.internal.containers import RepeatedCompositeFieldContainer @@ -529,7 +529,7 @@ def get_data_source( def apply_materialization( self, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], project: str, start_date: datetime, end_date: datetime, diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 590c0454b73..4122586046f 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -356,7 +356,7 @@ def list_feature_views( def apply_materialization( self, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], project: str, start_date: datetime, end_date: datetime, diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index 06403fe9aee..e46231ca7a0 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -992,7 +992,7 @@ def list_permissions( def apply_materialization( self, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], project: str, start_date: datetime, end_date: datetime, diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index c42e6e8b82b..68dcd893f9d 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -702,7 +702,7 @@ def apply_validation_reference( def apply_materialization( self, - feature_view: FeatureView, + feature_view: Union[FeatureView, OnDemandFeatureView], project: str, start_date: datetime, end_date: datetime, diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index eb29c645e53..7c492d04d47 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -1117,6 +1117,210 @@ def python_stored_writes_feature_view( "current_datetime": [None], } + def test_materialize_with_odfv_writes(self): + with tempfile.TemporaryDirectory() as data_dir: + self.store = FeatureStore( + config=RepoConfig( + project="test_on_demand_python_transformation", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=3, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db") + ), + ) + ) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet( + path=driver_stats_path, allow_truncated_timestamps=True + ) + + driver = Entity(name="driver", join_keys=["driver_id"]) + + driver_stats_source = FileSource( + name="driver_hourly_stats_source", + path=driver_stats_path, + timestamp_field="event_timestamp", + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, + tags={}, + ) + + input_request_source = RequestSource( + name="vals_to_add", + schema=[ + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + ], + ) + + @on_demand_feature_view( + entities=[driver], + sources=[ + driver_stats_fv[["conv_rate", "acc_rate"]], + input_request_source, + ], + schema=[ + Field(name="conv_rate_plus_acc", dtype=Float64), + Field(name="current_datetime", dtype=UnixTimestamp), + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + Field(name="string_constant", dtype=String), + ], + mode="python", + write_to_online_store=True, + ) + def python_stored_writes_feature_view( + inputs: dict[str, Any], + ) -> dict[str, Any]: + output: dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ], + "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + "counter": [c + 1 for c in inputs["counter"]], + "input_datetime": [d for d in inputs["input_datetime"]], + "string_constant": ["test_constant"], + } + return output + + @on_demand_feature_view( + entities=[driver], + sources=[ + driver_stats_fv[["conv_rate", "acc_rate"]], + input_request_source, + ], + schema=[ + Field(name="conv_rate_plus_acc", dtype=Float64), + Field(name="current_datetime", dtype=UnixTimestamp), + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + Field(name="string_constant", dtype=String), + ], + mode="python", + write_to_online_store=False, + ) + def python_no_writes_feature_view( + inputs: dict[str, Any], + ) -> dict[str, Any]: + output: dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ], + "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + "counter": [c + 1 for c in inputs["counter"]], + "input_datetime": [d for d in inputs["input_datetime"]], + "string_constant": ["test_constant"], + } + return output + + self.store.apply( + [ + driver, + driver_stats_source, + driver_stats_fv, + python_stored_writes_feature_view, + python_no_writes_feature_view, + ] + ) + + feature_views_to_materialize = self.store._get_feature_views_to_materialize( + None + ) + + odfv_names = [ + fv.name + for fv in feature_views_to_materialize + if hasattr(fv, "write_to_online_store") + ] + assert "python_stored_writes_feature_view" in odfv_names + assert "python_no_writes_feature_view" not in odfv_names + + regular_fv_names = [ + fv.name + for fv in feature_views_to_materialize + if not hasattr(fv, "write_to_online_store") + ] + assert "driver_hourly_stats" in regular_fv_names + + materialize_end_date = datetime.now().replace( + microsecond=0, second=0, minute=0 + ) + materialize_start_date = materialize_end_date - timedelta(days=1) + + self.store.materialize(materialize_start_date, materialize_end_date) + + specific_feature_views_to_materialize = ( + self.store._get_feature_views_to_materialize( + ["driver_hourly_stats", "python_stored_writes_feature_view"] + ) + ) + assert len(specific_feature_views_to_materialize) == 2 + + # materialize some data into the online store for the python_stored_writes_feature_view + self.store.materialize( + materialize_start_date, + materialize_end_date, + ["python_stored_writes_feature_view"], + ) + # validate data is loaded to online store + online_response = self.store.get_online_features( + entity_rows=[{"driver_id": 1001}], + features=[ + "python_stored_writes_feature_view:conv_rate_plus_acc", + "python_stored_writes_feature_view:current_datetime", + "python_stored_writes_feature_view:counter", + "python_stored_writes_feature_view:input_datetime", + "python_stored_writes_feature_view:string_constant", + ], + ).to_dict() + assert sorted(list(online_response.keys())) == sorted( + [ + "driver_id", + "conv_rate_plus_acc", + "counter", + "current_datetime", + "input_datetime", + "string_constant", + ] + ) + assert online_response["driver_id"] == [1001] + + try: + self.store._get_feature_views_to_materialize( + ["python_no_writes_feature_view"] + ) + assert False, ( + "Should have raised ValueError for ODFV without write_to_online_store" + ) + except ValueError as e: + assert "not configured for write_to_online_store" in str(e) + def test_stored_writes_with_explode(self): with tempfile.TemporaryDirectory() as data_dir: self.store = FeatureStore(