From ac2efa48a410ae9e7d642d095b50a92f92e44ad4 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Sat, 2 Mar 2024 14:19:07 +0700 Subject: [PATCH 1/3] remove unused parameter when init sparksource Signed-off-by: tanlocnguyen --- .../offline_stores/contrib/spark_offline_store/spark_source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index a27065fb5ed..1ff7e6de589 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -39,7 +39,6 @@ def __init__( query: Optional[str] = None, path: Optional[str] = None, file_format: Optional[str] = None, - event_timestamp_column: Optional[str] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, description: Optional[str] = "", From 1ef7de92c91235c49d1fc4935131ba9be8740cef Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Wed, 6 Mar 2024 17:03:33 +0700 Subject: [PATCH 2/3] feat: add entity df to SparkOfflineStore when get_historical_features Signed-off-by: tanlocnguyen --- .../contrib/spark_offline_store/spark.py | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index c9591b7c3f0..63b03fbdf9d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -125,7 +125,7 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], + entity_df: Union[pandas.DataFrame, str, pyspark.sql.DataFrame], registry: Registry, project: str, full_feature_names: bool = False, @@ -473,15 +473,16 @@ def _get_entity_df_event_timestamp_range( entity_df_event_timestamp.min().to_pydatetime(), entity_df_event_timestamp.max().to_pydatetime(), ) - elif isinstance(entity_df, str): + elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame): # If the entity_df is a string (SQL query), determine range # from table - df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col) - - # Checks if executing entity sql resulted in any data - if df.rdd.isEmpty(): - raise EntitySQLEmptyResults(entity_df) - + if isinstance(entity_df, str): + df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col) + # Checks if executing entity sql resulted in any data + if df.rdd.isEmpty(): + raise EntitySQLEmptyResults(entity_df) + else: + df = entity_df # TODO(kzhang132): need utc conversion here. entity_df_event_timestamp_range = ( @@ -499,8 +500,11 @@ def _get_entity_schema( ) -> Dict[str, np.dtype]: if isinstance(entity_df, pd.DataFrame): return dict(zip(entity_df.columns, entity_df.dtypes)) - elif isinstance(entity_df, str): - entity_spark_df = spark_session.sql(entity_df) + elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame): + if isinstance(entity_df, str): + entity_spark_df = spark_session.sql(entity_df) + else: + entity_spark_df = entity_df return dict( zip( entity_spark_df.columns, @@ -525,6 +529,8 @@ def _upload_entity_df( return elif isinstance(entity_df, str): spark_session.sql(entity_df).createOrReplaceTempView(table_name) + elif isinstance(entity_df, pyspark.sql.DataFrame): + entity_df.createOrReplaceTempView(table_name) return else: raise InvalidEntityType(type(entity_df)) From a5313b5738912bc7018a93d56c8fddb7a4aba509 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Wed, 6 Mar 2024 20:33:12 +0700 Subject: [PATCH 3/3] fix: lint error Signed-off-by: tanlocnguyen --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 63b03fbdf9d..b1b1c04c7d7 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -529,6 +529,7 @@ def _upload_entity_df( return elif isinstance(entity_df, str): spark_session.sql(entity_df).createOrReplaceTempView(table_name) + return elif isinstance(entity_df, pyspark.sql.DataFrame): entity_df.createOrReplaceTempView(table_name) return