From a9f4643421e783dd5870e573c304c1173917169c Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Tue, 14 May 2024 22:57:32 +0200 Subject: [PATCH 1/4] fix: integration tests for async sdk method Signed-off-by: Breno Costa --- .../online_store/test_universal_online.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 9beba4d72b5..a034fcba9bf 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -1,3 +1,4 @@ +import asyncio import datetime import os import time @@ -464,6 +465,70 @@ def test_online_retrieval_with_event_timestamps( ) +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["redis"]) +def test_async_online_retrieval_with_event_timestamps( + environment, universal_data_sources +): + fs = environment.feature_store + entities, datasets, data_sources = universal_data_sources + feature_views = construct_universal_feature_views(data_sources) + + fs.apply([driver(), feature_views.driver, feature_views.global_fv]) + + data = { + "driver_id": [1, 2], + "conv_rate": [0.5, 0.3], + "acc_rate": [0.6, 0.4], + "avg_daily_trips": [4, 5], + "event_timestamp": [ + pd.to_datetime(1646263500, utc=True, unit="s"), + pd.to_datetime(1646263600, utc=True, unit="s"), + ], + "created": [ + pd.to_datetime(1646263500, unit="s"), + pd.to_datetime(1646263600, unit="s"), + ], + } + df_ingest = pd.DataFrame(data) + + fs.write_to_online_store("driver_stats", df_ingest) + + response = asyncio.run( + fs.get_online_features_async( + features=[ + "driver_stats:avg_daily_trips", + "driver_stats:acc_rate", + "driver_stats:conv_rate", + ], + entity_rows=[{"driver_id": 1}, {"driver_id": 2}], + ) + ) + df = response.to_df(True) + + assertpy.assert_that(len(df)).is_equal_to(2) + assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) + assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + + @pytest.mark.integration @pytest.mark.universal_online_stores(only=["redis"]) def test_online_store_cleanup(environment, universal_data_sources): From 3fb535d3987684b4586cdcd600bcb526bc9b45ca Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Wed, 15 May 2024 14:54:10 +0200 Subject: [PATCH 2/4] Reuse same setup and assertion Signed-off-by: Breno Costa --- .../online_store/test_universal_online.py | 91 ++++++------------- 1 file changed, 29 insertions(+), 62 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index a034fcba9bf..7944a1be106 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -13,6 +13,7 @@ import requests from botocore.exceptions import BotoCoreError +from feast import FeatureStore from feast.entity import Entity from feast.errors import FeatureNameCollisionError from feast.feature_service import FeatureService @@ -401,19 +402,13 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s ) -@pytest.mark.integration -@pytest.mark.universal_online_stores -@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) -def test_online_retrieval_with_event_timestamps( - environment, universal_data_sources, full_feature_names -): - fs = environment.feature_store +def setup_feature_store_universal_feature_views(environment, universal_data_sources) -> FeatureStore: + fs: FeatureStore = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources) fs.apply([driver(), feature_views.driver, feature_views.global_fv]) - # fake data to ingest into Online Store data = { "driver_id": [1, 2], "conv_rate": [0.5, 0.3], @@ -430,18 +425,11 @@ def test_online_retrieval_with_event_timestamps( } df_ingest = pd.DataFrame(data) - # directly ingest data into the Online Store fs.write_to_online_store("driver_stats", df_ingest) + return fs - response = fs.get_online_features( - features=[ - "driver_stats:avg_daily_trips", - "driver_stats:acc_rate", - "driver_stats:conv_rate", - ], - entity_rows=[{"driver_id": 1}, {"driver_id": 2}], - ) - df = response.to_df(True) + +def assert_feature_store_universal_feature_views_response(df: pd.DataFrame): assertpy.assert_that(len(df)).is_equal_to(2) assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) @@ -466,33 +454,32 @@ def test_online_retrieval_with_event_timestamps( @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["redis"]) -def test_async_online_retrieval_with_event_timestamps( - environment, universal_data_sources +@pytest.mark.universal_online_stores +@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) +def test_online_retrieval_with_event_timestamps( + environment, universal_data_sources, full_feature_names ): - fs = environment.feature_store - entities, datasets, data_sources = universal_data_sources - feature_views = construct_universal_feature_views(data_sources) - - fs.apply([driver(), feature_views.driver, feature_views.global_fv]) + fs = setup_feature_store_universal_feature_views(fs, universal_data_sources) - data = { - "driver_id": [1, 2], - "conv_rate": [0.5, 0.3], - "acc_rate": [0.6, 0.4], - "avg_daily_trips": [4, 5], - "event_timestamp": [ - pd.to_datetime(1646263500, utc=True, unit="s"), - pd.to_datetime(1646263600, utc=True, unit="s"), - ], - "created": [ - pd.to_datetime(1646263500, unit="s"), - pd.to_datetime(1646263600, unit="s"), + response = fs.get_online_features( + features=[ + "driver_stats:avg_daily_trips", + "driver_stats:acc_rate", + "driver_stats:conv_rate", ], - } - df_ingest = pd.DataFrame(data) + entity_rows=[{"driver_id": 1}, {"driver_id": 2}], + ) + df = response.to_df(True) - fs.write_to_online_store("driver_stats", df_ingest) + assert_feature_store_universal_feature_views_response(df) + + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["redis"]) +def test_async_online_retrieval_with_event_timestamps( + environment, universal_data_sources +): + fs = setup_feature_store_universal_feature_views(fs, universal_data_sources) response = asyncio.run( fs.get_online_features_async( @@ -506,27 +493,7 @@ def test_async_online_retrieval_with_event_timestamps( ) df = response.to_df(True) - assertpy.assert_that(len(df)).is_equal_to(2) - assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) - assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) - assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( - 1646263500 - ) - assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( - 1646263600 - ) - assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( - 1646263500 - ) - assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( - 1646263600 - ) - assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( - 1646263500 - ) - assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( - 1646263600 - ) + assert_feature_store_universal_feature_views_response(df) @pytest.mark.integration From 5d011953c295dcf79e51d2af40809ca6ed3df9ef Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Wed, 15 May 2024 14:57:35 +0200 Subject: [PATCH 3/4] fixup! Reuse same setup and assertion Signed-off-by: Breno Costa --- .../integration/online_store/test_universal_online.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 7944a1be106..fc4760aeb07 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -402,7 +402,9 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s ) -def setup_feature_store_universal_feature_views(environment, universal_data_sources) -> FeatureStore: +def setup_feature_store_universal_feature_views( + environment, universal_data_sources +) -> FeatureStore: fs: FeatureStore = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources) @@ -459,7 +461,7 @@ def assert_feature_store_universal_feature_views_response(df: pd.DataFrame): def test_online_retrieval_with_event_timestamps( environment, universal_data_sources, full_feature_names ): - fs = setup_feature_store_universal_feature_views(fs, universal_data_sources) + fs = setup_feature_store_universal_feature_views(universal_data_sources) response = fs.get_online_features( features=[ @@ -479,7 +481,7 @@ def test_online_retrieval_with_event_timestamps( def test_async_online_retrieval_with_event_timestamps( environment, universal_data_sources ): - fs = setup_feature_store_universal_feature_views(fs, universal_data_sources) + fs = setup_feature_store_universal_feature_views(universal_data_sources) response = asyncio.run( fs.get_online_features_async( From 8c0d2af68741a3777109648bf100eace97ea86f1 Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Thu, 16 May 2024 23:43:28 +0200 Subject: [PATCH 4/4] fixup! fixup! Reuse same setup and assertion Signed-off-by: Breno Costa --- .../online_store/test_universal_online.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index fc4760aeb07..4822a8d4f71 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -457,11 +457,10 @@ def assert_feature_store_universal_feature_views_response(df: pd.DataFrame): @pytest.mark.integration @pytest.mark.universal_online_stores -@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) -def test_online_retrieval_with_event_timestamps( - environment, universal_data_sources, full_feature_names -): - fs = setup_feature_store_universal_feature_views(universal_data_sources) +def test_online_retrieval_with_event_timestamps(environment, universal_data_sources): + fs = setup_feature_store_universal_feature_views( + environment, universal_data_sources + ) response = fs.get_online_features( features=[ @@ -481,7 +480,9 @@ def test_online_retrieval_with_event_timestamps( def test_async_online_retrieval_with_event_timestamps( environment, universal_data_sources ): - fs = setup_feature_store_universal_feature_views(universal_data_sources) + fs = setup_feature_store_universal_feature_views( + environment, universal_data_sources + ) response = asyncio.run( fs.get_online_features_async(