diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py index af7a5e68fd6..c66af0db18e 100644 --- a/sdk/python/feast/batch_feature_view.py +++ b/sdk/python/feast/batch_feature_view.py @@ -1,6 +1,10 @@ +import functools import warnings from datetime import datetime, timedelta -from typing import Dict, List, Optional, Tuple +from types import FunctionType +from typing import Dict, List, Optional, Tuple, Union + +import dill from feast import flags_helper from feast.data_source import DataSource @@ -8,6 +12,8 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode warnings.simplefilter("once", RuntimeWarning) @@ -42,6 +48,7 @@ class BatchFeatureView(FeatureView): """ name: str + mode: Union[TransformationMode, str] entities: List[str] ttl: Optional[timedelta] source: DataSource @@ -54,11 +61,15 @@ class BatchFeatureView(FeatureView): owner: str timestamp_field: str materialization_intervals: List[Tuple[datetime, datetime]] + udf: Optional[FunctionType] + udf_string: Optional[str] + feature_transformation: Transformation def __init__( self, *, name: str, + mode: Union[TransformationMode, str] = TransformationMode.PYTHON, source: DataSource, entities: Optional[List[Entity]] = None, ttl: Optional[timedelta] = None, @@ -67,6 +78,9 @@ def __init__( description: str = "", owner: str = "", schema: Optional[List[Field]] = None, + udf: Optional[FunctionType] = None, + udf_string: Optional[str] = "", + feature_transformation: Optional[Transformation] = None, ): if not flags_helper.is_test(): warnings.warn( @@ -84,6 +98,13 @@ def __init__( f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " ) + self.mode = mode + self.udf = udf + self.udf_string = udf_string + self.feature_transformation = ( + 
feature_transformation or self.get_feature_transformation() + ) + super().__init__( name=name, entities=entities, @@ -95,3 +116,79 @@ def __init__( schema=schema, source=source, ) + + def get_feature_transformation(self) -> Transformation: + if not self.udf: + raise ValueError( + "Either a UDF or a feature transformation must be provided for BatchFeatureView" + ) + if self.mode in ( + TransformationMode.PANDAS, + TransformationMode.PYTHON, + TransformationMode.SQL, + ) or self.mode in ("pandas", "python", "sql"): + return Transformation( + mode=self.mode, udf=self.udf, udf_string=self.udf_string or "" + ) + else: + raise ValueError( + f"Unsupported transformation mode: {self.mode} for BatchFeatureView" + ) + + +def batch_feature_view( + *, + name: Optional[str] = None, + mode: Union[TransformationMode, str] = TransformationMode.PYTHON, + entities: Optional[List[str]] = None, + ttl: Optional[timedelta] = None, + source: Optional[DataSource] = None, + tags: Optional[Dict[str, str]] = None, + online: bool = True, + description: str = "", + owner: str = "", + schema: Optional[List[Field]] = None, +): + """Decorator that creates a BatchFeatureView with the decorated function as its udf. + Args: + name: Name of the feature view; defaults to the decorated function's name. + entities: Forwarded unchanged to BatchFeatureView. + ttl: Forwarded unchanged to BatchFeatureView. + source: Forwarded unchanged to BatchFeatureView. + tags: Forwarded unchanged to BatchFeatureView. + online: Forwarded unchanged to BatchFeatureView. + description: Forwarded unchanged to BatchFeatureView. + owner: Forwarded unchanged to BatchFeatureView. + schema: Forwarded unchanged to BatchFeatureView. + + Returns: + A decorator that takes the user function and returns the resulting BatchFeatureView. + """ + + def mainify(obj): + # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same + # name as the original file defining the bfv. 
+ if obj.__module__ != "__main__": + obj.__module__ = "__main__" + + def decorator(user_function): + udf_string = dill.source.getsource(user_function) + mainify(user_function) + batch_feature_view_obj = BatchFeatureView( + name=name or user_function.__name__, + mode=mode, + entities=entities, + ttl=ttl, + source=source, + tags=tags, + online=online, + description=description, + owner=owner, + schema=schema, + udf=user_function, + udf_string=udf_string, + ) + functools.update_wrapper(wrapper=batch_feature_view_obj, wrapped=user_function) + return batch_feature_view_obj + + return decorator diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 04b03c96823..5154ca19715 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -1,12 +1,10 @@ import copy import functools -import inspect import warnings from types import FunctionType -from typing import Any, List, Optional, Union, get_type_hints +from typing import Any, List, Optional, Union, cast import dill -import pandas as pd import pyarrow from typeguard import typechecked @@ -31,6 +29,8 @@ from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunctionV2 as UserDefinedFunctionProto, ) +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode from feast.transformation.pandas_transformation import PandasTransformation from feast.transformation.python_transformation import PythonTransformation from feast.transformation.substrait_transformation import SubstraitTransformation @@ -66,15 +66,15 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: dict[str, FeatureViewProjection] source_request_sources: dict[str, RequestSource] - feature_transformation: Union[ - PandasTransformation, PythonTransformation, SubstraitTransformation - ] + feature_transformation: Transformation mode: str description: str tags: 
dict[str, str] owner: str write_to_online_store: bool singleton: bool + udf: Optional[FunctionType] + udf_string: Optional[str] def __init__( # noqa: C901 self, @@ -90,10 +90,8 @@ def __init__( # noqa: C901 ] ], udf: Optional[FunctionType] = None, - udf_string: str = "", - feature_transformation: Union[ - PandasTransformation, PythonTransformation, SubstraitTransformation - ], + udf_string: Optional[str] = "", + feature_transformation: Optional[Transformation] = None, mode: str = "pandas", description: str = "", tags: Optional[dict[str, str]] = None, @@ -112,9 +110,9 @@ def __init__( # noqa: C901 sources: A map from input source names to the actual input sources, which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. - udf (deprecated): The user defined transformation function, which must take pandas + udf: The user defined transformation function, which must take pandas dataframes as inputs. - udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) + udf_string: The source code version of the udf (for diffing and displaying in Web UI) feature_transformation: The user defined transformation. mode: Mode of execution (e.g., Pandas or Python native) description (optional): A human-readable description. @@ -136,29 +134,10 @@ def __init__( # noqa: C901 schema = schema or [] self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME] + self.sources = sources self.mode = mode.lower() - - if self.mode not in {"python", "pandas", "substrait"}: - raise ValueError( - f"Unknown mode {self.mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait." - ) - - if not feature_transformation: - if udf: - warnings.warn( - "udf and udf_string parameters are deprecated. 
Please use transformation=PandasTransformation(udf, udf_string) instead.", - DeprecationWarning, - ) - # Note inspecting the return signature won't work with isinstance so this is the best alternative - if self.mode == "pandas": - feature_transformation = PandasTransformation(udf, udf_string) - elif self.mode == "python": - feature_transformation = PythonTransformation(udf, udf_string) - else: - raise ValueError( - "OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments" - ) - + self.udf = udf + self.udf_string = udf_string self.source_feature_view_projections: dict[str, FeatureViewProjection] = {} self.source_request_sources: dict[str, RequestSource] = {} for odfv_source in sources: @@ -206,12 +185,33 @@ def __init__( # noqa: C901 features.append(field) self.features = features - self.feature_transformation = feature_transformation + self.feature_transformation = ( + feature_transformation or self.get_feature_transformation() + ) self.write_to_online_store = write_to_online_store self.singleton = singleton if self.singleton and self.mode != "python": raise ValueError("Singleton is only supported for Python mode.") + def get_feature_transformation(self) -> Transformation: + if not self.udf: + raise ValueError( + "Either udf or feature_transformation must be provided to create an OnDemandFeatureView" + ) + if self.mode in ( + TransformationMode.PANDAS, + TransformationMode.PYTHON, + ) or self.mode in ("pandas", "python"): + return Transformation( + mode=self.mode, udf=self.udf, udf_string=self.udf_string or "" + ) + elif self.mode == TransformationMode.SUBSTRAIT or self.mode == "substrait": + return SubstraitTransformation.from_ibis(self.udf, self.sources) + else: + raise ValueError( + f"Unsupported transformation mode: {self.mode} for OnDemandFeatureView" + ) + @property def proto_class(self) -> type[OnDemandFeatureViewProto]: return OnDemandFeatureViewProto @@ -312,16 +312,25 @@ def to_proto(self) -> 
OnDemandFeatureViewProto: request_data_source=request_sources.to_proto() ) - feature_transformation = FeatureTransformationProto( - user_defined_function=self.feature_transformation.to_proto() + user_defined_function_proto = cast( + UserDefinedFunctionProto, + self.feature_transformation.to_proto() if isinstance( self.feature_transformation, (PandasTransformation, PythonTransformation), ) else None, - substrait_transformation=self.feature_transformation.to_proto() + ) + + substrait_transformation_proto = ( + self.feature_transformation.to_proto() if isinstance(self.feature_transformation, SubstraitTransformation) - else None, + else None + ) + + feature_transformation = FeatureTransformationProto( + user_defined_function=user_defined_function_proto, + substrait_transformation=substrait_transformation_proto, ) spec = OnDemandFeatureViewSpec( name=self.name, @@ -786,31 +795,13 @@ def mainify(obj) -> None: obj.__module__ = "__main__" def decorator(user_function): - return_annotation = get_type_hints(user_function).get("return", inspect._empty) udf_string = dill.source.getsource(user_function) mainify(user_function) - if mode == "pandas": - if return_annotation not in (inspect._empty, pd.DataFrame): - raise TypeError( - f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame" - ) - transformation = PandasTransformation(user_function, udf_string) - elif mode == "python": - transformation = PythonTransformation(user_function, udf_string) - elif mode == "substrait": - from ibis.expr.types.relations import Table - - if return_annotation not in (inspect._empty, Table): - raise TypeError( - f"return signature for {user_function} is {return_annotation} but should be ibis.expr.types.relations.Table" - ) - transformation = SubstraitTransformation.from_ibis(user_function, sources) on_demand_feature_view_obj = OnDemandFeatureView( name=name if name is not None else user_function.__name__, sources=sources, schema=schema, - 
feature_transformation=transformation, mode=mode, description=description, tags=tags, @@ -818,6 +809,8 @@ def decorator(user_function): write_to_online_store=write_to_online_store, entities=entities, singleton=singleton, + udf=user_function, + udf_string=udf_string, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 50e1a221456..42802993226 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -31,7 +31,8 @@ from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunctionV2 as UserDefinedFunctionProtoV2, ) -from feast.transformation.pandas_transformation import PandasTransformation +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode warnings.simplefilter("once", RuntimeWarning) @@ -75,12 +76,12 @@ class StreamFeatureView(FeatureView): tags: Dict[str, str] owner: str aggregations: List[Aggregation] - mode: str + mode: Union[TransformationMode, str] timestamp_field: str materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] udf_string: Optional[str] - feature_transformation: Optional[PandasTransformation] + feature_transformation: Optional[Transformation] def __init__( self, @@ -95,11 +96,11 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, aggregations: Optional[List[Aggregation]] = None, - mode: Optional[str] = "spark", + mode: Union[str, TransformationMode] = TransformationMode.PYTHON, timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, udf_string: Optional[str] = "", - feature_transformation: Optional[Union[PandasTransformation]] = None, + feature_transformation: Optional[Transformation] = None, ): if not flags_helper.is_test(): warnings.warn( @@ -123,11 +124,13 @@ def __init__( ) self.aggregations = aggregations or [] - self.mode = mode 
or "" + self.mode = mode self.timestamp_field = timestamp_field or "" self.udf = udf self.udf_string = udf_string - self.feature_transformation = feature_transformation + self.feature_transformation = ( + feature_transformation or self.get_feature_transformation() + ) super().__init__( name=name, @@ -141,6 +144,23 @@ def __init__( source=source, ) + def get_feature_transformation(self) -> Optional[Transformation]: + if not self.udf: + # TODO: Currently StreamFeatureView allow no transformation, but this should be removed in the future + return None + if self.mode in ( + TransformationMode.PANDAS, + TransformationMode.PYTHON, + TransformationMode.SPARK, + ) or self.mode in ("pandas", "python", "spark"): + return Transformation( + mode=self.mode, udf=self.udf, udf_string=self.udf_string or "" + ) + else: + raise ValueError( + f"Unsupported transformation mode: {self.mode} for StreamFeatureView" + ) + def __eq__(self, other): if not isinstance(other, StreamFeatureView): raise TypeError("Comparisons should only involve StreamFeatureViews") @@ -198,6 +218,10 @@ def to_proto(self): user_defined_function=udf_proto_v2, ) + mode = ( + self.mode.value if isinstance(self.mode, TransformationMode) else self.mode + ) + spec = StreamFeatureViewSpecProto( name=self.name, entities=self.entities, @@ -214,7 +238,7 @@ def to_proto(self): stream_source=stream_source_proto or None, timestamp_field=self.timestamp_field, aggregations=[agg.to_proto() for agg in self.aggregations], - mode=self.mode, + mode=mode, ) return StreamFeatureViewProto(spec=spec, meta=meta) @@ -264,9 +288,6 @@ def from_proto(cls, sfv_proto): mode=sfv_proto.spec.mode, udf=udf, udf_string=udf_string, - feature_transformation=PandasTransformation(udf, udf_string) - if udf - else None, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -323,6 +344,7 @@ def __copy__(self): timestamp_field=self.timestamp_field, source=self.stream_source if self.stream_source else 
self.batch_source, udf=self.udf, + udf_string=self.udf_string, feature_transformation=self.feature_transformation, ) fv.entities = self.entities @@ -373,7 +395,6 @@ def decorator(user_function): schema=schema, udf=user_function, udf_string=udf_string, - feature_transformation=PandasTransformation(user_function, udf_string), description=description, tags=tags, online=online, diff --git a/sdk/python/feast/transformation/base.py b/sdk/python/feast/transformation/base.py new file mode 100644 index 00000000000..7489e16be97 --- /dev/null +++ b/sdk/python/feast/transformation/base.py @@ -0,0 +1,143 @@ +import functools +from abc import ABC +from typing import Any, Callable, Dict, Optional, Union + +import dill + +from feast.protos.feast.core.Transformation_pb2 import ( + SubstraitTransformationV2 as SubstraitTransformationProto, +) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) +from feast.transformation.factory import ( + TRANSFORMATION_CLASS_FOR_TYPE, + get_transformation_class_from_type, +) +from feast.transformation.mode import TransformationMode + + +class Transformation(ABC): + """ + Base Transformation class. Can be used to define transformations that can be applied to FeatureViews. + Also encapsulates the logic to serialize and deserialize the transformation to and from proto. This is + important for the future transformation lifecycle management. + E.g.: + pandas_transformation = Transformation( + mode=TransformationMode.PANDAS, + udf=lambda df: df.assign(new_column=df['column1'] + df['column2']), + ) + """ + + udf: Callable[[Any], Any] + udf_string: str + + def __new__( + cls, + mode: Union[TransformationMode, str], + udf: Callable[[Any], Any], + udf_string: str, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", + *args, + **kwargs, + ) -> "Transformation": + """ + Creates a Transformation object. 
+ Args: + mode: (required) The mode of the transformation. Choose one from TransformationMode. + udf: (required) The user-defined transformation function. + udf_string: (required) The string representation of the udf. The dill get source doesn't + work for all cases when extracting the source code from the udf. So it's better to pass + the source code as a string. + name: (optional) The name of the transformation. + tags: (optional) Metadata tags for the transformation. + description: (optional) A description of the transformation. + owner: (optional) The owner of the transformation. + """ + if cls is Transformation: + if isinstance(mode, TransformationMode): + mode = mode.value + + if mode.lower() in TRANSFORMATION_CLASS_FOR_TYPE: + subclass = get_transformation_class_from_type(mode.lower()) + return super().__new__(subclass) + + raise ValueError( + f"Invalid mode: {mode}. Choose one from TransformationMode." + ) + + return super().__new__(cls) + + def __init__( + self, + mode: Union[TransformationMode, str], + udf: Callable[[Any], Any], + udf_string: str, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", + ): + self.mode = mode if isinstance(mode, str) else mode.value + self.udf = udf + self.udf_string = udf_string + self.name = name + self.tags = tags or {} + self.description = description + self.owner = owner + + def to_proto(self) -> Union[UserDefinedFunctionProto, SubstraitTransformationProto]: + return UserDefinedFunctionProto( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + def __deepcopy__(self, memo: Optional[Dict[int, Any]] = None) -> "Transformation": + return Transformation(mode=self.mode, udf=self.udf, udf_string=self.udf_string, name=self.name, tags=dict(self.tags), description=self.description, owner=self.owner) + + def transform(self, inputs: Any) -> Any: + raise NotImplementedError + + def transform_arrow(self, *args, **kwargs) -> Any: + pass + + def transform_singleton(self, *args, **kwargs) -> Any: + pass + + 
def infer_features(self, *args, **kwargs) -> Any: + raise NotImplementedError + + +def transformation( + mode: Union[TransformationMode, str], + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: Optional[str] = "", + owner: Optional[str] = "", +): + def mainify(obj): + # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same + # name as the original file defining the transformation. + if obj.__module__ != "__main__": + obj.__module__ = "__main__" + + def decorator(user_function): + udf_string = dill.source.getsource(user_function) + mainify(user_function) + transformation_obj = Transformation( + mode=mode, + name=name or user_function.__name__, + tags=tags, + description=description, + owner=owner, + udf=user_function, + udf_string=udf_string, + ) + functools.update_wrapper(wrapper=transformation_obj, wrapped=user_function) + return transformation_obj + + return decorator diff --git a/sdk/python/feast/transformation/factory.py b/sdk/python/feast/transformation/factory.py new file mode 100644 index 00000000000..5097d71353a --- /dev/null +++ b/sdk/python/feast/transformation/factory.py @@ -0,0 +1,22 @@ +from feast.importer import import_class + +TRANSFORMATION_CLASS_FOR_TYPE = { + "python": "feast.transformation.python_transformation.PythonTransformation", + "pandas": "feast.transformation.pandas_transformation.PandasTransformation", + "substrait": "feast.transformation.substrait_transformation.SubstraitTransformation", + "sql": "feast.transformation.sql_transformation.SQLTransformation", + "spark": "feast.transformation.spark_transformation.SparkTransformation", +} + + +def get_transformation_class_from_type(transformation_type: str): + if transformation_type in TRANSFORMATION_CLASS_FOR_TYPE: + transformation_type = TRANSFORMATION_CLASS_FOR_TYPE[transformation_type] + elif not transformation_type.endswith("Transformation"): + raise ValueError( + f"Invalid transformation type: 
{transformation_type}. Choose from {list(TRANSFORMATION_CLASS_FOR_TYPE.keys())}." + ) + module_name, transformation_class_type = transformation_type.rsplit(".", 1) + return import_class( + module_name, transformation_class_type, transformation_class_type + ) diff --git a/sdk/python/feast/transformation/mode.py b/sdk/python/feast/transformation/mode.py new file mode 100644 index 00000000000..4bd5ddbe7a3 --- /dev/null +++ b/sdk/python/feast/transformation/mode.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class TransformationMode(Enum): + PYTHON = "python" + PANDAS = "pandas" + SPARK = "spark" + SQL = "sql" + SUBSTRAIT = "substrait" diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index 66a5c65caf2..469ddaa7768 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -1,4 +1,5 @@ -from typing import Any, Callable, Optional +import inspect +from typing import Any, Callable, Optional, cast, get_type_hints import dill import pandas as pd @@ -8,23 +9,61 @@ from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunctionV2 as UserDefinedFunctionProto, ) +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode from feast.type_map import ( python_type_to_feast_value_type, ) -class PandasTransformation: - def __init__(self, udf: Callable[[Any], Any], udf_string: str = ""): - """ - Creates an PandasTransformation object. 
+class PandasTransformation(Transformation): + def __new__( + cls, + udf: Callable[[Any], Any], + udf_string: str, + name: Optional[str] = None, + tags: Optional[dict[str, str]] = None, + description: str = "", + owner: str = "", + ) -> "PandasTransformation": + instance = super(PandasTransformation, cls).__new__( + cls, + mode=TransformationMode.PANDAS, + udf=udf, + name=name, + udf_string=udf_string, + tags=tags, + description=description, + owner=owner, + ) + return cast(PandasTransformation, instance) - Args: - udf: The user defined transformation function, which must take pandas - dataframes as inputs. - udf_string: The source code version of the udf (for diffing and displaying in Web UI) - """ - self.udf = udf - self.udf_string = udf_string + def __init__( + self, + udf: Callable[[Any], Any], + udf_string: str, + name: Optional[str] = None, + tags: Optional[dict[str, str]] = None, + description: str = "", + owner: str = "", + *args, + **kwargs, + ): + return_annotation = get_type_hints(udf).get("return", inspect._empty) + if return_annotation not in (inspect._empty, pd.DataFrame): + raise TypeError( + f"return signature for PandasTransformation should be pd.DataFrame, instead got {return_annotation}" + ) + + super().__init__( + mode=TransformationMode.PANDAS, + udf=udf, + name=name, + udf_string=udf_string, + tags=tags, + description=description, + owner=owner, + ) def transform_arrow( self, pa_table: pyarrow.Table, features: list[Field] @@ -32,16 +71,14 @@ def transform_arrow( output_df_pandas = self.udf(pa_table.to_pandas()) return pyarrow.Table.from_pandas(output_df_pandas) - def transform(self, input_df: pd.DataFrame) -> pd.DataFrame: - return self.udf(input_df) - - def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: - raise ValueError( - "PandasTransformation does not support singleton transformations." 
- ) + def transform(self, inputs: pd.DataFrame) -> pd.DataFrame: + return self.udf(inputs) def infer_features( - self, random_input: dict[str, list[Any]], singleton: Optional[bool] + self, + random_input: dict[str, list[Any]], + *args, + **kwargs, ) -> list[Field]: df = pd.DataFrame.from_dict(random_input) output_df: pd.DataFrame = self.transform(df) @@ -82,13 +119,6 @@ def __eq__(self, other): return True - def to_proto(self) -> UserDefinedFunctionProto: - return UserDefinedFunctionProto( - name=self.udf.__name__, - body=dill.dumps(self.udf, recurse=True), - body_text=self.udf_string, - ) - @classmethod def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): return PandasTransformation( diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 3ea62963ae5..2a19b811abe 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -1,5 +1,5 @@ from types import FunctionType -from typing import Any, Optional +from typing import Any, Dict, Optional, cast import dill import pyarrow @@ -8,22 +8,73 @@ from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunctionV2 as UserDefinedFunctionProto, ) +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode from feast.type_map import ( python_type_to_feast_value_type, ) -class PythonTransformation: - def __init__(self, udf: FunctionType, udf_string: str = ""): +class PythonTransformation(Transformation): + udf: FunctionType + + def __new__( + cls, + udf: FunctionType, + udf_string: str, + singleton: bool = False, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", + ) -> "PythonTransformation": + instance = super(PythonTransformation, cls).__new__( + cls, + mode=TransformationMode.PYTHON, + singleton=singleton, + udf=udf, + 
udf_string=udf_string, + name=name, + tags=tags, + description=description, + owner=owner, + ) + return cast(PythonTransformation, instance) + + def __init__( + self, + udf: FunctionType, + udf_string: str, + singleton: bool = False, + name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + description: str = "", + owner: str = "", + *args, + **kwargs, + ): """ - Creates an PythonTransformation object. + Creates a PythonTransformation object. + Args: - udf: The user defined transformation function, which must take pandas + udf: The user-defined transformation function, which must take pandas dataframes as inputs. - udf_string: The source code version of the udf (for diffing and displaying in Web UI) + name: The name of the transformation. + udf_string: The source code version of the UDF (for diffing and displaying in Web UI). + tags: Metadata tags for the transformation. + description: A description of the transformation. + owner: The owner of the transformation. """ - self.udf = udf - self.udf_string = udf_string + super().__init__( + mode=TransformationMode.PYTHON, + udf=udf, + name=name, + udf_string=udf_string, + tags=tags, + description=description, + owner=owner, + ) + self.singleton = singleton def transform_arrow( self, @@ -104,13 +155,6 @@ def __eq__(self, other): return True - def to_proto(self) -> UserDefinedFunctionProto: - return UserDefinedFunctionProto( - name=self.udf.__name__, - body=dill.dumps(self.udf, recurse=True), - body_text=self.udf_string, - ) - @classmethod def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): return PythonTransformation( diff --git a/sdk/python/feast/transformation/spark_transformation.py b/sdk/python/feast/transformation/spark_transformation.py new file mode 100644 index 00000000000..d288cf58b08 --- /dev/null +++ b/sdk/python/feast/transformation/spark_transformation.py @@ -0,0 +1,11 @@ +from typing import Any + +from feast.transformation.base import Transformation + + +class 
SparkTransformation(Transformation): + def transform(self, inputs: Any) -> Any: + pass + + def infer_features(self, *args, **kwargs) -> Any: + pass diff --git a/sdk/python/feast/transformation/sql_transformation.py b/sdk/python/feast/transformation/sql_transformation.py new file mode 100644 index 00000000000..62d6b40de0b --- /dev/null +++ b/sdk/python/feast/transformation/sql_transformation.py @@ -0,0 +1,8 @@ +from typing import Any + +from feast.transformation.base import Transformation + + +class SQLTransformation(Transformation): + def transform(self, inputs: Any) -> str: + return self.udf(inputs) diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py index a6d9bfa18c0..476159cd003 100644 --- a/sdk/python/feast/transformation/substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -1,5 +1,5 @@ -from types import FunctionType -from typing import Any, Optional +import inspect +from typing import Any, Callable, Dict, Optional, cast, get_type_hints import dill import pandas as pd @@ -11,23 +11,64 @@ from feast.protos.feast.core.Transformation_pb2 import ( SubstraitTransformationV2 as SubstraitTransformationProto, ) +from feast.transformation.base import Transformation +from feast.transformation.mode import TransformationMode from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, ) -class SubstraitTransformation: - def __init__(self, substrait_plan: bytes, ibis_function: FunctionType): +class SubstraitTransformation(Transformation): + def __new__( + cls, + substrait_plan: bytes, + udf: Callable[[Any], Any], + name: Optional[str] = None, + tags: Optional[dict[str, str]] = None, + description: str = "", + owner: str = "", + ) -> "SubstraitTransformation": + instance = super(SubstraitTransformation, cls).__new__( + cls, + mode=TransformationMode.SUBSTRAIT, + udf=udf, + name=name, + udf_string="", + tags=tags, + 
description=description, + owner=owner, + ) + return cast(SubstraitTransformation, instance) + + def __init__( + self, + substrait_plan: bytes, + udf: Callable[[Any], Any], + name: Optional[str] = None, + tags: Optional[dict[str, str]] = None, + description: str = "", + owner: str = "", + *args, + **kwargs, + ): """ Creates an SubstraitTransformation object. Args: substrait_plan: The user-provided substrait plan. - ibis_function: The user-provided ibis function. + udf: The user-provided ibis function. """ + super().__init__( + mode=TransformationMode.SUBSTRAIT, + udf=udf, + name=name, + udf_string="", + tags=tags, + description=description, + owner=owner, + ) self.substrait_plan = substrait_plan - self.ibis_function = ibis_function def transform(self, df: pd.DataFrame) -> pd.DataFrame: def table_provider(names, schema: pyarrow.Schema): @@ -44,7 +85,7 @@ def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: ) def transform_ibis(self, table): - return self.ibis_function(table) + return self.udf(table) def transform_arrow( self, pa_table: pyarrow.Table, features: list[Field] = [] @@ -98,14 +139,18 @@ def __eq__(self, other): return ( self.substrait_plan == other.substrait_plan - and self.ibis_function.__code__.co_code - == other.ibis_function.__code__.co_code + and self.udf.__code__.co_code == other.udf.__code__.co_code ) + def __deepcopy__( + self, memo: Optional[Dict[int, Any]] = None + ) -> "SubstraitTransformation": + return SubstraitTransformation(substrait_plan=self.substrait_plan, udf=self.udf) + def to_proto(self) -> SubstraitTransformationProto: return SubstraitTransformationProto( substrait_plan=self.substrait_plan, - ibis_function=dill.dumps(self.ibis_function, recurse=True), + ibis_function=dill.dumps(self.udf, recurse=True), ) @classmethod @@ -115,11 +160,19 @@ def from_proto( ): return SubstraitTransformation( substrait_plan=substrait_transformation_proto.substrait_plan, - 
ibis_function=dill.loads(substrait_transformation_proto.ibis_function), + udf=dill.loads(substrait_transformation_proto.ibis_function), ) @classmethod def from_ibis(cls, user_function, sources): + from ibis.expr.types.relations import Table + + return_annotation = get_type_hints(user_function).get("return", inspect._empty) + if return_annotation not in (inspect._empty, Table): + raise TypeError( + f"User function must return an ibis Table, got {return_annotation} for SubstraitTransformation" + ) + import ibis import ibis.expr.datatypes as dt from ibis_substrait.compiler.core import SubstraitCompiler @@ -145,7 +198,9 @@ def from_ibis(cls, user_function, sources): expr = user_function(ibis.table(input_fields, "t")) + substrait_plan = compiler.compile(expr).SerializeToString() + return SubstraitTransformation( - substrait_plan=compiler.compile(expr).SerializeToString(), - ibis_function=user_function, + substrait_plan=substrait_plan, + udf=user_function, ) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index c029648aeeb..c46aff681a3 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -85,7 +85,7 @@ def pytest_configure(config): if platform in ["darwin", "windows"]: - multiprocessing.set_start_method("spawn") + multiprocessing.set_start_method("spawn", force=True) else: multiprocessing.set_start_method("fork") config.addinivalue_line( diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_bfvs.py b/sdk/python/tests/example_repos/example_feature_repo_with_bfvs.py index e0f75c0c6ff..9ee05b47fe4 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_bfvs.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_bfvs.py @@ -18,6 +18,8 @@ driver_hourly_stats_view = BatchFeatureView( name="driver_hourly_stats", entities=[driver], + mode="python", + udf=lambda x: x, ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), @@ -41,6 +43,8 @@ global_stats_feature_view = 
BatchFeatureView( name="global_daily_stats", entities=None, + mode="python", + udf=lambda x: x, ttl=timedelta(days=1), schema=[ Field(name="num_rides", dtype=Int32), diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 915e88812bc..cd02c3478c7 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -189,6 +189,7 @@ def create_item_embeddings_batch_feature_view( ], source=source, ttl=timedelta(hours=2), + udf=lambda x: x, ) return item_embeddings_feature_view @@ -231,6 +232,7 @@ def create_driver_hourly_stats_batch_feature_view( source=source, ttl=timedelta(hours=2), tags=TAGS, + udf=lambda x: x, ) return driver_stats_feature_view diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index 1b9b48d8d0a..931acfb3919 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -93,6 +93,7 @@ def test_apply_feature_view(test_feature_store): Field(name="fs1_my_feature_4", dtype=Array(Bytes)), Field(name="entity_id", dtype=Int64), ], + udf=lambda df: df, entities=[entity], tags={"team": "matchmaking", "tag": "two"}, source=batch_source, @@ -654,7 +655,7 @@ def pandas_view(pandas_df): import pandas as pd assert type(pandas_df) == pd.DataFrame - df = pandas_df.transform(lambda x: x + 10, axis=1) + df = pandas_df.transform(lambda x: x + 10) df.insert(2, "C", [20.2, 230.0, 34.0], True) return df diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index ce789c706c5..911c94ff34c 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -36,6 +36,8 @@ def 
test_create_batch_feature_view(): entities=[], ttl=timedelta(days=30), source=batch_source, + mode="python", + udf=lambda x: x, ) with pytest.raises(TypeError): @@ -54,6 +56,8 @@ def test_create_batch_feature_view(): with pytest.raises(ValueError): BatchFeatureView( name="test batch feature view", + mode="python", + udf=lambda x: x, entities=[], ttl=timedelta(days=30), source=stream_source, diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 69724779cdd..505146aa612 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -149,7 +149,7 @@ def test_hash(): assert len(s4) == 3 assert on_demand_feature_view_5.feature_transformation == PandasTransformation( - udf2, "udf2 source code" + udf2, udf_string="udf2 source code" ) @@ -180,26 +180,25 @@ def test_python_native_transformation_mode(): mode="python", ) - on_demand_feature_view_python_native_err = OnDemandFeatureView( - name="my-on-demand-feature-view", - sources=sources, - schema=[ - Field(name="output1", dtype=Float32), - Field(name="output2", dtype=Float32), - ], - feature_transformation=PandasTransformation( - udf=python_native_udf, udf_string="python native udf source code" - ), - description="test", - mode="python", - ) - assert ( on_demand_feature_view_python_native.feature_transformation == PythonTransformation(python_native_udf, "python native udf source code") ) with pytest.raises(TypeError): + on_demand_feature_view_python_native_err = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=python_native_udf, udf_string="python native udf source code" + ), + description="test", + mode="python", + ) assert ( on_demand_feature_view_python_native_err.feature_transformation == 
PythonTransformation(python_native_udf, "python native udf source code") diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py index 4f93691028e..96e62d9d9e2 100644 --- a/sdk/python/tests/unit/test_stream_feature_view.py +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -4,7 +4,6 @@ import pytest from feast.aggregation import Aggregation -from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat from feast.data_source import KafkaSource, PushSource from feast.entity import Entity @@ -18,37 +17,6 @@ from feast.utils import _utc_now, make_tzaware -def test_create_batch_feature_view(): - batch_source = FileSource(path="some path") - BatchFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - source=batch_source, - ) - - with pytest.raises(TypeError): - BatchFeatureView( - name="test batch feature view", entities=[], ttl=timedelta(days=30) - ) - - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - with pytest.raises(ValueError): - BatchFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - ) - - def test_create_stream_feature_view(): stream_source = KafkaSource( name="kafka", @@ -64,6 +32,7 @@ def test_create_stream_feature_view(): ttl=timedelta(days=30), source=stream_source, aggregations=[], + udf=lambda x: x, ) push_source = PushSource( @@ -75,6 +44,7 @@ def test_create_stream_feature_view(): ttl=timedelta(days=30), source=push_source, aggregations=[], + udf=lambda x: x, ) with pytest.raises(TypeError): @@ -92,6 +62,7 @@ def test_create_stream_feature_view(): ttl=timedelta(days=30), source=FileSource(path="some path"), aggregations=[], + udf=lambda x: x, ) @@ -173,7 +144,7 @@ def pandas_udf(pandas_df): import 
pandas as pd assert type(pandas_df) == pd.DataFrame - df = pandas_df.transform(lambda x: x + 10, axis=1) + df = pandas_df.transform(lambda x: x + 10) return df import pandas as pd @@ -230,6 +201,7 @@ def test_stream_feature_view_proto_type(): ttl=timedelta(days=30), source=stream_source, aggregations=[], + udf=lambda x: x, ) assert sfv.proto_class is StreamFeatureViewProto @@ -249,6 +221,7 @@ def test_stream_feature_view_copy(): ttl=timedelta(days=30), source=stream_source, aggregations=[], + udf=lambda x: x, ) assert sfv == copy.copy(sfv)