From c738a8e72f2655e91966861d401656c3cf451e8e Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Sun, 13 Jul 2025 10:36:00 -0700 Subject: [PATCH 1/7] add hybrid offline store Signed-off-by: HaoXuAI --- .../offline_stores/hybrid_offline_store.py | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 sdk/python/feast/infra/offline_stores/hybrid_offline_store.py diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py new file mode 100644 index 00000000000..1201c0e72b2 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py @@ -0,0 +1,158 @@ +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Any, Union, Callable, Iterable, Tuple + +import pandas as pd +import pyarrow + +from feast import FeatureView, RepoConfig +from feast.data_source import DataSource +from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob +from feast.infra.offline_stores.offline_utils import get_offline_store_from_config +from feast.repo_config import FeastConfigBaseModel, get_offline_config_from_type + +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from feature_logging import LoggingSource, LoggingConfig +from infra.registry.base_registry import BaseRegistry + + +class HybridOfflineStoreConfig(FeastConfigBaseModel): + type: str = "hybrid_offline_store.HybridOfflineStore" + + class OfflineStoresWithConfig(FeastConfigBaseModel): + type: str + conf: Dict[str, Any] + + offline_stores: Optional[List[OfflineStoresWithConfig]] + + +class HybridOfflineStore(OfflineStore): + _instance: Optional["HybridOfflineStore"] = None + + _source_to_store_key = { + FileSource: "file", + SnowflakeSource: "snowflake", + LoggingSource: "logging", + + } + + def __new__(cls): + if cls._instance is None: + cls._instance = super(HybridOfflineStore, cls).__new__(cls) + cls._instance._initialized = False + cls._instance.offline_stores = {} + return cls._instance + + def _initialize_offline_stores(self, + config: RepoConfig): + if self._initialized: + return + for store_cfg in getattr(config.offline_store, "offline_stores", []): + config_cls = get_offline_config_from_type(store_cfg.type.split(".")[-1].lower()) + config_instance = config_cls(**store_cfg.conf) + store = get_offline_store_from_config(config_instance) + store_key = store_cfg.type.split(".")[-1].replace("OfflineStore", "").lower() + self.offline_stores[store_key] = store + self._initialized = True + + def _get_offline_store_for_feature_view(self, + feature_view: FeatureView, + config: RepoConfig) -> OfflineStore: + self._initialize_offline_stores(config) + source_type = type(feature_view.batch_source) + store_key = self._source_to_store_key.get(source_type) + if store_key is None: + raise ValueError(f"Unsupported FeatureView batch_source type: {source_type}") + return self.offline_stores[store_key] + + def _get_offline_store_for_source(self, + data_source: Union[DataSource, LoggingSource], + config: RepoConfig) -> OfflineStore: + self._initialize_offline_stores(config) + source_type = type(data_source) + store_key = self._source_to_store_key.get(source_type) + if store_key is None: + raise ValueError(f"Unsupported DataSource type: {source_type}") + return self.offline_stores[store_key] + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: 
List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + store = HybridOfflineStore()._get_offline_store_for_feature_view(feature_views[0], config) + return store.get_historical_features(config, feature_views, feature_refs, entity_df, registry, project, + full_feature_names) + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + store = HybridOfflineStore()._get_offline_store_for_source(data_source, config) + return store.pull_latest_from_table_or_query(config, data_source, join_key_columns, feature_name_columns, + timestamp_field, created_timestamp_column, start_date, end_date) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + store = HybridOfflineStore()._get_offline_store_for_source(data_source, config) + return store.pull_all_from_table_or_query(config, data_source, join_key_columns, feature_name_columns, + timestamp_field, created_timestamp_column, start_date, end_date) + + @staticmethod + def write_logged_features( + config: RepoConfig, + data: Union[pyarrow.Table, Path], + source: LoggingSource, + logging_config: LoggingConfig, + registry: BaseRegistry, + ): + store = HybridOfflineStore()._get_offline_store_for_source(source, config) + return store.write_logged_features(config, data, source, logging_config, registry) + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ): + store = HybridOfflineStore()._get_offline_store_for_feature_view(feature_view, config) + return store.offline_write_batch(config, feature_view, table, progress) + + def validate_data_source( + self, + config: RepoConfig, + data_source: DataSource, + ): + store = self._get_offline_store_for_source(data_source, config) + return store.validate_data_source(config, data_source) + + def get_table_column_names_and_types_from_data_source( + self, + config: RepoConfig, + data_source: DataSource, + ) -> Iterable[Tuple[str, str]]: + store = self._get_offline_store_for_source(data_source, config) + return store.get_table_column_names_and_types_from_data_source(config, data_source) From b765b5e80d281527fedaaa5806f9b051851f7cb1 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Sun, 13 Jul 2025 10:37:33 -0700 Subject: [PATCH 2/7] add hybrid offline store Signed-off-by: HaoXuAI --- .../offline_stores/hybrid_offline_store.py | 168 +++++++++++------- 1 file changed, 102 insertions(+), 66 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py index 1201c0e72b2..d1a0e871b26 100644 --- a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py @@ -1,20 +1,19 @@ from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Any, Union, Callable, Iterable, Tuple +from typing import Any, Callable, 
Dict, Iterable, List, Optional, Tuple, Union import pandas as pd import pyarrow +from feature_logging import LoggingConfig, LoggingSource +from infra.registry.base_registry import BaseRegistry from feast import FeatureView, RepoConfig from feast.data_source import DataSource +from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config -from feast.repo_config import FeastConfigBaseModel, get_offline_config_from_type - -from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource -from feature_logging import LoggingSource, LoggingConfig -from infra.registry.base_registry import BaseRegistry +from feast.repo_config import FeastConfigBaseModel, get_offline_config_from_type class HybridOfflineStoreConfig(FeastConfigBaseModel): @@ -29,12 +28,13 @@ class OfflineStoresWithConfig(FeastConfigBaseModel): class HybridOfflineStore(OfflineStore): _instance: Optional["HybridOfflineStore"] = None + _initialized: bool + offline_stores: Dict[str, OfflineStore] _source_to_store_key = { FileSource: "file", SnowflakeSource: "snowflake", LoggingSource: "logging", - } def __new__(cls): @@ -44,31 +44,36 @@ def __new__(cls): cls._instance.offline_stores = {} return cls._instance - def _initialize_offline_stores(self, - config: RepoConfig): + def _initialize_offline_stores(self, config: RepoConfig): if self._initialized: return for store_cfg in getattr(config.offline_store, "offline_stores", []): - config_cls = get_offline_config_from_type(store_cfg.type.split(".")[-1].lower()) + config_cls = get_offline_config_from_type( + store_cfg.type.split(".")[-1].lower() + ) config_instance = config_cls(**store_cfg.conf) store = get_offline_store_from_config(config_instance) - store_key = store_cfg.type.split(".")[-1].replace("OfflineStore", "").lower() + store_key = ( + store_cfg.type.split(".")[-1].replace("OfflineStore", "").lower() + ) self.offline_stores[store_key] = store self._initialized = True - def _get_offline_store_for_feature_view(self, - feature_view: FeatureView, - config: RepoConfig) -> OfflineStore: + def _get_offline_store_for_feature_view( + self, feature_view: FeatureView, config: RepoConfig + ) -> OfflineStore: self._initialize_offline_stores(config) source_type = type(feature_view.batch_source) store_key = self._source_to_store_key.get(source_type) if store_key is None: - raise ValueError(f"Unsupported FeatureView batch_source type: {source_type}") + raise ValueError( + f"Unsupported FeatureView batch_source type: {source_type}" + ) return self.offline_stores[store_key] - def _get_offline_store_for_source(self, - data_source: Union[DataSource, LoggingSource], - config: RepoConfig) -> OfflineStore: + def _get_offline_store_for_source( + self, data_source: Union[DataSource, LoggingSource], config: RepoConfig + ) -> OfflineStore: self._initialize_offline_stores(config) source_type = type(data_source) store_key = self._source_to_store_key.get(source_type) @@ -78,81 +83,112 @@ def _get_offline_store_for_source(self, @staticmethod def get_historical_features( - config: RepoConfig, - feature_views: List[FeatureView], - feature_refs: List[str], - entity_df: Union[pd.DataFrame, str], - registry: BaseRegistry, - project: str, - full_feature_names: bool = False, + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, 
str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, ) -> RetrievalJob: - store = HybridOfflineStore()._get_offline_store_for_feature_view(feature_views[0], config) - return store.get_historical_features(config, feature_views, feature_refs, entity_df, registry, project, - full_feature_names) + store = HybridOfflineStore()._get_offline_store_for_feature_view( + feature_views[0], config + ) + return store.get_historical_features( + config, + feature_views, + feature_refs, + entity_df, + registry, + project, + full_feature_names, + ) @staticmethod def pull_latest_from_table_or_query( - config: RepoConfig, - data_source: DataSource, - join_key_columns: List[str], - feature_name_columns: List[str], - timestamp_field: str, - created_timestamp_column: Optional[str], - start_date: datetime, - end_date: datetime, + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, ) -> RetrievalJob: store = HybridOfflineStore()._get_offline_store_for_source(data_source, config) - return store.pull_latest_from_table_or_query(config, data_source, join_key_columns, feature_name_columns, - timestamp_field, created_timestamp_column, start_date, end_date) + return store.pull_latest_from_table_or_query( + config, + data_source, + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + start_date, + end_date, + ) @staticmethod def pull_all_from_table_or_query( - config: RepoConfig, - data_source: DataSource, - join_key_columns: List[str], - feature_name_columns: List[str], - timestamp_field: str, - created_timestamp_column: Optional[str] = None, - start_date: Optional[datetime] = None, - end_date: Optional[datetime] = None, + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: store = HybridOfflineStore()._get_offline_store_for_source(data_source, config) - return store.pull_all_from_table_or_query(config, data_source, join_key_columns, feature_name_columns, - timestamp_field, created_timestamp_column, start_date, end_date) + return store.pull_all_from_table_or_query( + config, + data_source, + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + start_date, + end_date, + ) @staticmethod def write_logged_features( - config: RepoConfig, - data: Union[pyarrow.Table, Path], - source: LoggingSource, - logging_config: LoggingConfig, - registry: BaseRegistry, + config: RepoConfig, + data: Union[pyarrow.Table, Path], + source: LoggingSource, + logging_config: LoggingConfig, + registry: BaseRegistry, ): store = HybridOfflineStore()._get_offline_store_for_source(source, config) - return store.write_logged_features(config, data, source, logging_config, registry) + return store.write_logged_features( + config, data, source, logging_config, registry + ) @staticmethod def offline_write_batch( - config: RepoConfig, - feature_view: FeatureView, - table: pyarrow.Table, - progress: Optional[Callable[[int], Any]], + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], ): - store = HybridOfflineStore()._get_offline_store_for_feature_view(feature_view, config) + store = 
HybridOfflineStore()._get_offline_store_for_feature_view( + feature_view, config + ) return store.offline_write_batch(config, feature_view, table, progress) def validate_data_source( - self, - config: RepoConfig, - data_source: DataSource, + self, + config: RepoConfig, + data_source: DataSource, ): store = self._get_offline_store_for_source(data_source, config) return store.validate_data_source(config, data_source) def get_table_column_names_and_types_from_data_source( - self, - config: RepoConfig, - data_source: DataSource, + self, + config: RepoConfig, + data_source: DataSource, ) -> Iterable[Tuple[str, str]]: store = self._get_offline_store_for_source(data_source, config) - return store.get_table_column_names_and_types_from_data_source(config, data_source) + return store.get_table_column_names_and_types_from_data_source( + config, data_source + ) From fd695e82d796d56750682ca40565f6960c9f0912 Mon Sep 17 00:00:00 2001 From: HaoXuAI Date: Sun, 13 Jul 2025 23:51:42 -0700 Subject: [PATCH 3/7] fix source type Signed-off-by: HaoXuAI --- sdk/python/feast/data_source.py | 25 ++++++ .../infra/offline_stores/bigquery_source.py | 3 + .../athena_offline_store/athena_source.py | 3 + .../clickhouse_source.py | 4 + .../couchbase_source.py | 4 + .../mssql_offline_store/mssqlserver_source.py | 4 + .../postgres_offline_store/postgres_source.py | 4 + .../spark_offline_store/spark_source.py | 3 + .../trino_offline_store/trino_source.py | 3 + .../feast/infra/offline_stores/file_source.py | 3 + .../offline_stores/hybrid_offline_store.py | 67 +++++++++------- .../infra/offline_stores/redshift_source.py | 3 + .../infra/offline_stores/snowflake_source.py | 6 ++ sdk/python/feast/repo_config.py | 9 ++- .../test_hybrid_offline_store.py | 0 .../test_hybrid_offline_store.py | 80 +++++++++++++++++++ 16 files changed, 190 insertions(+), 31 deletions(-) create mode 100644 sdk/python/tests/integration/offline_store/test_hybrid_offline_store.py create mode 100644 sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 53f3115a1b2..7a185818a76 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -163,6 +163,16 @@ def to_proto(self) -> DataSourceProto.KinesisOptions: DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource", } +_DATA_SOURCE_FOR_OFFLINE_STORE = { + DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.dask.DaskOfflineStore", + DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery.BigQueryOfflineStore", + DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift.RedshiftOfflineStore", + DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore", + DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore", + DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore", + DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore", +} + @typechecked class DataSource(ABC): @@ -401,6 +411,9 @@ def _set_timestamps_in_proto(self, data_source_proto: DataSourceProto): self.last_updated_timestamp ) + @abstractmethod + def source_type(self) -> DataSourceProto.SourceType.ValueType: ... 
+ @typechecked class KafkaSource(DataSource): @@ -564,6 +577,9 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: def get_table_query_string(self) -> str: raise NotImplementedError + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.STREAM_KAFKA + @typechecked class RequestSource(DataSource): @@ -679,6 +695,9 @@ def get_table_query_string(self) -> str: def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: raise NotImplementedError + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.REQUEST_SOURCE + @typechecked class KinesisSource(DataSource): @@ -811,6 +830,9 @@ def _to_proto_impl(self) -> DataSourceProto: return data_source_proto + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.STREAM_KINESIS + class PushMode(enum.Enum): ONLINE = 1 @@ -911,3 +933,6 @@ def get_table_query_string(self) -> str: @staticmethod def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: raise NotImplementedError + + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.PUSH_SOURCE diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 4a8b237e1d6..1dda7af928f 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -23,6 +23,9 @@ class BigQuerySource(DataSource): """A BigQuerySource object defines a data source that a BigQueryOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.BATCH_BIGQUERY + def __init__( self, *, diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py index 12a78e9624a..157ca35933f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py @@ -17,6 +17,9 @@ class AthenaSource(DataSource): + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.BATCH_ATHENA + def __init__( self, *, diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py index f376a2088ba..e3c4a5f6fda 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py @@ -56,6 +56,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions: class ClickhouseSource(DataSource): + def source_type(self) -> DataSourceProto.SourceType.ValueType: + # TODO: Add ClickhouseSourceType to DataSourceProto + return DataSourceProto.CUSTOM_SOURCE + def __init__( self, name: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py index 1f554a8110f..10b9863daed 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py @@ -26,6 +26,10 @@ class 
CouchbaseColumnarSource(DataSource): """A CouchbaseColumnarSource object defines a data source that a CouchbaseColumnarOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + # TODO: Add Couchbase to DataSourceProto.SourceType + return DataSourceProto.CUSTOM_SOURCE + def __init__( self, name: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssqlserver_source.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssqlserver_source.py index 5d6103df7ab..ce1107730eb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssqlserver_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssqlserver_source.py @@ -113,6 +113,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions: class MsSqlServerSource(DataSource): """A MsSqlServerSource object defines a data source that a MsSqlServerOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + # TODO: Add MsSqlServerSource to DataSourceProto.SourceType + return DataSourceProto.CUSTOM_SOURCE + def __init__( self, name: str, diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py index 49a9c3360c6..0d3045e4aa7 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py @@ -20,6 +20,10 @@ class PostgreSQLSource(DataSource): """A PostgreSQLSource object defines a data source that a PostgreSQLOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + # TODO: Add Postgres to DataSourceProto.SourceType + return DataSourceProto.CUSTOM_SOURCE + def __init__( self, name: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index e7dec7a1cd4..bd4fb1ac817 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -31,6 +31,9 @@ class SparkSourceFormat(Enum): class SparkSource(DataSource): """A SparkSource object defines a data source that a Spark offline store can use""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.BATCH_SPARK + def __init__( self, *, diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py index 233a86de616..d8768b773fb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py @@ -86,6 +86,9 @@ def to_proto(self) -> DataSourceProto.TrinoOptions: class TrinoSource(DataSource): """A TrinoSource object defines a data source that a TrinoOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.BATCH_TRINO + def __init__( self, *, diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index af3b21d937a..12524ab4fc2 100644 --- 
a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -29,6 +29,9 @@ class FileSource(DataSource): """A FileSource object defines a data source that a DaskOfflineStore or DuckDBOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.BATCH_FILE + def __init__( self, *, diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py index d1a0e871b26..74621346745 100644 --- a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py @@ -4,16 +4,20 @@ import pandas as pd import pyarrow -from feature_logging import LoggingConfig, LoggingSource -from infra.registry.base_registry import BaseRegistry from feast import FeatureView, RepoConfig -from feast.data_source import DataSource -from feast.infra.offline_stores.file_source import FileSource +from feast.data_source import _DATA_SOURCE_FOR_OFFLINE_STORE, DataSource +from feast.errors import FeastOfflineStoreInvalidName +from feast.feature_logging import LoggingConfig, LoggingSource from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config -from feast.infra.offline_stores.snowflake_source import SnowflakeSource -from feast.repo_config import FeastConfigBaseModel, get_offline_config_from_type +from feast.infra.registry.base_registry import BaseRegistry +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.repo_config import ( + FeastConfigBaseModel, + get_offline_config_from_type, + get_offline_store_type, +) class HybridOfflineStoreConfig(FeastConfigBaseModel): @@ -31,12 +35,6 @@ class HybridOfflineStore(OfflineStore): _initialized: bool offline_stores: Dict[str, OfflineStore] - _source_to_store_key = { - FileSource: "file", - SnowflakeSource: "snowflake", - LoggingSource: "logging", - } - def __new__(cls): if cls._instance is None: cls._instance = super(HybridOfflineStore, cls).__new__(cls) @@ -48,23 +46,34 @@ def _initialize_offline_stores(self, config: RepoConfig): if self._initialized: return for store_cfg in getattr(config.offline_store, "offline_stores", []): - config_cls = get_offline_config_from_type( - store_cfg.type.split(".")[-1].lower() - ) - config_instance = config_cls(**store_cfg.conf) - store = get_offline_store_from_config(config_instance) - store_key = ( - store_cfg.type.split(".")[-1].replace("OfflineStore", "").lower() - ) - self.offline_stores[store_key] = store + try: + offline_store_type = get_offline_store_type(store_cfg.type) + config_cls = get_offline_config_from_type(store_cfg.type) + config_instance = config_cls(**store_cfg.conf) + store = get_offline_store_from_config(config_instance) + self.offline_stores[offline_store_type] = store + except FeastOfflineStoreInvalidName as e: + raise FeastOfflineStoreInvalidName( + f"Failed to initialize Hybrid offline store {store_cfg.type}: {e}" + ) self._initialized = True + def get_source_key_from_type( + self, source_type: DataSourceProto.SourceType.ValueType + ) -> Optional[str]: + if source_type not in list(_DATA_SOURCE_FOR_OFFLINE_STORE.keys()): + raise ValueError( + f"Unsupported DataSource type for HybridOfflineStore: {source_type}." 
+ f"Supported types are: {list(_DATA_SOURCE_FOR_OFFLINE_STORE.keys())}" + ) + return _DATA_SOURCE_FOR_OFFLINE_STORE.get(source_type, None) + def _get_offline_store_for_feature_view( self, feature_view: FeatureView, config: RepoConfig ) -> OfflineStore: self._initialize_offline_stores(config) - source_type = type(feature_view.batch_source) - store_key = self._source_to_store_key.get(source_type) + source_type = feature_view.batch_source.source_type() + store_key = self.get_source_key_from_type(source_type) if store_key is None: raise ValueError( f"Unsupported FeatureView batch_source type: {source_type}" @@ -72,11 +81,11 @@ def _get_offline_store_for_feature_view( return self.offline_stores[store_key] def _get_offline_store_for_source( - self, data_source: Union[DataSource, LoggingSource], config: RepoConfig + self, data_source: DataSource, config: RepoConfig ) -> OfflineStore: self._initialize_offline_stores(config) - source_type = type(data_source) - store_key = self._source_to_store_key.get(source_type) + source_type = data_source.source_type() + store_key = self.get_source_key_from_type(source_type) if store_key is None: raise ValueError(f"Unsupported DataSource type: {source_type}") return self.offline_stores[store_key] @@ -158,9 +167,9 @@ def write_logged_features( logging_config: LoggingConfig, registry: BaseRegistry, ): - store = HybridOfflineStore()._get_offline_store_for_source(source, config) - return store.write_logged_features( - config, data, source, logging_config, registry + raise NotImplementedError( + "HybridOfflineStore does not support write_logged_features. " + "Please use the specific offline store for logging." ) @staticmethod diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 87a27d1d186..752d3b12cf3 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -26,6 +26,9 @@ class RedshiftSource(DataSource): """A RedshiftSource object defines a data source that a RedshiftOfflineStore class can use.""" + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.BATCH_REDSHIFT + def __init__( self, *, diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 16856774622..ea20ce794b2 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -301,6 +301,12 @@ def get_table_column_names_and_types( for column in metadata ] + def source_type(self) -> DataSourceProto.SourceType.ValueType: + """ + Returns the source type of this data source. 
+ """ + return DataSourceProto.BATCH_SNOWFLAKE + snowflake_type_code_map = { 0: "NUMBER", diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 41d4971ea4e..3e19374494a 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -568,11 +568,16 @@ def get_auth_config_from_type(auth_config_type: str): return import_class(module_name, config_class_name, config_class_name) -def get_offline_config_from_type(offline_store_type: str): +def get_offline_store_type(offline_store_type: str): if offline_store_type in OFFLINE_STORE_CLASS_FOR_TYPE: - offline_store_type = OFFLINE_STORE_CLASS_FOR_TYPE[offline_store_type] + return OFFLINE_STORE_CLASS_FOR_TYPE[offline_store_type] elif not offline_store_type.endswith("OfflineStore"): raise FeastOfflineStoreInvalidName(offline_store_type) + return offline_store_type + + +def get_offline_config_from_type(offline_store_type: str): + offline_store_type = get_offline_store_type(offline_store_type) module_name, offline_store_class_type = offline_store_type.rsplit(".", 1) config_class_name = f"{offline_store_class_type}Config" diff --git a/sdk/python/tests/integration/offline_store/test_hybrid_offline_store.py b/sdk/python/tests/integration/offline_store/test_hybrid_offline_store.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py new file mode 100644 index 00000000000..03479d8b39a --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py @@ -0,0 +1,80 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from feast import Entity +from feast.feature_view import FeatureView +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.hybrid_offline_store import ( + HybridOfflineStore, + HybridOfflineStoreConfig, +) +from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from feast.repo_config import RepoConfig + +driver = Entity(name="driver_id", description="driver id") + + +@pytest.fixture +def mock_repo_config(): + return RepoConfig( + registry="/tmp/registry.db", + project="test_project", + provider="local", + offline_store=HybridOfflineStoreConfig( + offline_stores=[ + HybridOfflineStoreConfig.OfflineStoresWithConfig( + type="file", + conf={}, + ), + HybridOfflineStoreConfig.OfflineStoresWithConfig( + type="snowflake.offline", + conf={"database": "db", "schema": "public"}, + ), + ] + ), + online_store=None, + ) + + +@patch("feast.infra.offline_stores.hybrid_offline_store.get_offline_store_from_config") +def test_file_source_routing(mock_get_offline_store_from_config, mock_repo_config): + HybridOfflineStore._instance = None + + mock_file_store = MagicMock() + mock_snowflake_store = MagicMock() + + def side_effect(conf): + if conf.__class__.__name__ == "DaskOfflineStoreConfig": + return mock_file_store + elif conf.__class__.__name__ == "SnowflakeOfflineStoreConfig": + return mock_snowflake_store + raise ValueError(f"Unexpected config class: {conf.__class__.__name__}") + + mock_get_offline_store_from_config.side_effect = side_effect + + hybrid_store = HybridOfflineStore() + + feature_view_1 = FeatureView( + name="my_feature_1", + entities=[driver], + schema=[], + source=FileSource(path="data.parquet"), + ) + + store1 = hybrid_store._get_offline_store_for_feature_view( + feature_view_1, mock_repo_config + ) + assert store1 is mock_file_store + + 
feature_view_2 = FeatureView(
+        name="my_feature_2",
+        entities=[driver],
+        schema=[],
+        source=SnowflakeSource(database="db", schema="public", table="table"),
+    )
+
+    store2 = hybrid_store._get_offline_store_for_feature_view(
+        feature_view_2, mock_repo_config
+    )
+    assert store2 is mock_snowflake_store

From 02b2cdbe8d1c590553732c7207068d060e3df4e1 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Mon, 14 Jul 2025 10:55:18 -0700
Subject: [PATCH 4/7] update doc

Signed-off-by: HaoXuAI
---
 .../infra/offline_stores/hybrid_offline_store.py  | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
index 74621346745..7b7ae40cb2c 100644
--- a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
@@ -100,6 +100,21 @@ def get_historical_features(
         project: str,
         full_feature_names: bool = False,
     ) -> RetrievalJob:
+        # TODO: Multiple data sources can be supported once the feature store uses a
+        # compute engine for getting historical features
+        data_source = None
+        for feature_view in feature_views:
+            if not feature_view.batch_source:
+                raise ValueError(
+                    f"HybridOfflineStore requires each feature view to define a batch source."
+                )
+            if not data_source:
+                data_source = feature_view.batch_source
+            elif data_source != feature_view.batch_source:
+                raise ValueError(
+                    "All feature views must have the same batch source for HybridOfflineStore."
+                )
+
         store = HybridOfflineStore()._get_offline_store_for_feature_view(
             feature_views[0], config
         )

From bc379f567428d4c6e724ddf5e233ffb10d8000e6 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Mon, 14 Jul 2025 10:55:39 -0700
Subject: [PATCH 5/7] update doc

Signed-off-by: HaoXuAI
---
 sdk/python/feast/infra/offline_stores/hybrid_offline_store.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
index 7b7ae40cb2c..b37877276c9 100644
--- a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
@@ -106,7 +106,7 @@ def get_historical_features(
         for feature_view in feature_views:
             if not feature_view.batch_source:
                 raise ValueError(
-                    f"HybridOfflineStore requires each feature view to define a batch source."
+                    "HybridOfflineStore requires each feature view to define a batch source."

From d338885ac62caa28c0dc28d0e1d950c8dfea777f Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Mon, 14 Jul 2025 12:29:14 -0700
Subject: [PATCH 6/7] update doc

Signed-off-by: HaoXuAI
---
 docs/reference/offline-stores/hybrid.md | 94 +++++++++++++++++++++++++
 1 file changed, 94 insertions(+)
 create mode 100644 docs/reference/offline-stores/hybrid.md

diff --git a/docs/reference/offline-stores/hybrid.md b/docs/reference/offline-stores/hybrid.md
new file mode 100644
index 00000000000..5fce8cc1939
--- /dev/null
+++ b/docs/reference/offline-stores/hybrid.md
@@ -0,0 +1,94 @@
+# Hybrid Offline Store

+## Description
+The HybridOfflineStore routes offline feature operations to different offline store backends based on the `batch_source` of each FeatureView. This enables a single Feast deployment to support multiple offline store backends, each configured independently and selected dynamically at runtime.
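+
+Conceptually, store selection for a given `feature_view` works like the sketch below. This is a simplified illustration of the routing logic rather than the exact implementation; `hybrid_store` stands in for an initialized `HybridOfflineStore`:
+
+```python
+# Each DataSource subclass reports its source type via source_type(), and
+# _DATA_SOURCE_FOR_OFFLINE_STORE maps that type to the offline store class
+# configured under `offline_stores` in feature_store.yaml.
+source_type = feature_view.batch_source.source_type()
+store_key = _DATA_SOURCE_FOR_OFFLINE_STORE[source_type]
+store = hybrid_store.offline_stores[store_key]
+```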
+
+## Getting started
+To use the HybridOfflineStore, install Feast with the dependencies for each offline store you plan to route to. For example, for Spark and Snowflake:
+
+```bash
+pip install 'feast[spark,snowflake]'
+```
+
+## Example
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: my_feature_repo
+registry: data/registry.db
+provider: local
+offline_store:
+  type: hybrid_offline_store.HybridOfflineStore
+  offline_stores:
+    - type: spark
+      conf:
+        spark_master: local[*]
+        spark_app_name: feast_spark_app
+    - type: snowflake
+      conf:
+        account: my_snowflake_account
+        user: feast_user
+        password: feast_password
+        database: feast_database
+        schema: feast_schema
+```
+{% endcode %}
+
+### Example FeatureView with Batch Source
+```python
+from feast import FeatureView, Entity, ValueType
+from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
+    SparkSource,
+)
+from feast.infra.offline_stores.snowflake_source import SnowflakeSource
+
+
+entity = Entity(name="user_id", value_type=ValueType.INT64, join_keys=["user_id"])
+feature_view1 = FeatureView(
+    name="user_features",
+    entities=[entity],
+    ttl=None,
+    schema=[
+        # Define your features here
+    ],
+    source=SparkSource(
+        path="s3://my-bucket/user_features_data",
+        file_format="parquet",
+    ),
+)
+
+feature_view2 = FeatureView(
+    name="user_activity",
+    entities=[entity],
+    ttl=None,
+    schema=[
+        # Define your features here
+    ],
+    source=SnowflakeSource(
+        database="feast_database",
+        schema="feast_schema",
+        table="user_activity",
+    ),
+)
+```
+
+You can then call the `materialize` API; each feature view is materialized from the offline store that matches its `batch_source`.
+
+```python
+from datetime import datetime
+
+from feast import FeatureStore
+
+store = FeatureStore(repo_path=".")
+store.materialize(
+    start_date=datetime(2025, 1, 1),
+    end_date=datetime(2025, 7, 31),
+    feature_views=["user_features", "user_activity"],
+)
+```
+
+## Functionality Matrix
+| Feature/Functionality                             | Supported                                          |
+|---------------------------------------------------|----------------------------------------------------|
+| pull_latest_from_table_or_query                   | Yes                                                |
+| pull_all_from_table_or_query                      | Yes                                                |
+| offline_write_batch                               | Yes                                                |
+| validate_data_source                              | Yes                                                |
+| get_table_column_names_and_types_from_data_source | Yes                                                |
+| write_logged_features                             | No                                                 |
+| get_historical_features                           | Only when all feature views share one batch source |

From 5af1cf2af6160565b2ec5e6cfca0efa645fc7c70 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Mon, 14 Jul 2025 12:31:54 -0700
Subject: [PATCH 7/7] update doc

Signed-off-by: HaoXuAI
---
 docs/reference/offline-stores/hybrid.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/reference/offline-stores/hybrid.md b/docs/reference/offline-stores/hybrid.md
index 5fce8cc1939..a10ed66fd2c 100644
--- a/docs/reference/offline-stores/hybrid.md
+++ b/docs/reference/offline-stores/hybrid.md
@@ -34,7 +34,7 @@ offline_store:
 ```
 {% endcode %}
 
-### Example FeatureView with Batch Source
+### Example FeatureView
 ```python
 from feast import FeatureView, Entity, ValueType
 from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (