From 47e0e374a82912a385c42fa2e315ee42ae2a6cff Mon Sep 17 00:00:00 2001 From: tokoko Date: Tue, 2 Apr 2024 04:11:34 +0000 Subject: [PATCH 1/4] fix substrait odfvs for online, add tests Signed-off-by: tokoko --- sdk/python/feast/feature_store.py | 9 ++--- .../infra/offline_stores/offline_store.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 2 +- ...on.py => test_substrait_transformation.py} | 38 ++++++++++++++----- 4 files changed, 34 insertions(+), 17 deletions(-) rename sdk/python/tests/unit/{test_on_demand_substrait_transformation.py => test_substrait_transformation.py} (76%) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bfb8a59b2bb..83aaafd6863 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2037,11 +2037,10 @@ def _augment_response_with_on_demand_transforms( proto_values = [] for selected_feature in selected_subset: - if odfv.mode in ["python", "pandas"]: - feature_vector = transformed_features[selected_feature] - proto_values.append( - python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) - ) + feature_vector = transformed_features[selected_feature] + proto_values.append( + python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) + ) odfv_result_names |= set(selected_subset) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index aaed78dd459..6c16ef26439 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -128,7 +128,7 @@ def to_arrow( features_df = self._to_df_internal(timeout=timeout) if self.on_demand_feature_views: for odfv in self.on_demand_feature_views: - if odfv.mode != "pandas": + if odfv.mode not in {"pandas", "substrait"}: raise Exception( f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 8d51edbe587..f9ac1cefc58 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -465,7 +465,7 @@ def get_transformed_features( return self.get_transformed_features_dict( feature_dict=features, ) - elif self.mode == "pandas" and isinstance(features, pd.DataFrame): + elif self.mode in {"pandas", "substrait"} and isinstance(features, pd.DataFrame): return self.get_transformed_features_df( df_with_features=features, full_feature_names=full_feature_names, diff --git a/sdk/python/tests/unit/test_on_demand_substrait_transformation.py b/sdk/python/tests/unit/test_substrait_transformation.py similarity index 76% rename from sdk/python/tests/unit/test_on_demand_substrait_transformation.py rename to sdk/python/tests/unit/test_substrait_transformation.py index 378aa7ce3bd..a639cfb831c 100644 --- a/sdk/python/tests/unit/test_on_demand_substrait_transformation.py +++ b/sdk/python/tests/unit/test_substrait_transformation.py @@ -84,6 +84,8 @@ def substrait_view(inputs: Table) -> Table: [driver, driver_stats_source, driver_stats_fv, substrait_view, pandas_view] ) + store.materialize(start_date=datetime(2000, 4, 12, 10, 59, 42), end_date=datetime(3000, 4, 12, 10, 59, 42)) + entity_df = pd.DataFrame.from_dict( { # entity's join key -> entity values @@ -97,17 +99,33 @@ def substrait_view(inputs: Table) -> Table: } ) + substrait_view.infer_features() + pandas_view.infer_features() + + requested_features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "substrait_view:conv_rate_plus_acc_substrait", + "pandas_view:conv_rate_plus_acc", + ] + training_df = store.get_historical_features( entity_df=entity_df, - features=[ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "driver_hourly_stats:avg_daily_trips", - "substrait_view:conv_rate_plus_acc_substrait", - "pandas_view:conv_rate_plus_acc", - ], - ).to_df() + features=requested_features + ) - assert training_df["conv_rate_plus_acc"].equals( - training_df["conv_rate_plus_acc_substrait"] + assert training_df.to_df()["conv_rate_plus_acc"].equals( + training_df.to_df()["conv_rate_plus_acc_substrait"] ) + + assert training_df.to_arrow()["conv_rate_plus_acc"].equals( + training_df.to_arrow()["conv_rate_plus_acc_substrait"] + ) + + online_response = store.get_online_features( + features=requested_features, + entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}, {"driver_id": 1003}] + ) + + assert online_response.to_dict()['conv_rate_plus_acc'] == online_response.to_dict()['conv_rate_plus_acc_substrait'] From 9de3fde730cfe934b463effe5d9efcc79f9cf316 Mon Sep 17 00:00:00 2001 From: tokoko Date: Tue, 2 Apr 2024 04:19:36 +0000 Subject: [PATCH 2/4] fix formatting Signed-off-by: tokoko --- sdk/python/feast/on_demand_feature_view.py | 4 +++- .../unit/test_substrait_transformation.py | 18 ++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f9ac1cefc58..f83500cbc9b 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -465,7 +465,9 @@ def get_transformed_features( return self.get_transformed_features_dict( feature_dict=features, ) - elif self.mode in {"pandas", "substrait"} and isinstance(features, pd.DataFrame): + elif self.mode in {"pandas", "substrait"} and isinstance( + features, pd.DataFrame + ): return self.get_transformed_features_df( df_with_features=features, full_feature_names=full_feature_names, diff --git a/sdk/python/tests/unit/test_substrait_transformation.py b/sdk/python/tests/unit/test_substrait_transformation.py index a639cfb831c..996395e1799 100644 --- a/sdk/python/tests/unit/test_substrait_transformation.py +++ b/sdk/python/tests/unit/test_substrait_transformation.py @@ -84,7 +84,10 @@ def substrait_view(inputs: Table) -> Table: [driver, driver_stats_source, driver_stats_fv, substrait_view, pandas_view] ) - store.materialize(start_date=datetime(2000, 4, 12, 10, 59, 42), end_date=datetime(3000, 4, 12, 10, 59, 42)) + store.materialize( + start_date=datetime(2000, 4, 12, 10, 59, 42), + end_date=datetime(3000, 4, 12, 10, 59, 42), + ) entity_df = pd.DataFrame.from_dict( { @@ -99,9 +102,6 @@ def substrait_view(inputs: Table) -> Table: } ) - substrait_view.infer_features() - pandas_view.infer_features() - requested_features = [ "driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate", @@ -111,8 +111,7 @@ def substrait_view(inputs: Table) -> Table: ] training_df = store.get_historical_features( - entity_df=entity_df, - features=requested_features + entity_df=entity_df, features=requested_features ) assert training_df.to_df()["conv_rate_plus_acc"].equals( @@ -125,7 +124,10 @@ def substrait_view(inputs: Table) -> Table: online_response = store.get_online_features( features=requested_features, - entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}, {"driver_id": 1003}] + entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}, {"driver_id": 1003}], ) - assert online_response.to_dict()['conv_rate_plus_acc'] == online_response.to_dict()['conv_rate_plus_acc_substrait'] + assert ( + online_response.to_dict()["conv_rate_plus_acc"] + == online_response.to_dict()["conv_rate_plus_acc_substrait"] + ) From 7b33fa256cf9c11d4c6323ae0d9ad1af2e542add Mon Sep 17 00:00:00 2001 From: tokoko Date: Tue, 2 Apr 2024 13:44:10 +0000 Subject: [PATCH 3/4] change odfv substrait test dates relative to start_date and end_date Signed-off-by: tokoko --- sdk/python/tests/unit/test_substrait_transformation.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/tests/unit/test_substrait_transformation.py b/sdk/python/tests/unit/test_substrait_transformation.py index 996395e1799..5721f5241c6 100644 --- a/sdk/python/tests/unit/test_substrait_transformation.py +++ b/sdk/python/tests/unit/test_substrait_transformation.py @@ -85,8 +85,8 @@ def substrait_view(inputs: Table) -> Table: ) store.materialize( - start_date=datetime(2000, 4, 12, 10, 59, 42), - end_date=datetime(3000, 4, 12, 10, 59, 42), + start_date=start_date, + end_date=end_date, ) entity_df = pd.DataFrame.from_dict( @@ -95,9 +95,9 @@ def substrait_view(inputs: Table) -> Table: "driver_id": [1001, 1002, 1003], # "event_timestamp" (reserved key) -> timestamps "event_timestamp": [ - datetime(2021, 4, 12, 10, 59, 42), - datetime(2021, 4, 12, 8, 12, 10), - datetime(2021, 4, 12, 16, 40, 26), + start_date + timedelta(days=4), + start_date + timedelta(days=5), + start_date + timedelta(days=6), ], } ) From 1bf360a0975f2d97a0ced9ddef2dec35e338e6ac Mon Sep 17 00:00:00 2001 From: tokoko Date: Tue, 2 Apr 2024 14:00:18 +0000 Subject: [PATCH 4/4] force tests rerun Signed-off-by: tokoko --- sdk/python/tests/unit/test_substrait_transformation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/unit/test_substrait_transformation.py b/sdk/python/tests/unit/test_substrait_transformation.py index 5721f5241c6..28ab68c70be 100644 --- a/sdk/python/tests/unit/test_substrait_transformation.py +++ b/sdk/python/tests/unit/test_substrait_transformation.py @@ -60,6 +60,7 @@ def test_ibis_pandas_parity(): @on_demand_feature_view( sources=[driver_stats_fv], schema=[Field(name="conv_rate_plus_acc", dtype=Float64)], + mode="pandas", ) def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame: df = pd.DataFrame()