From 0346741f7c6572d198ef0c13440a20fda6a966e8 Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Thu, 17 Jul 2025 20:45:14 -0700
Subject: [PATCH] update doc

Signed-off-by: HaoXuAI
---
 docs/getting-started/components/README.md     |   4 +-
 .../batch-materialization-engine.md           |  11 --
 .../components/compute-engine.md              | 111 ++++++++++++++++++
 docs/getting-started/concepts/README.md       |   8 ++
 .../concepts/batch-feature-view.md            |   6 +
 .../concepts/stream-feature-view.md           |  16 +++
 docs/reference/compute-engine/README.md       |   4 +-
 sdk/python/feast/batch_feature_view.py        |  30 ++---
 .../feast/infra/compute_engines/base.py       |  31 +++++
 .../infra/compute_engines/local/README.md     |   0
 .../infra/compute_engines/spark/compute.py    |  32 +++--
 .../contrib/spark_offline_store/spark.py      |   1 -
 sdk/python/feast/stream_feature_view.py       |  12 +-
 13 files changed, 212 insertions(+), 54 deletions(-)
 delete mode 100644 docs/getting-started/components/batch-materialization-engine.md
 create mode 100644 docs/getting-started/components/compute-engine.md
 create mode 100644 docs/getting-started/concepts/stream-feature-view.md
 create mode 100644 sdk/python/feast/infra/compute_engines/local/README.md

diff --git a/docs/getting-started/components/README.md b/docs/getting-started/components/README.md
index 1b224056298..b07b5f8389e 100644
--- a/docs/getting-started/components/README.md
+++ b/docs/getting-started/components/README.md
@@ -16,8 +16,8 @@
 [feature-server.md](feature-server.md)
 {% endcontent-ref %}
 
-{% content-ref url="batch-materialization-engine.md" %}
-[batch-materialization-engine.md](batch-materialization-engine.md)
+{% content-ref url="compute-engine.md" %}
+[compute-engine.md](compute-engine.md)
 {% endcontent-ref %}
 
 {% content-ref url="provider.md" %}
diff --git a/docs/getting-started/components/batch-materialization-engine.md b/docs/getting-started/components/batch-materialization-engine.md
deleted file mode 100644
index 9a3f7af6c7d..00000000000
--- a/docs/getting-started/components/batch-materialization-engine.md
+++ /dev/null
@@ -1,11 +0,0 @@
-# Batch Materialization Engine
-
-Note: The materialization engine is not constructed via unified compute engine interface.
-
-A batch materialization engine is a component of Feast that's responsible for moving data from the offline store into the online store.
-
-A materialization engine abstracts over specific technologies or frameworks that are used to materialize data. It allows users to use a pure local serialized approach (which is the default LocalComputeEngine), or delegates the materialization to seperate components (e.g. AWS Lambda, as implemented by the the LambdaComputeEngine).
-
-If the built-in engines are not sufficient, you can create your own custom materialization engine. Please see [this guide](../../how-to-guides/customizing-feast/creating-a-custom-compute-engine.md) for more details.
-
-Please see [feature\_store.yaml](../../reference/feature-repository/feature-store-yaml.md#overview) for configuring engines.
diff --git a/docs/getting-started/components/compute-engine.md b/docs/getting-started/components/compute-engine.md
new file mode 100644
index 00000000000..9dd92fe514c
--- /dev/null
+++ b/docs/getting-started/components/compute-engine.md
@@ -0,0 +1,111 @@
+# Compute Engine (Batch Materialization Engine)
+
+Note: Materialization is now constructed via the unified compute engine interface.
+
+A Compute Engine in Feast is a component that handles materialization and historical retrieval tasks. It is responsible
+for executing the logic defined in feature views, such as aggregations, transformations, and custom user-defined
+functions (UDFs).
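+
+For example, both of the following `FeatureStore` calls are executed by the configured compute engine. This is a
+minimal sketch; the feature view and feature names are illustrative:
+
+```python
+from datetime import datetime, timedelta
+
+import pandas as pd
+
+from feast import FeatureStore
+
+store = FeatureStore(repo_path=".")
+
+# Materialization: the compute engine moves feature values from the offline
+# store into the online store for the given time window.
+store.materialize(
+    start_date=datetime.utcnow() - timedelta(days=1),
+    end_date=datetime.utcnow(),
+)
+
+# Historical retrieval: the compute engine executes the point-in-time join
+# between the entity dataframe and the feature views.
+entity_df = pd.DataFrame(
+    {
+        "driver_id": [1001, 1002],
+        "event_timestamp": [datetime.utcnow(), datetime.utcnow()],
+    }
+)
+training_df = store.get_historical_features(
+    entity_df=entity_df,
+    features=["driver_hourly_stats:conv_rate"],
+).to_df()
+```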
+
+A compute engine abstracts over the specific technologies or frameworks that are used to materialize data. It allows
+users to use a pure local serialized approach (which is the default LocalComputeEngine), or to delegate the
+materialization to separate components (e.g. AWS Lambda, as implemented by the LambdaComputeEngine).
+
+If the built-in engines are not sufficient, you can create your own custom compute engine. Please
+see [this guide](../../how-to-guides/customizing-feast/creating-a-custom-compute-engine.md) for more details.
+
+Please see [feature\_store.yaml](../../reference/feature-repository/feature-store-yaml.md#overview) for configuring
+engines.
+
+### Supported Compute Engines
+
+| Compute Engine      | Description                                                                               | Supported | Link |
+|---------------------|-------------------------------------------------------------------------------------------|-----------|------|
+| LocalComputeEngine  | Runs on Arrow + Pandas/Polars/Dask etc., designed for lightweight transformations.        | ✅         |      |
+| SparkComputeEngine  | Runs on Apache Spark, designed for large-scale distributed feature generation.            | ✅         |      |
+| LambdaComputeEngine | Runs on AWS Lambda, designed for serverless feature generation.                           | ✅         |      |
+| FlinkComputeEngine  | Runs on Apache Flink, designed for stream processing and real-time feature generation.    | ❌         |      |
+| RayComputeEngine    | Runs on Ray, designed for distributed feature generation and machine learning workloads.  | ❌         |      |
+
+### Batch Engine
+
+The batch engine config can be set in the `feature_store.yaml` file, where it serves as the default configuration for
+all materialization and historical retrieval tasks. It can be overridden per view via the `batch_engine` config of a
+`BatchFeatureView`. E.g. in `feature_store.yaml`:
+```yaml
+batch_engine:
+  type: SparkComputeEngine
+  config:
+    spark_master: "local[*]"
+    spark_app_name: "Feast Batch Engine"
+    spark_conf:
+      spark.sql.shuffle.partitions: 100
+      spark.executor.memory: "4g"
+```
+and in `BatchFeatureView`:
+```python
+from feast import BatchFeatureView
+
+fv = BatchFeatureView(
+    # ... other required arguments (name, source, entities, ...) omitted for brevity
+    batch_engine={
+        "spark_conf": {
+            "spark.sql.shuffle.partitions": 200,
+            "spark.executor.memory": "8g"
+        },
+    }
+)
+```
+Then, when you materialize the feature view, it will use the `batch_engine` configuration specified in the feature
+view, i.e. shuffle partitions set to 200 and executor memory set to 8g.
+
+### Stream Engine
+
+The stream engine config can be set in the `feature_store.yaml` file, where it serves as the default configuration for
+all stream materialization and historical retrieval tasks. It can be overridden per view via the `stream_engine`
+config of a `StreamFeatureView`. E.g. in `feature_store.yaml`:
+```yaml
+stream_engine:
+  type: SparkComputeEngine
+  config:
+    spark_master: "local[*]"
+    spark_app_name: "Feast Stream Engine"
+    spark_conf:
+      spark.sql.shuffle.partitions: 100
+      spark.executor.memory: "4g"
+```
+and in `StreamFeatureView`:
+```python
+from feast import StreamFeatureView
+
+fv = StreamFeatureView(
+    # ... other required arguments (name, source, entities, ...) omitted for brevity
+    stream_engine={
+        "spark_conf": {
+            "spark.sql.shuffle.partitions": 200,
+            "spark.executor.memory": "8g"
+        },
+    }
+)
+```
+Then, when you materialize the feature view, it will use the `stream_engine` configuration specified in the feature
+view, i.e. shuffle partitions set to 200 and executor memory set to 8g.
+
+### API
+
+The compute engine builds its execution plan as a DAG via the FeatureBuilder. It derives feature generation from
+Feature View definitions, including:
+
+```
+1. Transformation (via Transformation API)
+2. Aggregation (via Aggregation API)
+3. Join (join with entity datasets, customized JOIN, or join with another Feature View)
+4. Filter (point-in-time filter, ttl filter, filter by custom expression)
+...
+```
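+
+For example, a `BatchFeatureView` like the following minimal sketch (the entity, source, and column names are
+illustrative) is compiled by the FeatureBuilder into read, transform, aggregate, join, and filter nodes:
+
+```python
+from datetime import timedelta
+
+from feast import BatchFeatureView, Entity, Field, FileSource
+from feast.aggregation import Aggregation
+from feast.types import Float32
+
+driver = Entity(name="driver", join_keys=["driver_id"])
+
+driver_stats_source = FileSource(
+    path="data/driver_stats.parquet",
+    timestamp_field="event_timestamp",
+)
+
+def convert_to_km(df):
+    # Transformation step: executed by the compute engine at materialization
+    # or retrieval time, not at import time.
+    df["km_driven"] = df["miles_driven"] * 1.60934
+    return df
+
+driver_daily_stats = BatchFeatureView(
+    name="driver_daily_stats",
+    entities=[driver],
+    ttl=timedelta(days=7),
+    source=driver_stats_source,
+    schema=[Field(name="km_driven", dtype=Float32)],
+    udf=convert_to_km,
+    # Aggregation step: a one-day sum over the transformed column.
+    aggregations=[
+        Aggregation(column="km_driven", function="sum", time_window=timedelta(days=1)),
+    ],
+)
+```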
+
+### Components
+
+The compute engine is responsible for executing the materialization and retrieval tasks defined in the feature views.
+It builds a directed acyclic graph (DAG) of operations that need to be performed to generate the features.
+The core components of the compute engine are:
+
+#### Feature Builder
+
+The Feature Builder constructs the execution plan for feature generation: it takes the definitions from feature views
+and builds a backend-specific DAG of the transformations, aggregations, joins, and filters that need to be performed
+to generate the features.
+
+#### Feature Resolver
+
+The Feature Resolver resolves the feature DAG into topological order, so that the engine can execute the operations in
+dependency order during materialization or retrieval.
\ No newline at end of file
diff --git a/docs/getting-started/concepts/README.md b/docs/getting-started/concepts/README.md
index 95e1a14bf1f..13722a8a7da 100644
--- a/docs/getting-started/concepts/README.md
+++ b/docs/getting-started/concepts/README.md
@@ -20,6 +20,14 @@
 [feature-view.md](feature-view.md)
 {% endcontent-ref %}
 
+{% content-ref url="batch-feature-view.md" %}
+[batch-feature-view.md](batch-feature-view.md)
+{% endcontent-ref %}
+
+{% content-ref url="stream-feature-view.md" %}
+[stream-feature-view.md](stream-feature-view.md)
+{% endcontent-ref %}
+
 {% content-ref url="feature-retrieval.md" %}
 [feature-retrieval.md](feature-retrieval.md)
 {% endcontent-ref %}
diff --git a/docs/getting-started/concepts/batch-feature-view.md b/docs/getting-started/concepts/batch-feature-view.md
index 83a6f5698d7..9d0eb86389c 100644
--- a/docs/getting-started/concepts/batch-feature-view.md
+++ b/docs/getting-started/concepts/batch-feature-view.md
@@ -2,6 +2,12 @@
 
 `BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even other `FeatureView`s, enabling composable and reusable feature pipelines. It is an extension of the `FeatureView` class, with support for user-defined transformations, aggregations, and recursive chaining of feature logic.
 
+## Supported Compute Engines
+- [x] LocalComputeEngine
+- [x] SparkComputeEngine
+- [ ] LambdaComputeEngine
+- [ ] KubernetesComputeEngine
+
 ---
 
 ## ✅ Key Capabilities
diff --git a/docs/getting-started/concepts/stream-feature-view.md b/docs/getting-started/concepts/stream-feature-view.md
new file mode 100644
index 00000000000..e9b08473b45
--- /dev/null
+++ b/docs/getting-started/concepts/stream-feature-view.md
@@ -0,0 +1,16 @@
+# Stream Feature View
+`StreamFeatureView` is a type of feature view in Feast that allows you to define features that are continuously updated from a streaming source. It is designed to handle real-time data ingestion and feature generation, making it suitable for use cases where features need to be updated frequently as new data arrives.
+
+### Supported Compute Engines
+- [x] LocalComputeEngine
+- [x] SparkComputeEngine
+- [ ] FlinkComputeEngine
+
+### Key Capabilities
+- **Real-time Feature Generation**: Supports defining features that are continuously updated from a streaming source.
+
+- **Transformations**: Apply transformation logic (e.g., `feature_transformation` or `udf`) to the raw data source.
+
+- **Aggregations**: Define time-windowed aggregations (e.g., `sum`, `avg`) over event-timestamped data.
+
+- **Feature resolution & execution**: Automatically resolves and executes dependent views during materialization or retrieval. A minimal definition combining these capabilities is sketched below.
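+
+Below is a minimal sketch of a `StreamFeatureView` that combines these capabilities (the entity, Kafka topic, and
+column names are illustrative):
+
+```python
+from datetime import timedelta
+
+from feast import Entity, Field, FileSource, StreamFeatureView
+from feast.aggregation import Aggregation
+from feast.data_format import JsonFormat
+from feast.data_source import KafkaSource
+from feast.types import Float32
+
+driver = Entity(name="driver", join_keys=["driver_id"])
+
+driver_stats_stream = KafkaSource(
+    name="driver_stats_stream",
+    kafka_bootstrap_servers="localhost:9092",
+    topic="drivers",
+    timestamp_field="event_timestamp",
+    # Batch source used for backfills and historical retrieval.
+    batch_source=FileSource(
+        path="data/driver_stats.parquet",
+        timestamp_field="event_timestamp",
+    ),
+    message_format=JsonFormat(
+        schema_json="driver_id integer, miles_driven double, event_timestamp timestamp"
+    ),
+    watermark_delay_threshold=timedelta(minutes=5),
+)
+
+driver_hourly_stats = StreamFeatureView(
+    name="driver_hourly_stats",
+    entities=[driver],
+    ttl=timedelta(days=1),
+    source=driver_stats_stream,
+    schema=[Field(name="miles_driven", dtype=Float32)],
+    timestamp_field="event_timestamp",
+    # A one-hour windowed sum, kept up to date as new events arrive.
+    aggregations=[
+        Aggregation(column="miles_driven", function="sum", time_window=timedelta(hours=1)),
+    ],
+)
+```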
diff --git a/docs/reference/compute-engine/README.md b/docs/reference/compute-engine/README.md
index 40f7e6c5246..163ce362ccc 100644
--- a/docs/reference/compute-engine/README.md
+++ b/docs/reference/compute-engine/README.md
@@ -19,8 +19,8 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
 | `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
 | `FeatureResolver` | Resolves feature DAG by topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
 | `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
-| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link]([link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)) |
-| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link]([link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)) |
+| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
+| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
 
 ---
 
diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py
index 85a71f01c43..c168aaeec10 100644
--- a/sdk/python/feast/batch_feature_view.py
+++ b/sdk/python/feast/batch_feature_view.py
@@ -34,6 +34,7 @@ class BatchFeatureView(FeatureView):
 
     Attributes:
         name: The unique name of the batch feature view.
+        mode: The transformation mode to use for the batch feature view. This can be one of the TransformationMode values.
         entities: List of entities or entity join keys.
         ttl: The amount of time this group of features lives. A ttl of 0 indicates that
            this group of features lives forever. Note that large ttl's or a ttl of 0
@@ -46,6 +47,13 @@ class BatchFeatureView(FeatureView):
         description: A human-readable description.
         tags: A dictionary of key-value pairs to store arbitrary metadata.
         owner: The owner of the batch feature view, typically the email of the primary maintainer.
+        udf: A user-defined function that applies transformations to the data in the batch feature view.
+        udf_string: A string representation of the user-defined function.
+        feature_transformation: A transformation object that defines how features are transformed.
+            Note: feature_transformation takes precedence over udf and udf_string.
+        batch_engine: A dictionary containing configuration for the batch engine used to process the feature view.
+            Note: it overrides the repo-level default batch engine config defined in the yaml file.
+        aggregations: A list of aggregations to be applied to the features in the batch feature view.
     """
 
     name: str
@@ -67,7 +75,7 @@ class BatchFeatureView(FeatureView):
     udf: Optional[Callable[[Any], Any]]
     udf_string: Optional[str]
     feature_transformation: Optional[Transformation]
-    batch_engine: Optional[Field]
+    batch_engine: Optional[Dict[str, Any]]
     aggregations: Optional[List[Aggregation]]
 
     def __init__(
@@ -88,7 +96,7 @@ def __init__(
         udf: Optional[Callable[[Any], Any]] = None,
         udf_string: Optional[str] = "",
         feature_transformation: Optional[Transformation] = None,
-        batch_engine: Optional[Field] = None,
+        batch_engine: Optional[Dict[str, Any]] = None,
         aggregations: Optional[List[Aggregation]] = None,
     ):
         if not flags_helper.is_test():
@@ -162,21 +170,9 @@ def batch_feature_view(
     schema: Optional[List[Field]] = None,
 ):
     """
-    Args:
-        name:
-        mode:
-        entities:
-        ttl:
-        source:
-        tags:
-        online:
-        offline:
-        description:
-        owner:
-        schema:
-
-    Returns:
-
+    Creates a BatchFeatureView object with the given user-defined function (UDF) as the transformation.
+    Please make sure that the udf contains all non-built-in imports within the function to ensure that the execution
+    of a deserialized function does not miss imports.
     """
 
     def mainify(obj):
diff --git a/sdk/python/feast/infra/compute_engines/base.py b/sdk/python/feast/infra/compute_engines/base.py
index e50494abd63..360cf95a044 100644
--- a/sdk/python/feast/infra/compute_engines/base.py
+++ b/sdk/python/feast/infra/compute_engines/base.py
@@ -131,3 +131,34 @@ def get_execution_context(
             entity_defs=entity_defs,
             entity_df=entity_df,
         )
+
+    def _get_feature_view_engine_config(
+        self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
+    ) -> dict:
+        """
+        Merge repo-level default batch engine config with runtime engine overrides defined in the feature view.
+
+        Priority:
+        1. Repo config (`self.repo_config.batch_engine_config`) - baseline
+        2. FeatureView overrides (`batch_engine` for `BatchFeatureView`, `stream_engine` for `StreamFeatureView`) - highest priority
+
+        Args:
+            feature_view: A BatchFeatureView or StreamFeatureView.
+
+        Returns:
+            dict: The merged engine configuration.
+        """
+        default_conf = self.repo_config.batch_engine_config or {}
+
+        runtime_conf = None
+        if isinstance(feature_view, BatchFeatureView):
+            runtime_conf = feature_view.batch_engine
+        elif isinstance(feature_view, StreamFeatureView):
+            runtime_conf = feature_view.stream_engine
+
+        if runtime_conf is not None and not isinstance(runtime_conf, dict):
+            raise TypeError(
+                f"Engine config for {feature_view.name} must be a dict, got {type(runtime_conf)}."
+            )
+
+        return {**default_conf, **runtime_conf} if runtime_conf else dict(default_conf)
diff --git a/sdk/python/feast/infra/compute_engines/local/README.md b/sdk/python/feast/infra/compute_engines/local/README.md
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/sdk/python/feast/infra/compute_engines/spark/compute.py b/sdk/python/feast/infra/compute_engines/spark/compute.py
index 59a271a926e..e6ec58dd74d 100644
--- a/sdk/python/feast/infra/compute_engines/spark/compute.py
+++ b/sdk/python/feast/infra/compute_engines/spark/compute.py
@@ -3,6 +3,7 @@
 from typing import Dict, Literal, Optional, Sequence, Union, cast
 
 from pydantic import StrictStr
+from pyspark.sql import SparkSession
 
 from feast import (
     BatchFeatureView,
@@ -77,20 +78,11 @@ def teardown_infra(
     ):
         pass
 
-    def __init__(
-        self,
-        offline_store,
-        online_store,
-        repo_config,
-        **kwargs,
-    ):
-        super().__init__(
-            offline_store=offline_store,
-            online_store=online_store,
-            repo_config=repo_config,
-            **kwargs,
-        )
-        self.spark_session = get_or_create_new_spark_session()
+    def _get_feature_view_spark_session(
+        self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
+    ) -> SparkSession:
+        spark_conf = self._get_feature_view_engine_config(feature_view)
+        return get_or_create_new_spark_session(spark_conf)
 
     def _materialize_one(
         self,
@@ -113,11 +105,13 @@ def _materialize_one(
         # ✅ 1. Build typed execution context
         context = self.get_execution_context(registry, task)
 
+        spark_session = self._get_feature_view_spark_session(task.feature_view)
+
         try:
             # ✅ 2. Construct Feature Builder and run it
             builder = SparkFeatureBuilder(
                 registry=registry,
-                spark_session=self.spark_session,
+                spark_session=spark_session,
                 task=task,
             )
             plan = builder.build()
@@ -209,18 +203,20 @@ def get_historical_features(
         # ✅ 1. Build typed execution context
         context = self.get_execution_context(registry, task)
 
+        spark_session = self._get_feature_view_spark_session(task.feature_view)
+
         try:
             # ✅ 2. Construct Feature Builder and run it
             builder = SparkFeatureBuilder(
                 registry=registry,
-                spark_session=self.spark_session,
+                spark_session=spark_session,
                 task=task,
             )
             plan = builder.build()
 
             return SparkDAGRetrievalJob(
                 plan=plan,
-                spark_session=self.spark_session,
+                spark_session=spark_session,
                 context=context,
                 config=self.repo_config,
                 full_feature_names=task.full_feature_name,
@@ -229,7 +225,7 @@ def get_historical_features(
             # 🛑 Handle failure
             return SparkDAGRetrievalJob(
                 plan=None,
-                spark_session=self.spark_session,
+                spark_session=spark_session,
                 context=context,
                 config=self.repo_config,
                 full_feature_names=task.full_feature_name,
diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index b845dfab119..9eed26a5e5e 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -47,7 +47,6 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel):
     spark_conf: Optional[Dict[str, str]] = None
     """ Configuration overlay for the spark session """
 
-    # sparksession is not serializable and we dont want to pass it around as an argument
     staging_location: Optional[StrictStr] = None
     """ Remote path for batch materialization jobs"""
 
diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py
index dcbbd33df7c..7225be4b7ec 100644
--- a/sdk/python/feast/stream_feature_view.py
+++ b/sdk/python/feast/stream_feature_view.py
@@ -3,7 +3,7 @@
 import warnings
 from datetime import datetime, timedelta
 from types import FunctionType
-from typing import Dict, List, Optional, Tuple, Type, Union
+from typing import Any, Dict, List, Optional, Tuple, Type, Union
 
 import dill
 from google.protobuf.message import Message
@@ -47,6 +47,7 @@ class StreamFeatureView(FeatureView):
 
     Attributes:
         name: The unique name of the stream feature view.
+        mode: The transformation mode to use for the stream feature view. This can be one of the TransformationMode values.
         entities: List of entities or entity join keys.
         ttl: The amount of time this group of features lives. A ttl of 0 indicates that
            this group of features lives forever. Note that large ttl's or a ttl of 0
@@ -63,6 +64,11 @@ class StreamFeatureView(FeatureView):
         tags: A dictionary of key-value pairs to store arbitrary metadata.
         owner: The owner of the stream feature view, typically the email of the primary maintainer.
         udf: The user defined transformation function. This transformation function should have all of the corresponding imports imported within the function.
+        udf_string: The string representation of the user defined transformation function.
+        feature_transformation: The transformation to apply to the features.
+            Note: feature_transformation takes precedence over udf and udf_string.
+        stream_engine: Optional dictionary containing stream-engine-specific configuration.
+            Note: it overrides the repo-level default stream engine config defined in the yaml file.
""" name: str @@ -85,7 +91,7 @@ class StreamFeatureView(FeatureView): udf: Optional[FunctionType] udf_string: Optional[str] feature_transformation: Optional[Transformation] - stream_engine: Optional[Field] + stream_engine: Optional[Dict[str, Any]] = None def __init__( self, @@ -107,7 +113,7 @@ def __init__( udf: Optional[FunctionType] = None, udf_string: Optional[str] = "", feature_transformation: Optional[Transformation] = None, - stream_engine: Optional[Field] = None, + stream_engine: Optional[Dict[str, Any]] = None, ): if not flags_helper.is_test(): warnings.warn(