From 112014e3b74caf48f841744839f3485e882c3f4c Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Thu, 17 Apr 2025 22:41:13 -0700 Subject: [PATCH 01/20] Update offline store pull all API date field optional Signed-off-by: HaoXuAI --- .../feast/infra/offline_stores/bigquery.py | 8 +- .../contrib/athena_offline_store/athena.py | 14 +++- .../couchbase_offline_store/couchbase.py | 12 ++- .../contrib/mssql_offline_store/mssql.py | 4 +- .../postgres_offline_store/postgres.py | 10 ++- .../contrib/spark_offline_store/spark.py | 10 +-- .../contrib/trino_offline_store/trino.py | 9 ++- sdk/python/feast/infra/offline_stores/dask.py | 21 +++-- .../feast/infra/offline_stores/duckdb.py | 4 +- sdk/python/feast/infra/offline_stores/ibis.py | 12 ++- .../infra/offline_stores/offline_store.py | 8 +- .../infra/offline_stores/offline_utils.py | 81 ++++++++++++++----- .../feast/infra/offline_stores/redshift.py | 6 +- .../feast/infra/offline_stores/remote.py | 8 +- .../feast/infra/offline_stores/snowflake.py | 10 +-- 15 files changed, 147 insertions(+), 70 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index f0516b594ee..bc61882ef62 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -52,6 +52,7 @@ BigQuerySource, SavedDatasetBigQueryStorage, ) +from .offline_utils import get_timestamp_filter_sql try: from google.api_core import client_info as http_client_info @@ -188,8 +189,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) assert isinstance(data_source, BigQuerySource) @@ -206,10 +207,11 @@ def pull_all_from_table_or_query( + BigQueryOfflineStore._escape_query_columns(feature_name_columns) + [timestamp_field] ) + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field) query = f""" SELECT {field_string} FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') + {timestamp_filter} """ return BigQueryRetrievalJob( query=query, diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index f49bfddb81d..04a1ce4cfd4 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -40,6 +40,7 @@ from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage +from infra.offline_stores.offline_utils import get_timestamp_filter_sql class AthenaOfflineStoreConfig(FeastConfigBaseModel): @@ -131,8 +132,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, AthenaOfflineStoreConfig) assert isinstance(data_source, AthenaSource) @@ -147,10 +148,17 @@ def pull_all_from_table_or_query( date_partition_column = data_source.date_partition_column + if start_date: + start_date 
= start_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + if end_date: + end_date = end_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field) + query = f""" SELECT {field_string} FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' AND TIMESTAMP '{end_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' + WHERE {timestamp_filter} {"AND " + date_partition_column + " >= '" + start_date.strftime("%Y-%m-%d") + "' AND " + date_partition_column + " <= '" + end_date.strftime("%Y-%m-%d") + "' " if date_partition_column != "" and date_partition_column is not None else ""} """ diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index a90d6c2172b..184e38cfe9b 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -46,6 +46,7 @@ CouchbaseColumnarSource, SavedDatasetCouchbaseColumnarStorage, ) +from ...offline_utils import get_timestamp_filter_sql # Only prints out runtime warnings once. warnings.simplefilter("once", RuntimeWarning) @@ -247,13 +248,18 @@ def pull_all_from_table_or_query( join_key_columns + feature_name_columns + [timestamp_field] ) - start_date_normalized = normalize_timestamp(start_date) - end_date_normalized = normalize_timestamp(end_date) + start_date_normalized = normalize_timestamp(start_date) if start_date else None + end_date_normalized = normalize_timestamp(end_date) if end_date else None + timestamp_filter = get_timestamp_filter_sql( + start_date_normalized, + end_date_normalized, + timestamp_field + ) query = f""" SELECT {field_string} FROM {from_expression} - WHERE `{timestamp_field}` BETWEEN '{start_date_normalized}' AND '{end_date_normalized}' + WHERE {timestamp_filter} """ return CouchbaseColumnarRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py index 875d584568b..ff07bfb478c 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py @@ -177,8 +177,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: return pull_all_from_table_or_query_ibis( config=config, diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index dc5a7b30976..3ecb4ada1cb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -47,6 +47,7 @@ from feast.type_map import pg_type_code_to_arrow from .postgres_source import PostgreSQLSource +from ...offline_utils import get_timestamp_filter_sql class EntitySelectMode(Enum): @@ -237,13 +238,16 @@ def pull_all_from_table_or_query( join_key_columns + feature_name_columns + 
[timestamp_field] ) - start_date = start_date.astimezone(tz=timezone.utc) - end_date = end_date.astimezone(tz=timezone.utc) + if start_date: + start_date = f"'{start_date}'::timestamptz" + if end_date: + end_date = f"'{end_date}'::timestamptz" + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) query = f""" SELECT {field_string} FROM {from_expression} AS paftoq_alias - WHERE "{timestamp_field}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz + WHERE {timestamp_filter} """ return PostgreSQLRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 41c180f5c3c..c9f84ba6b21 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -34,6 +34,7 @@ from feast.saved_dataset import SavedDatasetStorage from feast.type_map import spark_schema_to_np_dtypes from feast.utils import _get_fields_with_aliases +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql # Make sure spark warning doesn't raise more than once. warnings.simplefilter("once", RuntimeWarning) @@ -269,8 +270,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: """ Note that join_key_columns, feature_name_columns, timestamp_field, and @@ -296,13 +297,12 @@ def pull_all_from_table_or_query( fields_with_alias_string = ", ".join(fields_with_aliases) from_expression = data_source.get_table_query_string() - start_date = start_date.astimezone(tz=timezone.utc) - end_date = end_date.astimezone(tz=timezone.utc) + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) query = f""" SELECT {fields_with_alias_string} FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + {timestamp_filter} """ return SparkRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 9667f4e4720..75832a63193 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -35,6 +35,7 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql class BasicAuthModel(FeastConfigBaseModel): @@ -405,8 +406,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, TrinoOfflineStoreConfig) assert isinstance(data_source, TrinoSource) @@ -416,10 +417,12 @@ def pull_all_from_table_or_query( field_string = ", ".join( join_key_columns + feature_name_columns + [timestamp_field] ) + + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field) query = f""" SELECT 
{field_string} FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_filter} """ return TrinoRetrievalJob( query=query, diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 01efc492f7c..58d3057ff78 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -314,8 +314,8 @@ def pull_latest_from_table_or_query( feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, DaskOfflineStoreConfig) assert isinstance(data_source, FileSource) @@ -359,10 +359,15 @@ def evaluate_offline_job(): source_df = source_df.sort_values(by=timestamp_field, npartitions=1) - source_df = source_df[ - (source_df[timestamp_field] >= start_date) - & (source_df[timestamp_field] < end_date) - ] + # TODO: The old implementation is inclusive of start_date and exclusive of end_date. + # Which is inconsistent with other offline stores. + if start_date or end_date: + if start_date and end_date: + source_df = source_df[source_df[timestamp_field].between(start_date, end_date)] + elif start_date: + source_df = source_df[source_df[timestamp_field] >= start_date] + elif end_date: + source_df = source_df[source_df[timestamp_field] < end_date] source_df = source_df.persist() @@ -393,8 +398,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, DaskOfflineStoreConfig) assert isinstance(data_source, FileSource) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index b2e3c03cb55..29c27d32a0c 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -179,8 +179,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: return pull_all_from_table_or_query_ibis( config=config, diff --git a/sdk/python/feast/infra/offline_stores/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py index 66d00ca6292..7f3df9776ee 100644 --- a/sdk/python/feast/infra/offline_stores/ibis.py +++ b/sdk/python/feast/infra/offline_stores/ibis.py @@ -260,10 +260,10 @@ def pull_all_from_table_or_query_ibis( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, data_source_reader: Callable[[DataSource, str], Table], data_source_writer: Callable[[pyarrow.Table, DataSource, str], None], + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, staging_location: Optional[str] = None, staging_location_endpoint_override: Optional[str] = None, ) -> RetrievalJob: @@ -281,8 +281,12 @@ def pull_all_from_table_or_query_ibis( table = table.filter( ibis.and_( - table[timestamp_field] >= ibis.literal(start_date), - table[timestamp_field] <= ibis.literal(end_date), + table[timestamp_field] >= 
ibis.literal(start_date) + if start_date + else ibis.literal(True), + table[timestamp_field] <= ibis.literal(end_date) + if end_date + else ibis.literal(True), ) ) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 69d6bb278b7..09ab978c446 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -294,8 +294,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime], + end_date: Optional[datetime], ) -> RetrievalJob: """ Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and @@ -310,8 +310,8 @@ def pull_all_from_table_or_query( join_key_columns: The columns of the join keys. feature_name_columns: The columns of the features. timestamp_field: The timestamp column. - start_date: The start of the time range. - end_date: The end of the time range. + start_date (Optional): The start of the time range. + end_date (Optional): The end of the time range. Returns: A RetrievalJob that can be executed to get the entity rows. diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 5b12636782f..f9ec7dbb7df 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -1,7 +1,8 @@ import uuid from dataclasses import asdict, dataclass from datetime import datetime, timedelta -from typing import Any, Dict, KeysView, List, Optional, Set, Tuple +from time import timezone +from typing import Any, Dict, KeysView, List, Optional, Set, Tuple, Union import numpy as np import pandas as pd @@ -45,9 +46,9 @@ def infer_event_timestamp_from_entity_df(entity_schema: Dict[str, np.dtype]) -> def assert_expected_columns_in_entity_df( - entity_schema: Dict[str, np.dtype], - join_keys: Set[str], - entity_df_event_timestamp_col: str, + entity_schema: Dict[str, np.dtype], + join_keys: Set[str], + entity_df_event_timestamp_col: str, ): entity_columns = set(entity_schema.keys()) expected_columns = join_keys | {entity_df_event_timestamp_col} @@ -59,7 +60,9 @@ def assert_expected_columns_in_entity_df( # TODO: Remove project and registry from the interface and call sites. 
def get_expected_join_keys( - project: str, feature_views: List[FeatureView], registry: BaseRegistry + project: str, + feature_views: List[FeatureView], + registry: BaseRegistry ) -> Set[str]: join_keys = set() for feature_view in feature_views: @@ -72,7 +75,8 @@ def get_expected_join_keys( def get_entity_df_timestamp_bounds( - entity_df: pd.DataFrame, event_timestamp_col: str + entity_df: pd.DataFrame, + event_timestamp_col: str ) -> Tuple[Timestamp, Timestamp]: event_timestamp_series = entity_df[event_timestamp_col] return event_timestamp_series.min(), event_timestamp_series.max() @@ -99,11 +103,11 @@ class FeatureViewQueryContext: def get_feature_view_query_context( - feature_refs: List[str], - feature_views: List[FeatureView], - registry: BaseRegistry, - project: str, - entity_df_timestamp_range: Tuple[datetime, datetime], + feature_refs: List[str], + feature_views: List[FeatureView], + registry: BaseRegistry, + project: str, + entity_df_timestamp_range: Tuple[datetime, datetime], ) -> List[FeatureViewQueryContext]: """ Build a query context containing all information required to template a BigQuery and @@ -182,12 +186,12 @@ def get_feature_view_query_context( def build_point_in_time_query( - feature_view_query_contexts: List[FeatureViewQueryContext], - left_table_query_string: str, - entity_df_event_timestamp_col: str, - entity_df_columns: KeysView[str], - query_template: str, - full_feature_names: bool = False, + feature_view_query_contexts: List[FeatureViewQueryContext], + left_table_query_string: str, + entity_df_event_timestamp_col: str, + entity_df_columns: KeysView[str], + query_template: str, + full_feature_names: bool = False, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift""" env = Environment(loader=BaseLoader()) @@ -238,7 +242,9 @@ def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore: def get_pyarrow_schema_from_batch_source( - config: RepoConfig, batch_source: DataSource, timestamp_unit: str = "us" + config: RepoConfig, + batch_source: DataSource, + timestamp_unit: str = "us" ) -> Tuple[pa.Schema, List[str]]: """Returns the pyarrow schema and column names for the given batch source.""" column_names_and_types = batch_source.get_table_column_names_and_types(config) @@ -266,3 +272,42 @@ def enclose_in_backticks(value): return [f"`{v}`" for v in value] else: return f"`{value}`" + + +def get_timestamp_filter_sql( + start_date: Optional[Union[datetime, str]] = None, + end_date: Optional[Union[datetime, str]] = None, + timestamp_field: Optional[str] = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, + tz: Optional[timezone] = None, +) -> str: + """ + Returns a SQL WHERE clause using TIMESTAMP('...') wrapping. 
+ - If input is datetime: uses .isoformat() + - If input is str: uses it as-is (user responsibility) + + Example: + WHERE event_timestamp BETWEEN TIMESTAMP('2023-04-01T00:00:00') AND TIMESTAMP('2023-04-02T00:00:00') + """ + def format_value(val: Union[str, datetime]) -> str: + val_str = val + if isinstance(val, datetime): + if tz: + val = val.astimezone(tz) + val_str = val.isoformat() + return f"TIMESTAMP('{val_str}')" + + if start_date: + start_date = format_value(start_date) + if end_date: + end_date = format_value(end_date) + + if start_date and end_date: + return f"WHERE {timestamp_field} BETWEEN {start_date} AND {end_date}" + elif start_date: + return f"WHERE {timestamp_field} >= {start_date}" + elif end_date: + return f"WHERE {timestamp_field} <= {end_date}" + else: + return "" + + diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index ed76f830f3b..b116ab753e4 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -41,6 +41,7 @@ from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql class RedshiftOfflineStoreConfig(FeastConfigBaseModel): @@ -173,13 +174,12 @@ def pull_all_from_table_or_query( ) s3_resource = aws_utils.get_s3_resource(config.offline_store.region) - start_date = start_date.astimezone(tz=timezone.utc) - end_date = end_date.astimezone(tz=timezone.utc) + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) query = f""" SELECT {field_string} FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + WHERE {timestamp_filter} """ return RedshiftRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py index d11fb4673db..2b5911ac557 100644 --- a/sdk/python/feast/infra/offline_stores/remote.py +++ b/sdk/python/feast/infra/offline_stores/remote.py @@ -234,8 +234,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, RemoteOfflineStoreConfig) @@ -253,8 +253,8 @@ def pull_all_from_table_or_query( "join_key_columns": join_key_columns, "feature_name_columns": feature_name_columns, "timestamp_field": timestamp_field, - "start_date": start_date.isoformat(), - "end_date": end_date.isoformat(), + "start_date": start_date.isoformat() if start_date else None, + "end_date": end_date.isoformat() if end_date else None, } return RemoteRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 101685cec6f..88bd302d81d 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -62,6 +62,7 @@ String, UnixTimestamp, ) +from infra.offline_stores.offline_utils import get_timestamp_filter_sql try: from snowflake.connector import SnowflakeConnection @@ -229,8 +230,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: 
Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig) assert isinstance(data_source, SnowflakeSource) @@ -250,13 +251,12 @@ def pull_all_from_table_or_query( with GetSnowflakeConnection(config.offline_store) as conn: snowflake_conn = conn - start_date = start_date.astimezone(tz=timezone.utc) - end_date = end_date.astimezone(tz=timezone.utc) + timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) query = f""" SELECT {field_string} FROM {from_expression} - WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + {timestamp_filter} """ return SnowflakeRetrievalJob( From c75c264f097236b6d8aad661fff5ff4429464485 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Thu, 17 Apr 2025 22:57:24 -0700 Subject: [PATCH 02/20] Update offline store pull all API date field optional Signed-off-by: HaoXuAI --- .../feast/infra/offline_stores/bigquery.py | 4 +- .../contrib/athena_offline_store/athena.py | 17 ++- .../couchbase_offline_store/couchbase.py | 10 +- .../postgres_offline_store/postgres.py | 16 ++- .../contrib/spark_offline_store/spark.py | 6 +- .../contrib/trino_offline_store/trino.py | 6 +- sdk/python/feast/infra/offline_stores/dask.py | 4 +- sdk/python/feast/infra/offline_stores/ibis.py | 6 +- .../infra/offline_stores/offline_utils.py | 106 ++++++++++-------- .../feast/infra/offline_stores/redshift.py | 10 +- .../feast/infra/offline_stores/snowflake.py | 6 +- 11 files changed, 113 insertions(+), 78 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index bc61882ef62..36c3e183d2d 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -207,7 +207,9 @@ def pull_all_from_table_or_query( + BigQueryOfflineStore._escape_query_columns(feature_name_columns) + [timestamp_field] ) - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field) + timestamp_filter = get_timestamp_filter_sql( + start_date, end_date, timestamp_field + ) query = f""" SELECT {field_string} FROM {from_expression} diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 04a1ce4cfd4..4999bc9c123 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -18,6 +18,7 @@ import pandas as pd import pyarrow import pyarrow as pa +from infra.offline_stores.offline_utils import get_timestamp_filter_sql from pydantic import StrictStr from feast import OnDemandFeatureView @@ -40,7 +41,6 @@ from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage -from infra.offline_stores.offline_utils import get_timestamp_filter_sql class AthenaOfflineStoreConfig(FeastConfigBaseModel): @@ -148,18 +148,25 @@ def pull_all_from_table_or_query( date_partition_column = data_source.date_partition_column + start_date_str = None if start_date: - start_date = start_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + start_date_str = start_date.astimezone(tz=timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S.%f" + )[:-3] + end_date_str = None if end_date: - end_date = 
end_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + end_date_str = end_date.astimezone(tz=timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S.%f" + )[:-3] - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field) + timestamp_filter = get_timestamp_filter_sql( + start_date_str, end_date_str, timestamp_field, date_partition_column + ) query = f""" SELECT {field_string} FROM {from_expression} WHERE {timestamp_filter} - {"AND " + date_partition_column + " >= '" + start_date.strftime("%Y-%m-%d") + "' AND " + date_partition_column + " <= '" + end_date.strftime("%Y-%m-%d") + "' " if date_partition_column != "" and date_partition_column is not None else ""} """ return AthenaRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index 184e38cfe9b..f3fdf3b745a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -42,11 +42,11 @@ from feast.saved_dataset import SavedDatasetStorage from ... import offline_utils +from ...offline_utils import get_timestamp_filter_sql from .couchbase_source import ( CouchbaseColumnarSource, SavedDatasetCouchbaseColumnarStorage, ) -from ...offline_utils import get_timestamp_filter_sql # Only prints out runtime warnings once. warnings.simplefilter("once", RuntimeWarning) @@ -229,8 +229,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: """ Fetch all rows from the specified table or query within the time range. 
@@ -251,9 +251,7 @@ def pull_all_from_table_or_query( start_date_normalized = normalize_timestamp(start_date) if start_date else None end_date_normalized = normalize_timestamp(end_date) if end_date else None timestamp_filter = get_timestamp_filter_sql( - start_date_normalized, - end_date_normalized, - timestamp_field + start_date_normalized, end_date_normalized, timestamp_field ) query = f""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 3ecb4ada1cb..0a75ddc1913 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -46,8 +46,8 @@ from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pg_type_code_to_arrow -from .postgres_source import PostgreSQLSource from ...offline_utils import get_timestamp_filter_sql +from .postgres_source import PostgreSQLSource class EntitySelectMode(Enum): @@ -227,8 +227,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig) assert isinstance(data_source, PostgreSQLSource) @@ -238,11 +238,15 @@ def pull_all_from_table_or_query( join_key_columns + feature_name_columns + [timestamp_field] ) + start_date_str = None if start_date: - start_date = f"'{start_date}'::timestamptz" + start_date_str = f"'{start_date}'::timestamptz" + end_date_str = None if end_date: - end_date = f"'{end_date}'::timestamptz" - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) + end_date_str = f"'{end_date}'::timestamptz" + timestamp_filter = get_timestamp_filter_sql( + start_date_str, end_date_str, timestamp_field, tz=timezone.utc + ) query = f""" SELECT {field_string} diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index c9f84ba6b21..fce44cf24e0 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -29,12 +29,12 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import spark_schema_to_np_dtypes from feast.utils import _get_fields_with_aliases -from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql # Make sure spark warning doesn't raise more than once. 
warnings.simplefilter("once", RuntimeWarning) @@ -297,7 +297,9 @@ def pull_all_from_table_or_query( fields_with_alias_string = ", ".join(fields_with_aliases) from_expression = data_source.get_table_query_string() - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) + timestamp_filter = get_timestamp_filter_sql( + start_date, end_date, timestamp_field, tz=timezone.utc + ) query = f""" SELECT {fields_with_alias_string} diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 75832a63193..6e4b84d8ddc 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -31,11 +31,11 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage -from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql class BasicAuthModel(FeastConfigBaseModel): @@ -418,7 +418,9 @@ def pull_all_from_table_or_query( join_key_columns + feature_name_columns + [timestamp_field] ) - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field) + timestamp_filter = get_timestamp_filter_sql( + start_date, end_date, timestamp_field + ) query = f""" SELECT {field_string} FROM {from_expression} diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 58d3057ff78..64c6b2dfc05 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -363,7 +363,9 @@ def evaluate_offline_job(): # Which is inconsistent with other offline stores. 
if start_date or end_date: if start_date and end_date: - source_df = source_df[source_df[timestamp_field].between(start_date, end_date)] + source_df = source_df[ + source_df[timestamp_field].between(start_date, end_date) + ] elif start_date: source_df = source_df[source_df[timestamp_field] >= start_date] elif end_date: diff --git a/sdk/python/feast/infra/offline_stores/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py index 7f3df9776ee..fe4113808d9 100644 --- a/sdk/python/feast/infra/offline_stores/ibis.py +++ b/sdk/python/feast/infra/offline_stores/ibis.py @@ -268,8 +268,10 @@ def pull_all_from_table_or_query_ibis( staging_location_endpoint_override: Optional[str] = None, ) -> RetrievalJob: fields = join_key_columns + feature_name_columns + [timestamp_field] - start_date = start_date.astimezone(tz=timezone.utc) - end_date = end_date.astimezone(tz=timezone.utc) + if start_date: + start_date = start_date.astimezone(tz=timezone.utc) + if end_date: + end_date = end_date.astimezone(tz=timezone.utc) table = data_source_reader(data_source, str(config.repo_path)) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index f9ec7dbb7df..b23632f35b9 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -1,7 +1,6 @@ import uuid from dataclasses import asdict, dataclass -from datetime import datetime, timedelta -from time import timezone +from datetime import datetime, timedelta, timezone from typing import Any, Dict, KeysView, List, Optional, Set, Tuple, Union import numpy as np @@ -46,9 +45,9 @@ def infer_event_timestamp_from_entity_df(entity_schema: Dict[str, np.dtype]) -> def assert_expected_columns_in_entity_df( - entity_schema: Dict[str, np.dtype], - join_keys: Set[str], - entity_df_event_timestamp_col: str, + entity_schema: Dict[str, np.dtype], + join_keys: Set[str], + entity_df_event_timestamp_col: str, ): entity_columns = set(entity_schema.keys()) expected_columns = join_keys | {entity_df_event_timestamp_col} @@ -60,9 +59,7 @@ def assert_expected_columns_in_entity_df( # TODO: Remove project and registry from the interface and call sites. 
def get_expected_join_keys( - project: str, - feature_views: List[FeatureView], - registry: BaseRegistry + project: str, feature_views: List[FeatureView], registry: BaseRegistry ) -> Set[str]: join_keys = set() for feature_view in feature_views: @@ -75,8 +72,7 @@ def get_expected_join_keys( def get_entity_df_timestamp_bounds( - entity_df: pd.DataFrame, - event_timestamp_col: str + entity_df: pd.DataFrame, event_timestamp_col: str ) -> Tuple[Timestamp, Timestamp]: event_timestamp_series = entity_df[event_timestamp_col] return event_timestamp_series.min(), event_timestamp_series.max() @@ -103,11 +99,11 @@ class FeatureViewQueryContext: def get_feature_view_query_context( - feature_refs: List[str], - feature_views: List[FeatureView], - registry: BaseRegistry, - project: str, - entity_df_timestamp_range: Tuple[datetime, datetime], + feature_refs: List[str], + feature_views: List[FeatureView], + registry: BaseRegistry, + project: str, + entity_df_timestamp_range: Tuple[datetime, datetime], ) -> List[FeatureViewQueryContext]: """ Build a query context containing all information required to template a BigQuery and @@ -186,12 +182,12 @@ def get_feature_view_query_context( def build_point_in_time_query( - feature_view_query_contexts: List[FeatureViewQueryContext], - left_table_query_string: str, - entity_df_event_timestamp_col: str, - entity_df_columns: KeysView[str], - query_template: str, - full_feature_names: bool = False, + feature_view_query_contexts: List[FeatureViewQueryContext], + left_table_query_string: str, + entity_df_event_timestamp_col: str, + entity_df_columns: KeysView[str], + query_template: str, + full_feature_names: bool = False, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift""" env = Environment(loader=BaseLoader()) @@ -242,9 +238,7 @@ def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore: def get_pyarrow_schema_from_batch_source( - config: RepoConfig, - batch_source: DataSource, - timestamp_unit: str = "us" + config: RepoConfig, batch_source: DataSource, timestamp_unit: str = "us" ) -> Tuple[pa.Schema, List[str]]: """Returns the pyarrow schema and column names for the given batch source.""" column_names_and_types = batch_source.get_table_column_names_and_types(config) @@ -278,36 +272,54 @@ def get_timestamp_filter_sql( start_date: Optional[Union[datetime, str]] = None, end_date: Optional[Union[datetime, str]] = None, timestamp_field: Optional[str] = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, + date_partition_column: Optional[str] = None, tz: Optional[timezone] = None, ) -> str: """ - Returns a SQL WHERE clause using TIMESTAMP('...') wrapping. - - If input is datetime: uses .isoformat() - - If input is str: uses it as-is (user responsibility) + Returns a SQL WHERE clause with timestamp filter and optional date partition pruning. + + - Datetime inputs are converted using .isoformat() or .strftime('%Y-%m-%d') + - String inputs are passed through as-is (assumed to be preformatted) + - Uses TIMESTAMP(...) for timestamp_field + - Adds 'AND ...' 
conditions for partition column if provided Example: - WHERE event_timestamp BETWEEN TIMESTAMP('2023-04-01T00:00:00') AND TIMESTAMP('2023-04-02T00:00:00') + WHERE event_timestamp BETWEEN TIMESTAMP('...') AND TIMESTAMP('...') + AND ds >= '2023-04-01' AND ds <= '2023-04-05' """ - def format_value(val: Union[str, datetime]) -> str: - val_str = val + + def format_timestamp(val: Union[str, datetime]) -> str: if isinstance(val, datetime): if tz: val = val.astimezone(tz) - val_str = val.isoformat() - return f"TIMESTAMP('{val_str}')" - - if start_date: - start_date = format_value(start_date) - if end_date: - end_date = format_value(end_date) - - if start_date and end_date: - return f"WHERE {timestamp_field} BETWEEN {start_date} AND {end_date}" - elif start_date: - return f"WHERE {timestamp_field} >= {start_date}" - elif end_date: - return f"WHERE {timestamp_field} <= {end_date}" - else: - return "" - + val = val.isoformat() + return f"TIMESTAMP('{val}')" + def format_date(val: Union[str, datetime]) -> str: + if isinstance(val, datetime): + if tz: + val = val.astimezone(tz) + return val.strftime("%Y-%m-%d") + return val # assume already formatted like 'YYYY-MM-DD' + + filters = [] + + # Timestamp filtering + ts_start = format_timestamp(start_date) if start_date else None + ts_end = format_timestamp(end_date) if end_date else None + + if ts_start and ts_end: + filters.append(f"{timestamp_field} BETWEEN {ts_start} AND {ts_end}") + elif ts_start: + filters.append(f"{timestamp_field} >= {ts_start}") + elif ts_end: + filters.append(f"{timestamp_field} <= {ts_end}") + + # Date partition pruning + if date_partition_column: + if start_date: + filters.append(f"{date_partition_column} >= '{format_date(start_date)}'") + if end_date: + filters.append(f"{date_partition_column} <= '{format_date(end_date)}'") + + return "WHERE " + " AND ".join(filters) if filters else "" diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index b116ab753e4..40dc59cfdbe 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -33,6 +33,7 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from feast.infra.offline_stores.redshift_source import ( RedshiftLoggingDestination, SavedDatasetRedshiftStorage, @@ -41,7 +42,6 @@ from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage -from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql class RedshiftOfflineStoreConfig(FeastConfigBaseModel): @@ -158,8 +158,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: datetime, - end_date: datetime, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: assert isinstance(config.offline_store, RedshiftOfflineStoreConfig) assert isinstance(data_source, RedshiftSource) @@ -174,7 +174,9 @@ def pull_all_from_table_or_query( ) s3_resource = aws_utils.get_s3_resource(config.offline_store.region) - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) + timestamp_filter = get_timestamp_filter_sql( + start_date, end_date, timestamp_field, tz=timezone.utc + ) query = f""" SELECT {field_string} diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py 
b/sdk/python/feast/infra/offline_stores/snowflake.py index 88bd302d81d..f1c94207bc7 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -24,6 +24,7 @@ import numpy as np import pandas as pd import pyarrow +from infra.offline_stores.offline_utils import get_timestamp_filter_sql from pydantic import ConfigDict, Field, StrictStr from feast import OnDemandFeatureView @@ -62,7 +63,6 @@ String, UnixTimestamp, ) -from infra.offline_stores.offline_utils import get_timestamp_filter_sql try: from snowflake.connector import SnowflakeConnection @@ -251,7 +251,9 @@ def pull_all_from_table_or_query( with GetSnowflakeConnection(config.offline_store) as conn: snowflake_conn = conn - timestamp_filter = get_timestamp_filter_sql(start_date, end_date, timestamp_field, timezone.utc) + timestamp_filter = get_timestamp_filter_sql( + start_date, end_date, timestamp_field, tz=timezone.utc + ) query = f""" SELECT {field_string} From b62fd1a8be2cb8d613d36b11167b0e553bd38871 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Thu, 17 Apr 2025 23:10:42 -0700 Subject: [PATCH 03/20] update Signed-off-by: HaoXuAI --- .../compute_engines/spark/feature_builder.py | 14 +-- .../feast/infra/compute_engines/spark/node.py | 95 ++++--------------- .../infra/offline_stores/offline_store.py | 4 +- 3 files changed, 23 insertions(+), 90 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py index 453cee7fda5..3ff90329169 100644 --- a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py @@ -9,11 +9,9 @@ SparkAggregationNode, SparkDedupNode, SparkFilterNode, - SparkHistoricalRetrievalReadNode, SparkJoinNode, - SparkMaterializationReadNode, SparkTransformationNode, - SparkWriteNode, + SparkWriteNode, SparkReadNode, ) @@ -27,12 +25,10 @@ def __init__( self.spark_session = spark_session def build_source_node(self): - if isinstance(self.task, MaterializationTask): - node = SparkMaterializationReadNode("source", self.task) - else: - node = SparkHistoricalRetrievalReadNode( - "source", self.task, self.spark_session - ) + source = self.feature_view.batch_source + start_time = self.task.start_time + end_time = self.task.end_time + node = SparkReadNode("source", source, start_time, end_time) self.nodes.append(node) return node diff --git a/sdk/python/feast/infra/compute_engines/spark/node.py b/sdk/python/feast/infra/compute_engines/spark/node.py index e0215081bcf..93ad034b5fc 100644 --- a/sdk/python/feast/infra/compute_engines/spark/node.py +++ b/sdk/python/feast/infra/compute_engines/spark/node.py @@ -4,6 +4,7 @@ from pyspark.sql import DataFrame, SparkSession, Window from pyspark.sql import functions as F +from data_source import DataSource from feast import BatchFeatureView, StreamFeatureView from feast.aggregation import Aggregation from feast.infra.common.materialization_job import MaterializationTask @@ -49,18 +50,21 @@ def rename_entity_ts_column( return entity_df -class SparkMaterializationReadNode(DAGNode): +class SparkReadNode(DAGNode): def __init__( - self, name: str, task: Union[MaterializationTask, HistoricalRetrievalTask] + self, + name: str, + source: DataSource, + start_time: Optional[timedelta] = None, + end_time: Optional[timedelta] = None, ): super().__init__(name) - self.task = task + self.source = source + self.start_time = start_time + self.end_time = end_time def execute(self, 
context: ExecutionContext) -> DAGValue: offline_store = context.offline_store - start_time = self.task.start_time - end_time = self.task.end_time - ( join_key_columns, feature_name_columns, @@ -69,15 +73,14 @@ def execute(self, context: ExecutionContext) -> DAGValue: ) = context.column_info # ๐Ÿ“ฅ Reuse Feast's robust query resolver - retrieval_job = offline_store.pull_latest_from_table_or_query( + retrieval_job = offline_store.pull_all_from_table_or_query( config=context.repo_config, - data_source=self.task.feature_view.batch_source, + data_source=self.source, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, - created_timestamp_column=created_timestamp_column, - start_date=start_time, - end_date=end_time, + start_date=self.start_time, + end_date=self.end_time, ) spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df() @@ -88,74 +91,8 @@ def execute(self, context: ExecutionContext) -> DAGValue: "source": "feature_view_batch_source", "timestamp_field": timestamp_field, "created_timestamp_column": created_timestamp_column, - "start_date": start_time, - "end_date": end_time, - }, - ) - - -class SparkHistoricalRetrievalReadNode(DAGNode): - def __init__( - self, name: str, task: HistoricalRetrievalTask, spark_session: SparkSession - ): - super().__init__(name) - self.task = task - self.spark_session = spark_session - - def execute(self, context: ExecutionContext) -> DAGValue: - """ - Read data from the offline store on the Spark engine. - TODO: Some functionality is duplicated with SparkMaterializationReadNode and spark get_historical_features. - Args: - context: SparkExecutionContext - Returns: DAGValue - """ - fv = self.task.feature_view - source = fv.batch_source - - ( - join_key_columns, - feature_name_columns, - timestamp_field, - created_timestamp_column, - ) = context.column_info - - # TODO: Use pull_all_from_table_or_query when it supports not filtering by timestamp - # retrieval_job = offline_store.pull_all_from_table_or_query( - # config=context.repo_config, - # data_source=source, - # join_key_columns=join_key_columns, - # feature_name_columns=feature_name_columns, - # timestamp_field=timestamp_field, - # start_date=min_ts, - # end_date=max_ts, - # ) - # spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df() - - columns = join_key_columns + feature_name_columns + [timestamp_field] - if created_timestamp_column: - columns.append(created_timestamp_column) - - (fields_with_aliases, aliases) = _get_fields_with_aliases( - fields=columns, - field_mappings=source.field_mapping, - ) - fields_with_alias_string = ", ".join(fields_with_aliases) - - from_expression = source.get_table_query_string() - - query = f""" - SELECT {fields_with_alias_string} - FROM {from_expression} - """ - spark_df = self.spark_session.sql(query) - - return DAGValue( - data=spark_df, - format=DAGFormat.SPARK, - metadata={ - "source": "feature_view_batch_source", - "timestamp_field": timestamp_field, + "start_date": self.start_time, + "end_date": self.end_time, }, ) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 09ab978c446..7523413693c 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -294,8 +294,8 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, - start_date: Optional[datetime], - end_date: 
Optional[datetime], + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: """ Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and From 8b5bdaed621605f9bb30fe7796c93f9b7db660a8 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Thu, 17 Apr 2025 23:15:50 -0700 Subject: [PATCH 04/20] update Signed-off-by: HaoXuAI --- sdk/python/feast/infra/common/retrieval_task.py | 6 +++--- .../compute_engines/spark/feature_builder.py | 3 ++- .../feast/infra/compute_engines/spark/node.py | 15 ++++++--------- .../feast/infra/offline_stores/snowflake.py | 2 +- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/common/retrieval_task.py b/sdk/python/feast/infra/common/retrieval_task.py index a5b5583b3ce..960e0d34c49 100644 --- a/sdk/python/feast/infra/common/retrieval_task.py +++ b/sdk/python/feast/infra/common/retrieval_task.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import datetime -from typing import Union +from typing import Optional, Union import pandas as pd @@ -15,5 +15,5 @@ class HistoricalRetrievalTask: feature_view: Union[BatchFeatureView, StreamFeatureView] full_feature_name: bool registry: Registry - start_time: datetime - end_time: datetime + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None diff --git a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py index 3ff90329169..944feccf903 100644 --- a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py @@ -10,8 +10,9 @@ SparkDedupNode, SparkFilterNode, SparkJoinNode, + SparkReadNode, SparkTransformationNode, - SparkWriteNode, SparkReadNode, + SparkWriteNode, ) diff --git a/sdk/python/feast/infra/compute_engines/spark/node.py b/sdk/python/feast/infra/compute_engines/spark/node.py index 93ad034b5fc..1754f90769f 100644 --- a/sdk/python/feast/infra/compute_engines/spark/node.py +++ b/sdk/python/feast/infra/compute_engines/spark/node.py @@ -1,14 +1,12 @@ -from datetime import timedelta +from datetime import datetime, timedelta from typing import List, Optional, Union, cast from pyspark.sql import DataFrame, SparkSession, Window from pyspark.sql import functions as F -from data_source import DataSource from feast import BatchFeatureView, StreamFeatureView from feast.aggregation import Aggregation -from feast.infra.common.materialization_job import MaterializationTask -from feast.infra.common.retrieval_task import HistoricalRetrievalTask +from feast.data_source import DataSource from feast.infra.compute_engines.dag.context import ExecutionContext from feast.infra.compute_engines.dag.model import DAGFormat from feast.infra.compute_engines.dag.node import DAGNode @@ -24,7 +22,6 @@ from feast.infra.offline_stores.offline_utils import ( infer_event_timestamp_from_entity_df, ) -from feast.utils import _get_fields_with_aliases ENTITY_TS_ALIAS = "__entity_event_timestamp" @@ -53,10 +50,10 @@ def rename_entity_ts_column( class SparkReadNode(DAGNode): def __init__( self, - name: str, - source: DataSource, - start_time: Optional[timedelta] = None, - end_time: Optional[timedelta] = None, + name: str, + source: DataSource, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, ): super().__init__(name) self.source = source diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py 
b/sdk/python/feast/infra/offline_stores/snowflake.py index f1c94207bc7..04166244204 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -24,7 +24,6 @@ import numpy as np import pandas as pd import pyarrow -from infra.offline_stores.offline_utils import get_timestamp_filter_sql from pydantic import ConfigDict, Field, StrictStr from feast import OnDemandFeatureView @@ -38,6 +37,7 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from feast.infra.offline_stores.snowflake_source import ( SavedDatasetSnowflakeStorage, SnowflakeLoggingDestination, From e92d01549f60e64691881ea922e20acef7ac8ce1 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Thu, 17 Apr 2025 23:29:19 -0700 Subject: [PATCH 05/20] update Signed-off-by: HaoXuAI --- .../infra/compute_engines/feature_builder.py | 6 +- .../compute_engines/local/feature_builder.py | 23 ------- .../feast/infra/compute_engines/spark/node.py | 7 +- .../postgres_offline_store/postgres.py | 12 ++-- .../infra/offline_stores/offline_utils.py | 67 +++++++++++-------- 5 files changed, 54 insertions(+), 61 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/feature_builder.py b/sdk/python/feast/infra/compute_engines/feature_builder.py index 324f82e7500..ceed3e2d4f3 100644 --- a/sdk/python/feast/infra/compute_engines/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/feature_builder.py @@ -72,10 +72,10 @@ def _should_validate(self): def build(self) -> ExecutionPlan: last_node = self.build_source_node() - # PIT join entities to the feature data, and perform filtering - if isinstance(self.task, HistoricalRetrievalTask): - last_node = self.build_join_node(last_node) + # Join entity_df with source if needed + last_node = self.build_join_node(last_node) + # PIT filter, TTL, and user-defined filter last_node = self.build_filter_node(last_node) if self._should_aggregate(): diff --git a/sdk/python/feast/infra/compute_engines/local/feature_builder.py b/sdk/python/feast/infra/compute_engines/local/feature_builder.py index bf755ed96d0..aee245da21c 100644 --- a/sdk/python/feast/infra/compute_engines/local/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/local/feature_builder.py @@ -2,7 +2,6 @@ from feast.infra.common.materialization_job import MaterializationTask from feast.infra.common.retrieval_task import HistoricalRetrievalTask -from feast.infra.compute_engines.dag.plan import ExecutionPlan from feast.infra.compute_engines.feature_builder import FeatureBuilder from feast.infra.compute_engines.local.backends.base import DataFrameBackend from feast.infra.compute_engines.local.nodes import ( @@ -95,25 +94,3 @@ def build_output_nodes(self, input_node): node = LocalOutputNode("output") node.add_input(input_node) self.nodes.append(node) - - def build(self) -> ExecutionPlan: - last_node = self.build_source_node() - - if isinstance(self.task, HistoricalRetrievalTask): - last_node = self.build_join_node(last_node) - - last_node = self.build_filter_node(last_node) - - if self._should_aggregate(): - last_node = self.build_aggregation_node(last_node) - elif isinstance(self.task, HistoricalRetrievalTask): - last_node = self.build_dedup_node(last_node) - - if self._should_transform(): - last_node = self.build_transformation_node(last_node) - - if self._should_validate(): - last_node = self.build_validation_node(last_node) - - self.build_output_nodes(last_node) - return ExecutionPlan(self.nodes) diff --git 
a/sdk/python/feast/infra/compute_engines/spark/node.py b/sdk/python/feast/infra/compute_engines/spark/node.py
index 1754f90769f..6307d3a1655 100644
--- a/sdk/python/feast/infra/compute_engines/spark/node.py
+++ b/sdk/python/feast/infra/compute_engines/spark/node.py
@@ -161,7 +161,12 @@ def execute(self, context: ExecutionContext) -> DAGValue:
         feature_df: DataFrame = feature_value.data
 
         entity_df = context.entity_df
-        assert entity_df is not None, "entity_df must be set in ExecutionContext"
+        if not entity_df:
+            return DAGValue(
+                data=feature_df,
+                format=DAGFormat.SPARK,
+                metadata={"joined_on": None},
+            )
 
         # Get timestamp fields from feature view
         join_keys, feature_cols, ts_col, created_ts_col = context.column_info
diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py
index 0a75ddc1913..b351532afe4 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py
@@ -238,14 +238,12 @@ def pull_all_from_table_or_query(
             join_key_columns + feature_name_columns + [timestamp_field]
         )
 
-        start_date_str = None
-        if start_date:
-            start_date_str = f"'{start_date}'::timestamptz"
-        end_date_str = None
-        if end_date:
-            end_date_str = f"'{end_date}'::timestamptz"
         timestamp_filter = get_timestamp_filter_sql(
-            start_date_str, end_date_str, timestamp_field, tz=timezone.utc
+            start_date,
+            end_date,
+            timestamp_field,
+            tz=timezone.utc,
+            cast_style="timestamptz",
         )
 
         query = f"""
diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py
index b23632f35b9..d74aff412e6 100644
--- a/sdk/python/feast/infra/offline_stores/offline_utils.py
+++ b/sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -1,7 +1,7 @@
 import uuid
 from dataclasses import asdict, dataclass
 from datetime import datetime, timedelta, timezone
-from typing import Any, Dict, KeysView, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, KeysView, List, Literal, Optional, Set, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -274,52 +274,65 @@ def get_timestamp_filter_sql(
     timestamp_field: Optional[str] = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
     date_partition_column: Optional[str] = None,
     tz: Optional[timezone] = None,
+    cast_style: Literal["timestamp_func", "timestamptz", "raw"] = "timestamp_func",
 ) -> str:
     """
-    Returns a SQL WHERE clause with timestamp filter and optional date partition pruning.
-
-    - Datetime inputs are converted using .isoformat() or .strftime('%Y-%m-%d')
-    - String inputs are passed through as-is (assumed to be preformatted)
-    - Uses TIMESTAMP(...) for timestamp_field
-    - Adds 'AND ...' conditions for partition column if provided
-
-    Example:
-        WHERE event_timestamp BETWEEN TIMESTAMP('...') AND TIMESTAMP('...')
-        AND ds >= '2023-04-01' AND ds <= '2023-04-05'
+    Returns a SQL filter condition (without the WHERE keyword) with flexible timestamp casting.
+
+    Args:
+        start_date: datetime or ISO8601 string
+        end_date: datetime or ISO8601 string
+        timestamp_field: main timestamp column
+        date_partition_column: optional partition column (for pruning)
+        tz: optional timezone for datetime inputs
+        cast_style: one of:
+            - "timestamp_func": TIMESTAMP('...') → Snowflake, BigQuery, Athena
+            - "timestamptz": '...'::timestamptz → PostgreSQL
+            - "raw": '...' → no cast, string only
+
+    Returns:
+        SQL filter string without WHERE
     """
 
-    def format_timestamp(val: Union[str, datetime]) -> str:
+    def format_casted_ts(val: Union[str, datetime]) -> str:
         if isinstance(val, datetime):
             if tz:
                 val = val.astimezone(tz)
-            val = val.isoformat()
-        return f"TIMESTAMP('{val}')"
+            val_str = val.isoformat()
+        else:
+            val_str = val
+
+        if cast_style == "timestamp_func":
+            return f"TIMESTAMP('{val_str}')"
+        elif cast_style == "timestamptz":
+            return f"'{val_str}'::timestamptz"
+        else:  # raw
+            return f"'{val_str}'"
 
     def format_date(val: Union[str, datetime]) -> str:
         if isinstance(val, datetime):
             if tz:
                 val = val.astimezone(tz)
             return val.strftime("%Y-%m-%d")
-        return val  # assume already formatted like 'YYYY-MM-DD'
+        return val
 
     filters = []
 
-    # Timestamp filtering
-    ts_start = format_timestamp(start_date) if start_date else None
-    ts_end = format_timestamp(end_date) if end_date else None
-
-    if ts_start and ts_end:
-        filters.append(f"{timestamp_field} BETWEEN {ts_start} AND {ts_end}")
-    elif ts_start:
-        filters.append(f"{timestamp_field} >= {ts_start}")
-    elif ts_end:
-        filters.append(f"{timestamp_field} <= {ts_end}")
+    # Timestamp filters
+    if start_date and end_date:
+        filters.append(
+            f"{timestamp_field} BETWEEN {format_casted_ts(start_date)} AND {format_casted_ts(end_date)}"
+        )
+    elif start_date:
+        filters.append(f"{timestamp_field} >= {format_casted_ts(start_date)}")
+    elif end_date:
+        filters.append(f"{timestamp_field} <= {format_casted_ts(end_date)}")
 
-    # Date partition pruning
+    # Partition pruning
     if date_partition_column:
         if start_date:
             filters.append(f"{date_partition_column} >= '{format_date(start_date)}'")
         if end_date:
             filters.append(f"{date_partition_column} <= '{format_date(end_date)}'")
 
-    return "WHERE " + " AND ".join(filters) if filters else ""
+    return " AND ".join(filters) if filters else ""

From 4765d8fd544bf475b71c536b3e72e6734bd9aec8 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Thu, 17 Apr 2025 23:35:16 -0700
Subject: [PATCH 06/20] fix test

Signed-off-by: HaoXuAI
---
 .../feast/infra/offline_stores/offline_utils.py | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py
index d74aff412e6..2802dcb3579 100644
--- a/sdk/python/feast/infra/offline_stores/offline_utils.py
+++ b/sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -298,16 +298,12 @@ def format_casted_ts(val: Union[str, datetime]) -> str:
         if isinstance(val, datetime):
             if tz:
                 val = val.astimezone(tz)
-            val_str = val.isoformat()
-        else:
-            val_str = val
-
         if cast_style == "timestamp_func":
-            return f"TIMESTAMP('{val_str}')"
+            return f"TIMESTAMP('{val}')"
         elif cast_style == "timestamptz":
-            return f"'{val_str}'::timestamptz"
-        else:  # raw
-            return f"'{val_str}'"
+            return f"'{val}'::timestamptz"
+        else:
+            return f"'{val}'"
 
     def format_date(val: Union[str, datetime]) -> str:
         if isinstance(val, datetime):
@@ -321,7 +317,7 @@ def format_date(val: Union[str, datetime]) -> str:
     # Timestamp filters
     if start_date and end_date:
         filters.append(
-            f"{timestamp_field} BETWEEN {format_casted_ts(start_date)} AND {format_casted_ts(end_date)}"
+            f'"{timestamp_field}" BETWEEN {format_casted_ts(start_date)} AND {format_casted_ts(end_date)}'
         )
     elif start_date:
         filters.append(f"{timestamp_field} >= {format_casted_ts(start_date)}")

From 0bbdbbca087b9f9fc04d3db59d0e7b75ccd0dbd2 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Thu, 17 Apr 2025 23:40:12 -0700
Subject: [PATCH 
07/20] update source read node

Signed-off-by: HaoXuAI
---
 .../infra/compute_engines/local/nodes.py      | 38 ++++++++++++++++---
 1 file changed, 32 insertions(+), 6 deletions(-)

diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py
index 4e1d2c3362f..8494a796b1e 100644
--- a/sdk/python/feast/infra/compute_engines/local/nodes.py
+++ b/sdk/python/feast/infra/compute_engines/local/nodes.py
@@ -1,7 +1,8 @@
-from datetime import timedelta
+from datetime import datetime, timedelta
 from typing import Optional
 
 import pyarrow as pa
+from data_source import DataSource
 
 from feast.infra.compute_engines.dag.context import ExecutionContext
 from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue
@@ -15,14 +16,39 @@
 
 
 class LocalSourceReadNode(LocalNode):
-    def __init__(self, name: str, feature_view, task):
+    def __init__(
+        self,
+        name: str,
+        source: DataSource,
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+    ):
         super().__init__(name)
-        self.feature_view = feature_view
-        self.task = task
+        self.source = source
+        self.start_time = start_time
+        self.end_time = end_time
 
     def execute(self, context: ExecutionContext) -> ArrowTableValue:
-        # TODO : Implement the logic to read from offline store
-        return ArrowTableValue(data=pa.Table.from_pandas(context.entity_df))
+        offline_store = context.offline_store
+        (
+            join_key_columns,
+            feature_name_columns,
+            timestamp_field,
+            created_timestamp_column,
+        ) = context.column_info
+
+        # 📥 Reuse Feast's robust query resolver
+        retrieval_job = offline_store.pull_all_from_table_or_query(
+            config=context.repo_config,
+            data_source=self.source,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            timestamp_field=timestamp_field,
+            start_date=self.start_time,
+            end_date=self.end_time,
+        )
+        arrow_table = retrieval_job.to_arrow()
+        return ArrowTableValue(data=arrow_table)
 
 
 class LocalJoinNode(LocalNode):

From 033631877eb90e14193c148038a44ed5fbd3f4c4 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Thu, 17 Apr 2025 23:51:56 -0700
Subject: [PATCH 08/20] fix linting

Signed-off-by: HaoXuAI
---
 sdk/python/feast/infra/compute_engines/local/nodes.py           | 2 +-
 .../infra/offline_stores/contrib/athena_offline_store/athena.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py
index 8494a796b1e..b87e09567d0 100644
--- a/sdk/python/feast/infra/compute_engines/local/nodes.py
+++ b/sdk/python/feast/infra/compute_engines/local/nodes.py
@@ -2,8 +2,8 @@
 from typing import Optional
 
 import pyarrow as pa
-from data_source import DataSource
 
+from feast.data_source import DataSource
 from feast.infra.compute_engines.dag.context import ExecutionContext
 from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue
 from feast.infra.compute_engines.local.backends.base import DataFrameBackend
diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py
index 4999bc9c123..14aac0a4153 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py
@@ -18,7 +18,6 @@
 import pandas as pd
 import pyarrow
 import pyarrow as pa
-from infra.offline_stores.offline_utils import get_timestamp_filter_sql
 from pydantic 
import StrictStr from feast import OnDemandFeatureView @@ -37,6 +36,7 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel, RepoConfig From ac03a750a2390244fd02a48c7ffad3beca65b614 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 00:30:17 -0700 Subject: [PATCH 09/20] fix linting Signed-off-by: HaoXuAI --- .../offline_stores/contrib/postgres_offline_store/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index b351532afe4..d729b01410e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -46,7 +46,7 @@ from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pg_type_code_to_arrow -from ...offline_utils import get_timestamp_filter_sql +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from .postgres_source import PostgreSQLSource From 0033b9c004aba5ae5a4ecaf7c1c3bd1fda273518 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 00:30:33 -0700 Subject: [PATCH 10/20] fix linting Signed-off-by: HaoXuAI --- .../offline_stores/contrib/postgres_offline_store/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index d729b01410e..613076c7fcb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -34,6 +34,7 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils.postgres.connection_utils import ( _get_conn, @@ -46,7 +47,6 @@ from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pg_type_code_to_arrow -from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql from .postgres_source import PostgreSQLSource From 743a490a45e153a9456395a826aa7f36e1b7e314 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 00:44:57 -0700 Subject: [PATCH 11/20] fix linting Signed-off-by: HaoXuAI --- sdk/python/feast/infra/offline_stores/dask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 64c6b2dfc05..0f161a54782 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -364,7 +364,7 @@ def evaluate_offline_job(): if start_date or end_date: if start_date and end_date: source_df = source_df[ - source_df[timestamp_field].between(start_date, end_date) + source_df[timestamp_field].between(start_date, end_date, inclusive="left") ] elif start_date: source_df = source_df[source_df[timestamp_field] >= start_date] From 4285b5fbba49a0e84e79a76569807930ad33aae8 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 00:45:14 -0700 Subject: [PATCH 
12/20] fix linting Signed-off-by: HaoXuAI --- sdk/python/feast/infra/offline_stores/dask.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 0f161a54782..5fe99585193 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -364,7 +364,9 @@ def evaluate_offline_job(): if start_date or end_date: if start_date and end_date: source_df = source_df[ - source_df[timestamp_field].between(start_date, end_date, inclusive="left") + source_df[timestamp_field].between( + start_date, end_date, inclusive="left" + ) ] elif start_date: source_df = source_df[source_df[timestamp_field] >= start_date] From 618c4b8218a3e489092f8ab73ec0f72799e6ef63 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 00:55:00 -0700 Subject: [PATCH 13/20] fix test Signed-off-by: HaoXuAI --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 2 +- sdk/python/feast/infra/offline_stores/snowflake.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index fce44cf24e0..bb4a04cbd5c 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -304,7 +304,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {fields_with_alias_string} FROM {from_expression} - {timestamp_filter} + WHERE {timestamp_filter} """ return SparkRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 04166244204..72a57e38b8c 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -258,7 +258,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - {timestamp_filter} + WHERE {timestamp_filter} """ return SnowflakeRetrievalJob( From afcaf9cf55e4db14515ddf337a3dfded4d3be4c0 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 00:56:21 -0700 Subject: [PATCH 14/20] fix test Signed-off-by: HaoXuAI --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 36c3e183d2d..0545267737e 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -213,7 +213,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - {timestamp_filter} + WHERE {timestamp_filter} """ return BigQueryRetrievalJob( query=query, From 8e02747c4c60db1a68312cef540d3d17c0986585 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 11:57:24 -0700 Subject: [PATCH 15/20] fix test Signed-off-by: HaoXuAI --- .../feast/infra/compute_engines/spark/node.py | 2 +- .../feast/infra/offline_stores/bigquery.py | 2 +- .../contrib/athena_offline_store/athena.py | 7 +++- .../couchbase_offline_store/couchbase.py | 13 +++++--- .../postgres_offline_store/postgres.py | 1 + .../contrib/spark_offline_store/spark.py | 2 +- .../contrib/trino_offline_store/trino.py | 2 +- .../infra/offline_stores/offline_utils.py | 33 ++++++++++++++----- 8 files changed, 45 insertions(+), 17 
deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/node.py b/sdk/python/feast/infra/compute_engines/spark/node.py index 6307d3a1655..799e4991491 100644 --- a/sdk/python/feast/infra/compute_engines/spark/node.py +++ b/sdk/python/feast/infra/compute_engines/spark/node.py @@ -161,7 +161,7 @@ def execute(self, context: ExecutionContext) -> DAGValue: feature_df: DataFrame = feature_value.data entity_df = context.entity_df - if not entity_df: + if entity_df is None: return DAGValue( data=feature_df, format=DAGFormat.SPARK, diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 0545267737e..ead1e7390b8 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -208,7 +208,7 @@ def pull_all_from_table_or_query( + [timestamp_field] ) timestamp_filter = get_timestamp_filter_sql( - start_date, end_date, timestamp_field + start_date, end_date, timestamp_field, quote_fields=False ) query = f""" SELECT {field_string} diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 14aac0a4153..54a8da4114e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -160,7 +160,12 @@ def pull_all_from_table_or_query( )[:-3] timestamp_filter = get_timestamp_filter_sql( - start_date_str, end_date_str, timestamp_field, date_partition_column + start_date_str, + end_date_str, + timestamp_field, + date_partition_column, + cast_style="raw", + quote_fields=False, ) query = f""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index f3fdf3b745a..60c00b39ada 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -247,11 +247,16 @@ def pull_all_from_table_or_query( field_string = ", ".join( join_key_columns + feature_name_columns + [timestamp_field] ) - - start_date_normalized = normalize_timestamp(start_date) if start_date else None - end_date_normalized = normalize_timestamp(end_date) if end_date else None + start_date_normalized = ( + f"`{normalize_timestamp(start_date)}`" if start_date else None + ) + end_date_normalized = f"`{normalize_timestamp(end_date)}`" if end_date else None timestamp_filter = get_timestamp_filter_sql( - start_date_normalized, end_date_normalized, timestamp_field + start_date_normalized, + end_date_normalized, + timestamp_field, + cast_style="raw", + quote_fields=False, ) query = f""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 613076c7fcb..7352952ea50 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -244,6 +244,7 @@ def pull_all_from_table_or_query( timestamp_field, tz=timezone.utc, cast_style="timestamptz", + date_time_separator=" ", # backwards compatibility but inconsistent with other offline stores ) query = f""" diff --git 
a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index bb4a04cbd5c..3629a7ae0cc 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -298,7 +298,7 @@ def pull_all_from_table_or_query(
         from_expression = data_source.get_table_query_string()
 
         timestamp_filter = get_timestamp_filter_sql(
-            start_date, end_date, timestamp_field, tz=timezone.utc
+            start_date, end_date, timestamp_field, tz=timezone.utc, quote_fields=False
         )
 
         query = f"""
diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
index 6e4b84d8ddc..13cb8f076bd 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
@@ -419,7 +419,7 @@ def pull_all_from_table_or_query(
         )
 
         timestamp_filter = get_timestamp_filter_sql(
-            start_date, end_date, timestamp_field
+            start_date, end_date, timestamp_field, quote_fields=False
         )
         query = f"""
             SELECT {field_string}
diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py
index 2802dcb3579..1628a26ec79 100644
--- a/sdk/python/feast/infra/offline_stores/offline_utils.py
+++ b/sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -275,6 +275,8 @@ def get_timestamp_filter_sql(
     date_partition_column: Optional[str] = None,
     tz: Optional[timezone] = None,
     cast_style: Literal["timestamp_func", "timestamptz", "raw"] = "timestamp_func",
+    date_time_separator: str = "T",
+    quote_fields: bool = True,
 ) -> str:
     """
     Returns a SQL filter condition (without the WHERE keyword) with flexible timestamp casting.
@@ -289,21 +291,33 @@ def get_timestamp_filter_sql(
             - "timestamp_func": TIMESTAMP('...') → Snowflake, BigQuery, Athena
             - "timestamptz": '...'::timestamptz → PostgreSQL
             - "raw": '...' → no cast, string only
+        date_time_separator: separator for datetime strings (default is "T")
+            (e.g. 
"2023-10-01T00:00:00" or "2023-10-01 00:00:00") + quote_fields: whether to quote the timestamp and partition column names Returns: SQL filter string without WHERE """ + def quote_column_if_needed(column: Optional[str]) -> Optional[str]: + if not column or not quote_fields: + return column + return f'"{column}"' + def format_casted_ts(val: Union[str, datetime]) -> str: if isinstance(val, datetime): if tz: val = val.astimezone(tz) + val_str = val.isoformat(sep=date_time_separator) + else: + val_str = val + if cast_style == "timestamp_func": - return f"TIMESTAMP('{val}')" + return f"TIMESTAMP('{val_str}')" elif cast_style == "timestamptz": - return f"'{val}'::timestamptz" + return f"'{val_str}'::timestamptz" else: - return f"'{val}'" + return f"'{val_str}'" def format_date(val: Union[str, datetime]) -> str: if isinstance(val, datetime): @@ -312,23 +326,26 @@ def format_date(val: Union[str, datetime]) -> str: return val.strftime("%Y-%m-%d") return val + ts_field = quote_column_if_needed(timestamp_field) + dp_field = quote_column_if_needed(date_partition_column) + filters = [] # Timestamp filters if start_date and end_date: filters.append( - f'"{timestamp_field}" BETWEEN {format_casted_ts(start_date)} AND {format_casted_ts(end_date)}' + f"{ts_field} BETWEEN {format_casted_ts(start_date)} AND {format_casted_ts(end_date)}" ) elif start_date: - filters.append(f"{timestamp_field} >= {format_casted_ts(start_date)}") + filters.append(f"{ts_field} >= {format_casted_ts(start_date)}") elif end_date: - filters.append(f"{timestamp_field} <= {format_casted_ts(end_date)}") + filters.append(f"{ts_field} <= {format_casted_ts(end_date)}") # Partition pruning if date_partition_column: if start_date: - filters.append(f"{date_partition_column} >= '{format_date(start_date)}'") + filters.append(f"{dp_field} >= '{format_date(start_date)}'") if end_date: - filters.append(f"{date_partition_column} <= '{format_date(end_date)}'") + filters.append(f"{dp_field} <= '{format_date(end_date)}'") return " AND ".join(filters) if filters else "" From 71daec23b4671d5a75b6e4b63a5afe31f7b08425 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 12:13:50 -0700 Subject: [PATCH 16/20] fix test Signed-off-by: HaoXuAI --- .../feast/infra/compute_engines/spark/node.py | 14 +++++++------- .../compute_engines/spark/test_compute.py | 2 -- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/node.py b/sdk/python/feast/infra/compute_engines/spark/node.py index 799e4991491..be6048bc2e1 100644 --- a/sdk/python/feast/infra/compute_engines/spark/node.py +++ b/sdk/python/feast/infra/compute_engines/spark/node.py @@ -211,13 +211,13 @@ def execute(self, context: ExecutionContext) -> DAGValue: if ENTITY_TS_ALIAS in input_df.columns: filtered_df = filtered_df.filter(F.col(ts_col) <= F.col(ENTITY_TS_ALIAS)) - # Optional TTL filter: feature.ts >= entity.event_timestamp - ttl - if self.ttl: - ttl_seconds = int(self.ttl.total_seconds()) - lower_bound = F.col(ENTITY_TS_ALIAS) - F.expr( - f"INTERVAL {ttl_seconds} seconds" - ) - filtered_df = filtered_df.filter(F.col(ts_col) >= lower_bound) + # Optional TTL filter: feature.ts >= entity.event_timestamp - ttl + if self.ttl: + ttl_seconds = int(self.ttl.total_seconds()) + lower_bound = F.col(ENTITY_TS_ALIAS) - F.expr( + f"INTERVAL {ttl_seconds} seconds" + ) + filtered_df = filtered_df.filter(F.col(ts_col) >= lower_bound) # Optional custom filter condition if self.filter_condition: diff --git 
a/sdk/python/tests/integration/compute_engines/spark/test_compute.py b/sdk/python/tests/integration/compute_engines/spark/test_compute.py index c6aef9e5701..5bad326f102 100644 --- a/sdk/python/tests/integration/compute_engines/spark/test_compute.py +++ b/sdk/python/tests/integration/compute_engines/spark/test_compute.py @@ -161,8 +161,6 @@ def transform_feature(df: DataFrame) -> DataFrame: feature_view=driver_stats_fv, full_feature_name=False, registry=registry, - start_time=now - timedelta(days=1), - end_time=now, ) # ๐Ÿงช Run SparkComputeEngine From 23a0fb41be08afa06313aa2468885f4917500922 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 13:38:22 -0700 Subject: [PATCH 17/20] fix test Signed-off-by: HaoXuAI --- sdk/python/feast/infra/offline_stores/offline_utils.py | 6 +++--- sdk/python/feast/infra/offline_stores/redshift.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 1628a26ec79..79c925f9a01 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -274,7 +274,7 @@ def get_timestamp_filter_sql( timestamp_field: Optional[str] = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, date_partition_column: Optional[str] = None, tz: Optional[timezone] = None, - cast_style: Literal["timestamp_func", "timestamptz", "raw"] = "timestamp_func", + cast_style: Literal["timestamp_func", "timestamp", "timestamptz", "raw"] = "timestamp_func", date_time_separator: str = "T", quote_fields: bool = True, ) -> str: @@ -314,8 +314,8 @@ def format_casted_ts(val: Union[str, datetime]) -> str: if cast_style == "timestamp_func": return f"TIMESTAMP('{val_str}')" - elif cast_style == "timestamptz": - return f"'{val_str}'::timestamptz" + elif cast_style in ("timestamptz", "timestamp"): + return f"'{val_str}'::{cast_style}" else: return f"'{val_str}'" diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 40dc59cfdbe..02052856b46 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -175,7 +175,7 @@ def pull_all_from_table_or_query( s3_resource = aws_utils.get_s3_resource(config.offline_store.region) timestamp_filter = get_timestamp_filter_sql( - start_date, end_date, timestamp_field, tz=timezone.utc + start_date, end_date, timestamp_field, tz=timezone.utc, cast_style="timestamp" ) query = f""" From bcf1d7a66f5a274a3d815731bc201b37be8b8d49 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Fri, 18 Apr 2025 13:46:19 -0700 Subject: [PATCH 18/20] fix test Signed-off-by: HaoXuAI --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- sdk/python/feast/infra/offline_stores/offline_utils.py | 10 +++++++--- sdk/python/feast/infra/offline_stores/redshift.py | 5 ++++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index ead1e7390b8..cc083e7233f 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -208,7 +208,7 @@ def pull_all_from_table_or_query( + [timestamp_field] ) timestamp_filter = get_timestamp_filter_sql( - start_date, end_date, timestamp_field, quote_fields=False + start_date, end_date, timestamp_field, quote_fields=False, cast_style="timestamp_func" ) query = f""" SELECT {field_string} diff 
--git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py
index 79c925f9a01..95510487fb2 100644
--- a/sdk/python/feast/infra/offline_stores/offline_utils.py
+++ b/sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -274,7 +274,9 @@ def get_timestamp_filter_sql(
     timestamp_field: Optional[str] = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
     date_partition_column: Optional[str] = None,
     tz: Optional[timezone] = None,
-    cast_style: Literal["timestamp_func", "timestamp", "timestamptz", "raw"] = "timestamp_func",
+    cast_style: Literal[
+        "timestamp", "timestamp_func", "timestamptz", "raw"
+    ] = "timestamp",
     date_time_separator: str = "T",
     quote_fields: bool = True,
 ) -> str:
@@ -312,9 +314,11 @@ def format_casted_ts(val: Union[str, datetime]) -> str:
         else:
             val_str = val
 
-        if cast_style == "timestamp_func":
+        if cast_style == "timestamp":
+            return f"TIMESTAMP '{val_str}'"
+        elif cast_style == "timestamp_func":
             return f"TIMESTAMP('{val_str}')"
-        elif cast_style in ("timestamptz", "timestamp"):
-            return f"'{val_str}'::{cast_style}"
+        elif cast_style == "timestamptz":
+            return f"'{val_str}'::{cast_style}"
         else:
             return f"'{val_str}'"
diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py
index 02052856b46..162be08da45 100644
--- a/sdk/python/feast/infra/offline_stores/redshift.py
+++ b/sdk/python/feast/infra/offline_stores/redshift.py
@@ -175,7 +175,10 @@ def pull_all_from_table_or_query(
         s3_resource = aws_utils.get_s3_resource(config.offline_store.region)
 
         timestamp_filter = get_timestamp_filter_sql(
-            start_date, end_date, timestamp_field, tz=timezone.utc, cast_style="timestamp"
+            start_date,
+            end_date,
+            timestamp_field,
+            tz=timezone.utc,
         )
 
         query = f"""

From 89ad0da9fbd1295d2a4f38c2568d52faf6f599e6 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Fri, 18 Apr 2025 13:48:48 -0700
Subject: [PATCH 19/20] fix test

Signed-off-by: HaoXuAI
---
 sdk/python/feast/infra/offline_stores/bigquery.py      | 6 +++++-
 sdk/python/feast/infra/offline_stores/offline_utils.py | 3 ++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index cc083e7233f..e6c2b4e8db7 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -208,7 +208,11 @@ def pull_all_from_table_or_query(
             + [timestamp_field]
         )
         timestamp_filter = get_timestamp_filter_sql(
-            start_date, end_date, timestamp_field, quote_fields=False, cast_style="timestamp_func"
+            start_date,
+            end_date,
+            timestamp_field,
+            quote_fields=False,
+            cast_style="timestamp_func",
         )
         query = f"""
             SELECT {field_string}
diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py
index 95510487fb2..e951434e2a3 100644
--- a/sdk/python/feast/infra/offline_stores/offline_utils.py
+++ b/sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -290,7 +290,8 @@ def get_timestamp_filter_sql(
         date_partition_column: optional partition column (for pruning)
         tz: optional timezone for datetime inputs
         cast_style: one of:
-            - "timestamp_func": TIMESTAMP('...') → Snowflake, BigQuery, Athena
+            - "timestamp": TIMESTAMP '...' → common SQL engines (Snowflake, Redshift, etc.)
+            - "timestamp_func": TIMESTAMP('...') → BigQuery, Couchbase, etc.
             - "timestamptz": '...'::timestamptz → PostgreSQL
             - "raw": '...' → no cast, string only
         date_time_separator: separator for datetime strings (default is "T")

From cdae95b080c14c466c5ad22d2422fa28fd4c7bf3 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Fri, 18 Apr 2025 16:46:02 -0700
Subject: [PATCH 20/20] fix test

Signed-off-by: HaoXuAI
---
 .../feast/infra/compute_engines/local/nodes.py     |  1 +
 .../feast/infra/compute_engines/spark/node.py      |  1 +
 .../feast/infra/offline_stores/bigquery.py         |  7 ++++++-
 .../contrib/athena_offline_store/athena.py         |  6 +++++-
 .../contrib/couchbase_offline_store/couchbase.py   |  6 +++++-
 .../contrib/mssql_offline_store/mssql.py           |  2 ++
 .../contrib/postgres_offline_store/postgres.py     |  6 +++++-
 .../contrib/spark_offline_store/spark.py           |  7 ++++++-
 .../contrib/trino_offline_store/trino.py           |  7 ++++++-
 sdk/python/feast/infra/offline_stores/dask.py      |  3 ++-
 sdk/python/feast/infra/offline_stores/duckdb.py    |  2 ++
 sdk/python/feast/infra/offline_stores/ibis.py      |  6 +++++-
 .../feast/infra/offline_stores/offline_store.py    |  4 +++-
 .../feast/infra/offline_stores/redshift.py         |  6 +++++-
 sdk/python/feast/infra/offline_stores/remote.py    |  2 ++
 .../feast/infra/offline_stores/snowflake.py        |  6 +++++-
 sdk/python/feast/offline_server.py                 | 16 +++++++++-------
 .../compute_engines/spark/test_compute.py          |  2 +-
 18 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py
index b87e09567d0..aea83921351 100644
--- a/sdk/python/feast/infra/compute_engines/local/nodes.py
+++ b/sdk/python/feast/infra/compute_engines/local/nodes.py
@@ -44,6 +44,7 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
             join_key_columns=join_key_columns,
             feature_name_columns=feature_name_columns,
             timestamp_field=timestamp_field,
+            created_timestamp_column=created_timestamp_column,
             start_date=self.start_time,
             end_date=self.end_time,
         )
diff --git a/sdk/python/feast/infra/compute_engines/spark/node.py b/sdk/python/feast/infra/compute_engines/spark/node.py
index be6048bc2e1..0c1c1476613 100644
--- a/sdk/python/feast/infra/compute_engines/spark/node.py
+++ b/sdk/python/feast/infra/compute_engines/spark/node.py
@@ -76,6 +76,7 @@ def execute(self, context: ExecutionContext) -> DAGValue:
             join_key_columns=join_key_columns,
             feature_name_columns=feature_name_columns,
             timestamp_field=timestamp_field,
+            created_timestamp_column=created_timestamp_column,
             start_date=self.start_time,
             end_date=self.end_time,
         )
diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index e6c2b4e8db7..2c4bc8cdbc3 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -189,6 +189,7 @@ def pull_all_from_table_or_query(
         join_key_columns: List[str],
         feature_name_columns: List[str],
         timestamp_field: str,
+        created_timestamp_column: Optional[str] = None,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
     ) -> RetrievalJob:
@@ -202,10 +203,14 @@ def pull_all_from_table_or_query(
             project=project_id,
             location=config.offline_store.location,
         )
+
+        timestamp_fields = [timestamp_field]
+        if created_timestamp_column:
+            timestamp_fields.append(created_timestamp_column)
         field_string = ", ".join(
             BigQueryOfflineStore._escape_query_columns(join_key_columns)
             + BigQueryOfflineStore._escape_query_columns(feature_name_columns)
-            + [timestamp_field]
+            + timestamp_fields
        )
        timestamp_filter = get_timestamp_filter_sql(
            start_date,
diff --git 
a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 54a8da4114e..6f92f45793b 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -132,6 +132,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -139,8 +140,11 @@ def pull_all_from_table_or_query( assert isinstance(data_source, AthenaSource) from_expression = data_source.get_table_query_string(config) + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) field_string = ", ".join( - join_key_columns + feature_name_columns + [timestamp_field] + join_key_columns + feature_name_columns + timestamp_fields ) athena_client = aws_utils.get_athena_data_client(config.offline_store.region) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index 60c00b39ada..54921e9515e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -229,6 +229,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -244,8 +245,11 @@ def pull_all_from_table_or_query( assert isinstance(data_source, CouchbaseColumnarSource) from_expression = data_source.get_table_query_string() + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) field_string = ", ".join( - join_key_columns + feature_name_columns + [timestamp_field] + join_key_columns + feature_name_columns + timestamp_fields ) start_date_normalized = ( f"`{normalize_timestamp(start_date)}`" if start_date else None diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py index ff07bfb478c..4821aa8dcb6 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py @@ -177,6 +177,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -186,6 +187,7 @@ def pull_all_from_table_or_query( join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, start_date=start_date, end_date=end_date, data_source_reader=_build_data_source_reader(config), diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 7352952ea50..1a75bb7e178 100644 --- 
a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -227,6 +227,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -234,8 +235,11 @@ def pull_all_from_table_or_query( assert isinstance(data_source, PostgreSQLSource) from_expression = data_source.get_table_query_string() + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) field_string = ", ".join( - join_key_columns + feature_name_columns + [timestamp_field] + join_key_columns + feature_name_columns + timestamp_fields ) timestamp_filter = get_timestamp_filter_sql( diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 3629a7ae0cc..806610cae7e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -270,6 +270,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -289,8 +290,12 @@ def pull_all_from_table_or_query( spark_session = get_spark_session_or_start_new_with_repoconfig( store_config=config.offline_store ) + + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) (fields_with_aliases, aliases) = _get_fields_with_aliases( - fields=join_key_columns + feature_name_columns + [timestamp_field], + fields=join_key_columns + feature_name_columns + timestamp_fields, field_mappings=data_source.field_mapping, ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 13cb8f076bd..7f7b91d1d23 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -406,6 +406,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -414,8 +415,12 @@ def pull_all_from_table_or_query( from_expression = data_source.get_table_query_string() client = _get_trino_client(config=config) + + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) field_string = ", ".join( - join_key_columns + feature_name_columns + [timestamp_field] + join_key_columns + feature_name_columns + timestamp_fields ) timestamp_filter = get_timestamp_filter_sql( diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 5fe99585193..ea857996966 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -402,6 +402,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], 
feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -415,7 +416,7 @@ def pull_all_from_table_or_query( + [timestamp_field], # avoid deduplication feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, - created_timestamp_column=None, + created_timestamp_column=created_timestamp_column, start_date=start_date, end_date=end_date, ) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index 29c27d32a0c..7bf96129d0b 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -179,6 +179,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -188,6 +189,7 @@ def pull_all_from_table_or_query( join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, start_date=start_date, end_date=end_date, data_source_reader=_read_data_source, diff --git a/sdk/python/feast/infra/offline_stores/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py index fe4113808d9..95c5afef2db 100644 --- a/sdk/python/feast/infra/offline_stores/ibis.py +++ b/sdk/python/feast/infra/offline_stores/ibis.py @@ -262,12 +262,16 @@ def pull_all_from_table_or_query_ibis( timestamp_field: str, data_source_reader: Callable[[DataSource, str], Table], data_source_writer: Callable[[pyarrow.Table, DataSource, str], None], + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, staging_location: Optional[str] = None, staging_location_endpoint_override: Optional[str] = None, ) -> RetrievalJob: - fields = join_key_columns + feature_name_columns + [timestamp_field] + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) + fields = join_key_columns + feature_name_columns + timestamp_fields if start_date: start_date = start_date.astimezone(tz=timezone.utc) if end_date: diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 7523413693c..73794f67a17 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -294,6 +294,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -309,7 +310,8 @@ def pull_all_from_table_or_query( data_source: The data source from which the entity rows will be extracted. join_key_columns: The columns of the join keys. feature_name_columns: The columns of the features. - timestamp_field: The timestamp column. + timestamp_field: The timestamp column, used to determine which rows are the most recent. + created_timestamp_column (Optional): The column indicating when the row was created, used to break ties. start_date (Optional): The start of the time range. end_date (Optional): The end of the time range. 
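
Reviewer note: to make the optional-bounds contract above concrete, here is a
minimal, self-contained pandas sketch of the filtering semantics the dask store
applies after PATCH 11/12. The helper name filter_by_range is illustrative only
and is not part of this series:

    from datetime import datetime, timezone
    from typing import Optional

    import pandas as pd

    def filter_by_range(
        df: pd.DataFrame,
        ts_col: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> pd.DataFrame:
        # Both bounds set: BETWEEN, left-inclusive as in dask.py after PATCH 11.
        if start_date and end_date:
            return df[df[ts_col].between(start_date, end_date, inclusive="left")]
        # One-sided bounds fall back to plain comparisons.
        if start_date:
            return df[df[ts_col] >= start_date]
        if end_date:
            return df[df[ts_col] <= end_date]
        # No bounds: the full history is returned, matching the new optional API.
        return df

    df = pd.DataFrame(
        {"event_timestamp": pd.date_range("2023-10-01", periods=4, freq="D", tz="UTC")}
    )
    assert len(filter_by_range(df, "event_timestamp")) == 4  # unbounded pull
    bounded = filter_by_range(
        df,
        "event_timestamp",
        start_date=datetime(2023, 10, 1, tzinfo=timezone.utc),
        end_date=datetime(2023, 10, 3, tzinfo=timezone.utc),
    )
    assert len(bounded) == 2  # left-inclusive: the end bound itself is excluded
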
diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 162be08da45..4ed8e6309c4 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -158,6 +158,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -165,8 +166,11 @@ def pull_all_from_table_or_query( assert isinstance(data_source, RedshiftSource) from_expression = data_source.get_table_query_string() + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) field_string = ", ".join( - join_key_columns + feature_name_columns + [timestamp_field] + join_key_columns + feature_name_columns + timestamp_fields ) redshift_client = aws_utils.get_redshift_data_client( diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py index 2b5911ac557..41985b9bba0 100644 --- a/sdk/python/feast/infra/offline_stores/remote.py +++ b/sdk/python/feast/infra/offline_stores/remote.py @@ -234,6 +234,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -253,6 +254,7 @@ def pull_all_from_table_or_query( "join_key_columns": join_key_columns, "feature_name_columns": feature_name_columns, "timestamp_field": timestamp_field, + "created_timestamp_column": created_timestamp_column, "start_date": start_date.isoformat() if start_date else None, "end_date": end_date.isoformat() if end_date else None, } diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 72a57e38b8c..3a39d0ea6db 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -230,6 +230,7 @@ def pull_all_from_table_or_query( join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, + created_timestamp_column: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ) -> RetrievalJob: @@ -242,9 +243,12 @@ def pull_all_from_table_or_query( if not data_source.database and data_source.schema and data_source.table: from_expression = f'"{config.offline_store.database}".{from_expression}' + timestamp_fields = [timestamp_field] + if created_timestamp_column: + timestamp_fields.append(created_timestamp_column) field_string = ( '"' - + '", "'.join(join_key_columns + feature_name_columns + [timestamp_field]) + + '", "'.join(join_key_columns + feature_name_columns + timestamp_fields) + '"' ) diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py index f3642e5812e..f3215ca0e47 100644 --- a/sdk/python/feast/offline_server.py +++ b/sdk/python/feast/offline_server.py @@ -354,13 +354,15 @@ def pull_all_from_table_or_query(self, command: dict): assert_permissions(data_source, actions=[AuthzedAction.READ_OFFLINE]) return self.offline_store.pull_all_from_table_or_query( - self.store.config, - data_source, - command["join_key_columns"], - command["feature_name_columns"], - command["timestamp_field"], - 
utils.make_tzaware(datetime.fromisoformat(command["start_date"])), - utils.make_tzaware(datetime.fromisoformat(command["end_date"])), + config=self.store.config, + data_source=data_source, + join_key_columns=command["join_key_columns"], + feature_name_columns=command["feature_name_columns"], + timestamp_field=command["timestamp_field"], + start_date=utils.make_tzaware( + datetime.fromisoformat(command["start_date"]) + ), + end_date=utils.make_tzaware(datetime.fromisoformat(command["end_date"])), ) def _validate_pull_latest_from_table_or_query_parameters(self, command: dict): diff --git a/sdk/python/tests/integration/compute_engines/spark/test_compute.py b/sdk/python/tests/integration/compute_engines/spark/test_compute.py index 5bad326f102..15b6e850c65 100644 --- a/sdk/python/tests/integration/compute_engines/spark/test_compute.py +++ b/sdk/python/tests/integration/compute_engines/spark/test_compute.py @@ -226,7 +226,7 @@ def tqdm_builder(length): task = MaterializationTask( project=spark_environment.project, feature_view=driver_stats_fv, - start_time=now - timedelta(days=1), + start_time=now - timedelta(days=2), end_time=now, tqdm_builder=tqdm_builder, )
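
Reviewer note: a usage sketch of get_timestamp_filter_sql as it stands after
PATCH 19, for comparing the cast styles side by side. The expected strings in
the comments are derived from the diffs above (defaults: cast_style="timestamp",
quote_fields=True, date_time_separator="T"); treat them as an illustration, not
as test fixtures:

    from datetime import datetime, timezone

    from feast.infra.offline_stores.offline_utils import get_timestamp_filter_sql

    start = datetime(2023, 10, 1, tzinfo=timezone.utc)
    end = datetime(2023, 10, 2, tzinfo=timezone.utc)

    # Default style (Snowflake, Redshift):
    #   "event_timestamp" BETWEEN TIMESTAMP '2023-10-01T00:00:00+00:00'
    #   AND TIMESTAMP '2023-10-02T00:00:00+00:00'
    print(get_timestamp_filter_sql(start, end, "event_timestamp"))

    # BigQuery-style function cast with unquoted columns:
    #   event_timestamp BETWEEN TIMESTAMP('...') AND TIMESTAMP('...')
    print(
        get_timestamp_filter_sql(
            start, end, "event_timestamp",
            cast_style="timestamp_func", quote_fields=False,
        )
    )

    # PostgreSQL cast with a space separator, as the postgres store passes it:
    #   "event_timestamp" BETWEEN '2023-10-01 00:00:00+00:00'::timestamptz AND ...
    print(
        get_timestamp_filter_sql(
            start, end, "event_timestamp",
            tz=timezone.utc, cast_style="timestamptz", date_time_separator=" ",
        )
    )

    # Bounds are optional; only the provided side is emitted:
    #   "event_timestamp" >= TIMESTAMP '2023-10-01T00:00:00+00:00'
    print(get_timestamp_filter_sql(start, None, "event_timestamp"))

    # A date partition column appends pruning predicates:
    #   ... AND "ds" >= '2023-10-01' AND "ds" <= '2023-10-02'
    print(get_timestamp_filter_sql(start, end, "event_timestamp", date_partition_column="ds"))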