diff --git a/docs/reference/beta-on-demand-feature-view.md b/docs/reference/beta-on-demand-feature-view.md index f45620a1cee..2482bbc1c8f 100644 --- a/docs/reference/beta-on-demand-feature-view.md +++ b/docs/reference/beta-on-demand-feature-view.md @@ -35,10 +35,40 @@ When defining an ODFV, you can specify the transformation mode using the `mode` ### Singleton Transformations in Native Python Mode -Native Python mode supports transformations on singleton dictionaries by setting `singleton=True`. This allows you to -write transformation functions that operate on a single row at a time, making the code more intuitive and aligning with +Native Python mode supports transformations on singleton dictionaries by setting `singleton=True`. This allows you to +write transformation functions that operate on a single row at a time, making the code more intuitive and aligning with how data scientists typically think about data transformations. +## Aggregations + +On Demand Feature Views support aggregations that compute aggregate statistics over groups of rows. When using aggregations, data is grouped by entity columns (e.g., `driver_id`) and aggregated before being passed to the transformation function. + +**Important**: Aggregations and transformations are mutually exclusive. When aggregations are specified, they replace the transformation function. + +### Usage + +```python +from feast import Aggregation +from datetime import timedelta + +@on_demand_feature_view( + sources=[driver_hourly_stats_view], + schema=[ + Field(name="total_trips", dtype=Int64), + Field(name="avg_rating", dtype=Float64), + ], + aggregations=[ + Aggregation(column="trips", function="sum"), + Aggregation(column="rating", function="mean"), + ], +) +def driver_aggregated_stats(inputs): + # No transformation function needed when using aggregations + pass +``` + +Aggregated columns are automatically named using the pattern `{function}_{column}` (e.g., `sum_trips`, `mean_rating`). + ## Example See [https://github.com/feast-dev/on-demand-feature-views-demo](https://github.com/feast-dev/on-demand-feature-views-demo) for an example on how to use on demand feature views. diff --git a/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst b/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst index 39205f3c4df..69eeed70bb5 100644 --- a/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst +++ b/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst @@ -7,7 +7,7 @@ Submodules feast.infra.compute\_engines.local.backends.base module ------------------------------------------------------- -.. automodule:: feast.infra.compute_engines.local.backends.base +.. automodule:: feast.infra.compute_engines.backends.base :members: :undoc-members: :show-inheritance: @@ -15,7 +15,7 @@ feast.infra.compute\_engines.local.backends.base module feast.infra.compute\_engines.local.backends.factory module ---------------------------------------------------------- -.. automodule:: feast.infra.compute_engines.local.backends.factory +.. automodule:: feast.infra.compute_engines.backends.factory :members: :undoc-members: :show-inheritance: @@ -23,7 +23,7 @@ feast.infra.compute\_engines.local.backends.factory module feast.infra.compute\_engines.local.backends.pandas\_backend module ------------------------------------------------------------------ -.. automodule:: feast.infra.compute_engines.local.backends.pandas_backend +.. automodule:: feast.infra.compute_engines.backends.pandas_backend :members: :undoc-members: :show-inheritance: @@ -31,7 +31,7 @@ feast.infra.compute\_engines.local.backends.pandas\_backend module feast.infra.compute\_engines.local.backends.polars\_backend module ------------------------------------------------------------------ -.. automodule:: feast.infra.compute_engines.local.backends.polars_backend +.. automodule:: feast.infra.compute_engines.backends.polars_backend :members: :undoc-members: :show-inheritance: @@ -39,7 +39,7 @@ feast.infra.compute\_engines.local.backends.polars\_backend module Module contents --------------- -.. automodule:: feast.infra.compute_engines.local.backends +.. automodule:: feast.infra.compute_engines.backends :members: :undoc-members: :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.compute_engines.local.rst b/sdk/python/docs/source/feast.infra.compute_engines.local.rst index 6525199e6dc..d6ce47cb140 100644 --- a/sdk/python/docs/source/feast.infra.compute_engines.local.rst +++ b/sdk/python/docs/source/feast.infra.compute_engines.local.rst @@ -7,7 +7,7 @@ Subpackages .. toctree:: :maxdepth: 4 - feast.infra.compute_engines.local.backends + feast.infra.compute_engines.backends Submodules ---------- diff --git a/sdk/python/feast/infra/compute_engines/local/backends/__init__.py b/sdk/python/feast/infra/compute_engines/backends/__init__.py similarity index 100% rename from sdk/python/feast/infra/compute_engines/local/backends/__init__.py rename to sdk/python/feast/infra/compute_engines/backends/__init__.py diff --git a/sdk/python/feast/infra/compute_engines/local/backends/base.py b/sdk/python/feast/infra/compute_engines/backends/base.py similarity index 100% rename from sdk/python/feast/infra/compute_engines/local/backends/base.py rename to sdk/python/feast/infra/compute_engines/backends/base.py diff --git a/sdk/python/feast/infra/compute_engines/local/backends/factory.py b/sdk/python/feast/infra/compute_engines/backends/factory.py similarity index 84% rename from sdk/python/feast/infra/compute_engines/local/backends/factory.py rename to sdk/python/feast/infra/compute_engines/backends/factory.py index 6d3774f6393..ffe969f3003 100644 --- a/sdk/python/feast/infra/compute_engines/local/backends/factory.py +++ b/sdk/python/feast/infra/compute_engines/backends/factory.py @@ -3,8 +3,8 @@ import pandas as pd import pyarrow -from feast.infra.compute_engines.local.backends.base import DataFrameBackend -from feast.infra.compute_engines.local.backends.pandas_backend import PandasBackend +from feast.infra.compute_engines.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend class BackendFactory: @@ -46,7 +46,7 @@ def _is_polars(entity_df) -> bool: @staticmethod def _get_polars_backend(): - from feast.infra.compute_engines.local.backends.polars_backend import ( + from feast.infra.compute_engines.backends.polars_backend import ( PolarsBackend, ) diff --git a/sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py b/sdk/python/feast/infra/compute_engines/backends/pandas_backend.py similarity index 93% rename from sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py rename to sdk/python/feast/infra/compute_engines/backends/pandas_backend.py index 76ddd688424..8ea5a4a9213 100644 --- a/sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py +++ b/sdk/python/feast/infra/compute_engines/backends/pandas_backend.py @@ -3,7 +3,7 @@ import pandas as pd import pyarrow as pa -from feast.infra.compute_engines.local.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.base import DataFrameBackend class PandasBackend(DataFrameBackend): diff --git a/sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py b/sdk/python/feast/infra/compute_engines/backends/polars_backend.py similarity index 94% rename from sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py rename to sdk/python/feast/infra/compute_engines/backends/polars_backend.py index 352ffecdab8..92e348c3652 100644 --- a/sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py +++ b/sdk/python/feast/infra/compute_engines/backends/polars_backend.py @@ -3,7 +3,7 @@ import polars as pl import pyarrow as pa -from feast.infra.compute_engines.local.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.base import DataFrameBackend class PolarsBackend(DataFrameBackend): diff --git a/sdk/python/feast/infra/compute_engines/local/compute.py b/sdk/python/feast/infra/compute_engines/local/compute.py index 556468f5e1d..26f537da7cf 100644 --- a/sdk/python/feast/infra/compute_engines/local/compute.py +++ b/sdk/python/feast/infra/compute_engines/local/compute.py @@ -12,10 +12,10 @@ MaterializationTask, ) from feast.infra.common.retrieval_task import HistoricalRetrievalTask +from feast.infra.compute_engines.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.factory import BackendFactory from feast.infra.compute_engines.base import ComputeEngine from feast.infra.compute_engines.dag.context import ExecutionContext -from feast.infra.compute_engines.local.backends.base import DataFrameBackend -from feast.infra.compute_engines.local.backends.factory import BackendFactory from feast.infra.compute_engines.local.feature_builder import LocalFeatureBuilder from feast.infra.compute_engines.local.job import ( LocalMaterializationJob, diff --git a/sdk/python/feast/infra/compute_engines/local/feature_builder.py b/sdk/python/feast/infra/compute_engines/local/feature_builder.py index 9b2306c0f01..a98573621fb 100644 --- a/sdk/python/feast/infra/compute_engines/local/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/local/feature_builder.py @@ -2,8 +2,8 @@ from feast.infra.common.materialization_job import MaterializationTask from feast.infra.common.retrieval_task import HistoricalRetrievalTask +from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.feature_builder import FeatureBuilder -from feast.infra.compute_engines.local.backends.base import DataFrameBackend from feast.infra.compute_engines.local.nodes import ( LocalAggregationNode, LocalDedupNode, diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py index 870a098261d..985a089daae 100644 --- a/sdk/python/feast/infra/compute_engines/local/nodes.py +++ b/sdk/python/feast/infra/compute_engines/local/nodes.py @@ -5,11 +5,11 @@ from feast import BatchFeatureView, StreamFeatureView from feast.data_source import DataSource +from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext from feast.infra.compute_engines.dag.model import DAGFormat from feast.infra.compute_engines.dag.node import DAGNode from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue -from feast.infra.compute_engines.local.backends.base import DataFrameBackend from feast.infra.compute_engines.local.local_node import LocalNode from feast.infra.compute_engines.utils import ( create_offline_store_retrieval_job, diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 939adbe933f..01ffa774ccd 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -33,6 +33,7 @@ RequestDataNotFoundInEntityRowsException, ) from feast.field import Field +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.key_encoding_utils import deserialize_entity_key from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, @@ -561,6 +562,76 @@ def construct_response_feature_vector( ) +def _get_aggregate_operations(agg_specs) -> dict: + """ + Convert Aggregation specs to agg_ops format for PandasBackend. + + Reused from LocalFeatureBuilder logic. + TODO: This logic is duplicated from feast.infra.compute_engines.local.feature_builder.LocalFeatureBuilder._get_aggregate_operations(). + Consider refactoring to a shared utility module in the future. + """ + agg_ops = {} + for agg in agg_specs: + if agg.time_window is not None: + raise ValueError( + "Time window aggregation is not supported in online serving." + ) + alias = f"{agg.function}_{agg.column}" + agg_ops[alias] = (agg.function, agg.column) + return agg_ops + + +def _apply_aggregations_to_response( + response_data: Union[pyarrow.Table, Dict[str, List[Any]]], + aggregations, + group_keys: Optional[List[str]], + mode: str, +) -> Union[pyarrow.Table, Dict[str, List[Any]]]: + """ + Apply aggregations using PandasBackend. + + Args: + response_data: Either a pyarrow.Table or dict of lists containing the data + aggregations: List of Aggregation objects to apply + group_keys: List of column names to group by (optional) + mode: Transformation mode ("python", "pandas", or "substrait") + + Returns: + Aggregated data in the same format as input + + TODO: Consider refactoring to support backends other than pandas in the future. + """ + if not aggregations: + return response_data + + backend = PandasBackend() + + # Convert to pandas DataFrame + if isinstance(response_data, dict): + df = pd.DataFrame(response_data) + else: # pyarrow.Table + df = backend.from_arrow(response_data) + + if df.empty: + return response_data + + # Convert aggregations to agg_ops format + agg_ops = _get_aggregate_operations(aggregations) + + # Apply aggregations using PandasBackend + if group_keys: + result_df = backend.groupby_agg(df, group_keys, agg_ops) + else: + # No grouping - aggregate over entire dataset + result_df = backend.groupby_agg(df, [], agg_ops) + + # Convert back to original format + if mode == "python": + return {col: result_df[col].tolist() for col in result_df.columns} + else: # pandas or substrait + return backend.to_arrow(result_df) + + def _augment_response_with_on_demand_transforms( online_features_response: GetOnlineFeaturesResponse, feature_refs: List[str], @@ -605,7 +676,31 @@ def _augment_response_with_on_demand_transforms( for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] if not odfv.write_to_online_store: - if odfv.mode == "python": + # Apply aggregations if configured. + if odfv.aggregations: + if odfv.mode == "python": + if initial_response_dict is None: + initial_response_dict = initial_response.to_dict() + initial_response_dict = _apply_aggregations_to_response( + initial_response_dict, + odfv.aggregations, + odfv.entities, + odfv.mode, + ) + elif odfv.mode in {"pandas", "substrait"}: + if initial_response_arrow is None: + initial_response_arrow = initial_response.to_arrow() + initial_response_arrow = _apply_aggregations_to_response( + initial_response_arrow, + odfv.aggregations, + odfv.entities, + odfv.mode, + ) + + # Apply transformation. Note: aggregations and transformation configs are mutually exclusive + # TODO: Fix to make it work for having both aggregation and transformation + # ticket: https://github.com/feast-dev/feast/issues/5689 + elif odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( diff --git a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py index 20e23c35e03..905ea65ae42 100644 --- a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py +++ b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py @@ -4,9 +4,9 @@ import pandas as pd import pyarrow as pa +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue -from feast.infra.compute_engines.local.backends.pandas_backend import PandasBackend from feast.infra.compute_engines.local.nodes import ( LocalAggregationNode, LocalDedupNode, diff --git a/sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py b/sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py new file mode 100644 index 00000000000..3d6199be3a0 --- /dev/null +++ b/sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py @@ -0,0 +1,89 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for OnDemandFeatureView aggregations in online serving.""" + +import pyarrow as pa + +from feast.aggregation import Aggregation +from feast.utils import _apply_aggregations_to_response + + +def test_aggregation_python_mode(): + """Test aggregations in Python mode (dict format).""" + data = { + "driver_id": [1, 1, 2, 2], + "trips": [10, 20, 15, 25], + } + aggs = [Aggregation(column="trips", function="sum")] + + result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python") + + assert result == {"driver_id": [1, 2], "sum_trips": [30, 40]} + + +def test_aggregation_pandas_mode(): + """Test aggregations in Pandas mode (Arrow table format).""" + table = pa.table( + { + "driver_id": [1, 1, 2, 2], + "trips": [10, 20, 15, 25], + } + ) + aggs = [Aggregation(column="trips", function="sum")] + + result = _apply_aggregations_to_response(table, aggs, ["driver_id"], "pandas") + + assert isinstance(result, pa.Table) + result_df = result.to_pandas() + assert list(result_df["driver_id"]) == [1, 2] + assert list(result_df["sum_trips"]) == [30, 40] + + +def test_multiple_aggregations(): + """Test multiple aggregation functions.""" + data = { + "driver_id": [1, 1, 2, 2], + "trips": [10, 20, 15, 25], + "revenue": [100.0, 200.0, 150.0, 250.0], + } + aggs = [ + Aggregation(column="trips", function="sum"), + Aggregation(column="revenue", function="mean"), + ] + + result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python") + + assert result["driver_id"] == [1, 2] + assert result["sum_trips"] == [30, 40] + assert result["mean_revenue"] == [150.0, 200.0] + + +def test_no_aggregations_returns_original(): + """Test that no aggregations returns original data.""" + data = {"driver_id": [1, 2], "trips": [10, 20]} + + result = _apply_aggregations_to_response(data, [], ["driver_id"], "python") + + assert result == data + + +def test_empty_data_returns_empty(): + """Test that empty data returns empty result.""" + data = {"driver_id": [], "trips": []} + aggs = [Aggregation(column="trips", function="sum")] + + result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python") + + assert result == data