diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py
index c985912a03..7ead26cb98 100644
--- a/sdk/python/feast/on_demand_feature_view.py
+++ b/sdk/python/feast/on_demand_feature_view.py
@@ -42,6 +42,76 @@
 OnDemandSourceType = Union[FeatureView, FeatureViewProjection, RequestSource]
 
 
+class ODFVErrorMessages:
+    """Centralized error message templates for OnDemandFeatureView."""
+
+    @staticmethod
+    def unsupported_source_type(source_type: type, supported_types: str) -> str:
+        return (
+            f"Unsupported source type: {source_type.__name__}. "
+            f"Supported types are {supported_types}."
+        )
+
+    @staticmethod
+    def singleton_mode_requires_python(current_mode: str) -> str:
+        return (
+            f"Singleton mode is only supported with mode='python', "
+            f"but mode='{current_mode}' was specified. Either disable singleton "
+            f"(singleton=False) or change mode to 'python'."
+        )
+
+    @staticmethod
+    def online_store_requires_entities() -> str:
+        return (
+            "OnDemandFeatureView configured with write_to_online_store=True "
+            "must have at least one entity defined. Either add entities or "
+            "set write_to_online_store=False."
+        )
+
+    @staticmethod
+    def no_transformation_provided() -> str:
+        return (
+            "OnDemandFeatureView must have a valid feature_transformation. "
+            "Provide either a udf parameter or a feature_transformation parameter."
+        )
+
+    @staticmethod
+    def duplicate_source_names(overlapping_names: set) -> str:
+        return (
+            f"Source names must be unique across all source types. "
+            f"Found duplicate names: {overlapping_names}"
+        )
+
+    @staticmethod
+    def no_sources_configured() -> str:
+        return (
+            "OnDemandFeatureView must have at least one source. "
+            "Add either FeatureView/FeatureViewProjection sources or RequestSource sources."
+        )
+
+    @staticmethod
+    def mode_transformation_mismatch(
+        mode: str, expected_type: str, actual_type: str
+    ) -> str:
+        return f"Mode '{mode}' requires {expected_type}, but got {actual_type}."
+
+    @staticmethod
+    def unknown_source_type_in_proto(source_type: str | None) -> str:
+        return f"Unknown source type in protobuf: {source_type}"
+
+    @staticmethod
+    def unsupported_transformation_type(transformation_type: str) -> str:
+        return f"Unsupported transformation type: {transformation_type}"
+
+    @staticmethod
+    def backward_compatible_udf_missing() -> str:
+        return "Backward compatible UDF requires user_defined_function field"
+
+    @staticmethod
+    def unsupported_mode_for_udf(mode: str) -> str:
+        return f"Unsupported mode '{mode}' for user_defined_function"
+
+
 @typechecked
 class OnDemandFeatureView(BaseFeatureView):
     """
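For reference, a sketch of how these centralized templates are meant to be consumed at raise sites; the mode value below is invented, and `ODFVErrorMessages` is the class introduced above:

    # Hypothetical caller: validate a mode/singleton combination and fail
    # with a single, consistently worded message.
    mode = "pandas"
    singleton = True
    if singleton and mode != "python":
        raise ValueError(ODFVErrorMessages.singleton_mode_requires_python(mode))
    # ValueError: Singleton mode is only supported with mode='python', but
    # mode='pandas' was specified. Either disable singleton (singleton=False)
    # or change mode to 'python'.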
@@ -139,16 +209,10 @@ def __init__(  # noqa: C901
         self.udf_string = udf_string
         self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
         self.source_request_sources: dict[str, RequestSource] = {}
-        for odfv_source in sources:
-            if isinstance(odfv_source, RequestSource):
-                self.source_request_sources[odfv_source.name] = odfv_source
-            elif isinstance(odfv_source, FeatureViewProjection):
-                self.source_feature_view_projections[odfv_source.name] = odfv_source
-            else:
-                self.source_feature_view_projections[odfv_source.name] = (
-                    odfv_source.projection
-                )
+        # Process each source with explicit type handling
+        for odfv_source in sources:
+            self._add_source_to_collections(odfv_source)
 
         features: List[Field] = []
         self.entity_columns = []
@@ -190,14 +254,41 @@
         self.write_to_online_store = write_to_online_store
         self.singleton = singleton
         if self.singleton and self.mode != "python":
-            raise ValueError("Singleton is only supported for Python mode.")
+            raise ValueError(
+                ODFVErrorMessages.singleton_mode_requires_python(self.mode)
+            )
         self.aggregations = aggregations or []
 
-    def get_feature_transformation(self) -> Transformation:
-        if not self.udf:
+    def _add_source_to_collections(self, odfv_source: OnDemandSourceType) -> None:
+        """
+        Add a source to the appropriate collection with explicit type checking.
+
+        Args:
+            odfv_source: The source to add (RequestSource, FeatureViewProjection, or FeatureView)
+
+        Raises:
+            ValueError: If the source type is not supported
+        """
+        if isinstance(odfv_source, RequestSource):
+            self.source_request_sources[odfv_source.name] = odfv_source
+        elif isinstance(odfv_source, FeatureViewProjection):
+            self.source_feature_view_projections[odfv_source.name] = odfv_source
+        elif isinstance(odfv_source, FeatureView):
+            # FeatureView sources use their projection
+            self.source_feature_view_projections[odfv_source.name] = (
+                odfv_source.projection
+            )
+        else:
             raise ValueError(
-                "Either udf or feature_transformation must be provided to create an OnDemandFeatureView"
+                ODFVErrorMessages.unsupported_source_type(
+                    type(odfv_source),
+                    "RequestSource, FeatureViewProjection, and FeatureView",
+                )
             )
+
+    def get_feature_transformation(self) -> Transformation:
+        if not self.udf:
+            raise ValueError(ODFVErrorMessages.no_transformation_provided())
         if self.mode in (
             TransformationMode.PANDAS,
             TransformationMode.PYTHON,
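`_add_source_to_collections` replaces the old silent `else` fallback with an explicit `isinstance` chain that rejects unknown types. A self-contained sketch of the same dispatch pattern, using hypothetical stub classes rather than the real Feast types:

    from dataclasses import dataclass

    @dataclass
    class RequestSource:  # stub, not feast.RequestSource
        name: str

    @dataclass
    class FeatureViewProjection:  # stub
        name: str

    @dataclass
    class FeatureView:  # stub; a real FeatureView exposes .projection
        name: str

        @property
        def projection(self) -> FeatureViewProjection:
            return FeatureViewProjection(self.name)

    def add_source(source, request_sources: dict, projections: dict) -> None:
        if isinstance(source, RequestSource):
            request_sources[source.name] = source
        elif isinstance(source, FeatureViewProjection):
            projections[source.name] = source
        elif isinstance(source, FeatureView):
            projections[source.name] = source.projection
        else:
            raise ValueError(f"Unsupported source type: {type(source).__name__}")

    req: dict = {}
    proj: dict = {}
    for s in (RequestSource("req_input"), FeatureView("driver_stats")):
        add_source(s, req, proj)
    assert set(req) == {"req_input"} and set(proj) == {"driver_stats"}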
""" super().ensure_valid() + # Validate write_to_online_store configuration + self._validate_online_store_config() + + # Validate singleton mode configuration + self._validate_singleton_config() + + # Validate sources configuration + self._validate_sources_config() + + # Validate transformation compatibility + self._validate_transformation_config() + + def _validate_online_store_config(self) -> None: + """Validate write_to_online_store configuration.""" if self.write_to_online_store and not self.entities: + raise ValueError(ODFVErrorMessages.online_store_requires_entities()) + + def _validate_singleton_config(self) -> None: + """Validate singleton mode configuration.""" + if self.singleton and self.mode != "python": raise ValueError( - "On Demand Feature views require an entity if write_to_online_store=True" + ODFVErrorMessages.singleton_mode_requires_python(self.mode) ) + def _validate_sources_config(self) -> None: + """Validate sources configuration.""" + if not self.source_feature_view_projections and not self.source_request_sources: + raise ValueError(ODFVErrorMessages.no_sources_configured()) + + # Validate source names are unique + fv_names = set(self.source_feature_view_projections.keys()) + req_names = set(self.source_request_sources.keys()) + overlapping_names = fv_names.intersection(req_names) + + if overlapping_names: + raise ValueError( + ODFVErrorMessages.duplicate_source_names(overlapping_names) + ) + + def _validate_transformation_config(self) -> None: + """Validate transformation configuration.""" + if not self.feature_transformation: + raise ValueError(ODFVErrorMessages.no_transformation_provided()) + + # Validate mode compatibility with transformation type + if self.mode in ("pandas", "python"): + from feast.transformation.pandas_transformation import PandasTransformation + from feast.transformation.python_transformation import PythonTransformation + + expected_types = (PandasTransformation, PythonTransformation) + if not isinstance(self.feature_transformation, expected_types): + raise ValueError( + ODFVErrorMessages.mode_transformation_mismatch( + self.mode, + "PandasTransformation or PythonTransformation", + type(self.feature_transformation).__name__, + ) + ) + elif self.mode == "substrait": + from feast.transformation.substrait_transformation import ( + SubstraitTransformation, + ) + + if not isinstance(self.feature_transformation, SubstraitTransformation): + raise ValueError( + ODFVErrorMessages.mode_transformation_mismatch( + self.mode, + "SubstraitTransformation", + type(self.feature_transformation).__name__, + ) + ) + def __hash__(self): return super().__hash__() @@ -368,144 +526,202 @@ def from_proto( Returns: A OnDemandFeatureView object based on the on-demand feature view protobuf. 
""" - sources = [] - for ( - _, - on_demand_source, - ) in on_demand_feature_view_proto.spec.sources.items(): - if on_demand_source.WhichOneof("source") == "feature_view": + # Parse sources from proto + sources = cls._parse_sources_from_proto(on_demand_feature_view_proto) + + # Parse transformation from proto + transformation = cls._parse_transformation_from_proto( + on_demand_feature_view_proto + ) + + # Parse optional fields with defaults + optional_fields = cls._parse_optional_fields_from_proto( + on_demand_feature_view_proto + ) + + # Create the OnDemandFeatureView object + on_demand_feature_view_obj = cls( + name=on_demand_feature_view_proto.spec.name, + schema=cls._parse_features_from_proto(on_demand_feature_view_proto), + sources=cast(List[OnDemandSourceType], sources), + feature_transformation=transformation, + mode=on_demand_feature_view_proto.spec.mode or "pandas", + description=on_demand_feature_view_proto.spec.description, + tags=dict(on_demand_feature_view_proto.spec.tags), + owner=on_demand_feature_view_proto.spec.owner, + write_to_online_store=optional_fields["write_to_online_store"], + singleton=optional_fields["singleton"], + aggregations=optional_fields["aggregations"], + ) + + # Set additional attributes that aren't part of the constructor + on_demand_feature_view_obj.entities = optional_fields["entities"] + on_demand_feature_view_obj.entity_columns = optional_fields["entity_columns"] + + # FeatureViewProjections are not saved in the OnDemandFeatureView proto. + # Create the default projection. + on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition( + on_demand_feature_view_obj + ) + + # Set timestamps if present + cls._set_timestamps_from_proto( + on_demand_feature_view_proto, on_demand_feature_view_obj + ) + + return on_demand_feature_view_obj + + @classmethod + def _parse_sources_from_proto( + cls, proto: OnDemandFeatureViewProto + ) -> List[OnDemandSourceType]: + """Parse and convert sources from the protobuf representation.""" + sources: List[OnDemandSourceType] = [] + for _, on_demand_source in proto.spec.sources.items(): + source_type = on_demand_source.WhichOneof("source") + + if source_type == "feature_view": sources.append( FeatureView.from_proto(on_demand_source.feature_view).projection ) - elif on_demand_source.WhichOneof("source") == "feature_view_projection": + elif source_type == "feature_view_projection": sources.append( FeatureViewProjection.from_proto( on_demand_source.feature_view_projection ) ) - else: + elif source_type == "request_data_source": sources.append( RequestSource.from_proto(on_demand_source.request_data_source) ) + else: + raise ValueError( + ODFVErrorMessages.unknown_source_type_in_proto(source_type) + ) - if ( - on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( - "transformation" - ) - == "user_defined_function" - and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text - != "" - and on_demand_feature_view_proto.spec.mode == "pandas" - ): - transformation = PandasTransformation.from_proto( - on_demand_feature_view_proto.spec.feature_transformation.user_defined_function - ) - elif ( - on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( - "transformation" - ) - == "user_defined_function" - and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text - != "" - and on_demand_feature_view_proto.spec.mode == "python" - ): - transformation = PythonTransformation.from_proto( - 
+    @classmethod
+    def _parse_transformation_from_proto(
+        cls, proto: OnDemandFeatureViewProto
+    ) -> Transformation:
+        """Parse and convert the transformation from the protobuf representation."""
+        feature_transformation = proto.spec.feature_transformation
+        transformation_type = feature_transformation.WhichOneof("transformation")
+        mode = proto.spec.mode
+
+        if transformation_type == "user_defined_function":
+            udf_proto = feature_transformation.user_defined_function
+
+            # Check for non-empty UDF body
+            if udf_proto.body_text:
+                if mode == "pandas":
+                    return PandasTransformation.from_proto(udf_proto)
+                elif mode == "python":
+                    return PythonTransformation.from_proto(udf_proto)
+                else:
+                    raise ValueError(ODFVErrorMessages.unsupported_mode_for_udf(mode))
+            else:
+                # Handle backward compatibility case with empty body_text
+                return cls._handle_backward_compatible_udf(proto)
+
+        elif transformation_type == "substrait_transformation":
+            return SubstraitTransformation.from_proto(
+                feature_transformation.substrait_transformation
             )
+        elif transformation_type is None:
+            # Handle backward compatibility case where feature_transformation is cleared
+            return cls._handle_backward_compatible_udf(proto)
         else:
-            raise ValueError("At least one transformation type needs to be provided")
+            raise ValueError(
+                ODFVErrorMessages.unsupported_transformation_type(transformation_type)
+            )
+
+    @classmethod
+    def _handle_backward_compatible_udf(
+        cls, proto: OnDemandFeatureViewProto
+    ) -> Transformation:
+        """Handle backward compatibility for UDFs with empty body_text."""
+        if not hasattr(proto.spec, "user_defined_function"):
+            raise ValueError(ODFVErrorMessages.backward_compatible_udf_missing())
+
+        old_udf = proto.spec.user_defined_function
+        backwards_compatible_udf = UserDefinedFunctionProto(
+            name=old_udf.name,
+            body=old_udf.body,
+            body_text=old_udf.body_text,
+        )
+        return PandasTransformation.from_proto(
+            user_defined_function_proto=backwards_compatible_udf,
+        )
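The decision table behind `_parse_transformation_from_proto`, mirrored as a standalone function; return values are descriptive strings standing in for real Transformation objects:

    def pick(transformation_type, mode, body_text):
        if transformation_type == "user_defined_function":
            if body_text:
                if mode == "pandas":
                    return "PandasTransformation"
                if mode == "python":
                    return "PythonTransformation"
                raise ValueError(f"Unsupported mode '{mode}' for user_defined_function")
            return "backward-compatible PandasTransformation"
        if transformation_type == "substrait_transformation":
            return "SubstraitTransformation"
        if transformation_type is None:  # oneof unset on old protos
            return "backward-compatible PandasTransformation"
        raise ValueError(f"Unsupported transformation type: {transformation_type}")

    assert pick("user_defined_function", "python", "def f(): ...") == "PythonTransformation"
    assert pick(None, "pandas", "") == "backward-compatible PandasTransformation"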
 
-        if hasattr(on_demand_feature_view_proto.spec, "write_to_online_store"):
-            write_to_online_store = (
-                on_demand_feature_view_proto.spec.write_to_online_store
+    @classmethod
+    def _parse_features_from_proto(cls, proto: OnDemandFeatureViewProto) -> List[Field]:
+        """Parse features from the protobuf representation."""
+        return [
+            Field(
+                name=feature.name,
+                dtype=from_value_type(ValueType(feature.value_type)),
+                vector_index=feature.vector_index,
+                vector_length=feature.vector_length,
+                vector_search_metric=feature.vector_search_metric,
             )
-        else:
-            write_to_online_store = False
-        if hasattr(on_demand_feature_view_proto.spec, "entities"):
-            entities = list(on_demand_feature_view_proto.spec.entities)
-        else:
-            entities = []
-        if hasattr(on_demand_feature_view_proto.spec, "entity_columns"):
+            for feature in proto.spec.features
+        ]
+
+    @classmethod
+    def _parse_optional_fields_from_proto(cls, proto: OnDemandFeatureViewProto) -> dict:
+        """Parse optional fields from protobuf with appropriate defaults."""
+        spec = proto.spec
+
+        # Parse write_to_online_store
+        write_to_online_store = False
+        if hasattr(spec, "write_to_online_store"):
+            write_to_online_store = spec.write_to_online_store
+
+        # Parse entities
+        entities = []
+        if hasattr(spec, "entities"):
+            entities = list(spec.entities)
+
+        # Parse entity_columns
+        entity_columns = []
+        if hasattr(spec, "entity_columns"):
             entity_columns = [
-                Field.from_proto(field_proto)
-                for field_proto in on_demand_feature_view_proto.spec.entity_columns
+                Field.from_proto(field_proto) for field_proto in spec.entity_columns
             ]
-        else:
-            entity_columns = []
+
+        # Parse singleton
         singleton = False
-        if hasattr(on_demand_feature_view_proto.spec, "singleton"):
-            singleton = on_demand_feature_view_proto.spec.singleton
+        if hasattr(spec, "singleton"):
+            singleton = spec.singleton
 
+        # Parse aggregations
         aggregations = []
-        if hasattr(on_demand_feature_view_proto.spec, "aggregations"):
+        if hasattr(spec, "aggregations"):
             aggregations = [
                 Aggregation.from_proto(aggregation_proto)
-                for aggregation_proto in on_demand_feature_view_proto.spec.aggregations
+                for aggregation_proto in spec.aggregations
             ]
 
-        on_demand_feature_view_obj = cls(
-            name=on_demand_feature_view_proto.spec.name,
-            schema=[
-                Field(
-                    name=feature.name,
-                    dtype=from_value_type(ValueType(feature.value_type)),
-                    vector_index=feature.vector_index,
-                    vector_length=feature.vector_length,
-                    vector_search_metric=feature.vector_search_metric,
-                )
-                for feature in on_demand_feature_view_proto.spec.features
-            ],
-            sources=cast(List[OnDemandSourceType], sources),
-            feature_transformation=transformation,
-            mode=on_demand_feature_view_proto.spec.mode or "pandas",
-            description=on_demand_feature_view_proto.spec.description,
-            tags=dict(on_demand_feature_view_proto.spec.tags),
-            owner=on_demand_feature_view_proto.spec.owner,
-            write_to_online_store=write_to_online_store,
-            singleton=singleton,
-            aggregations=aggregations,
-        )
-        on_demand_feature_view_obj.entities = entities
-        on_demand_feature_view_obj.entity_columns = entity_columns
-
-        # FeatureViewProjections are not saved in the OnDemandFeatureView proto.
-        # Create the default projection.
-        on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition(
-            on_demand_feature_view_obj
-        )
+        return {
+            "write_to_online_store": write_to_online_store,
+            "entities": entities,
+            "entity_columns": entity_columns,
+            "singleton": singleton,
+            "aggregations": aggregations,
+        }
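`_parse_optional_fields_from_proto` centralizes the repeated hasattr-or-default branches. The pattern in isolation, on a stand-in spec object (field names and values invented):

    class FakeSpec:  # stand-in; a real proto spec exposes these as fields
        singleton = True

    spec = FakeSpec()

    singleton = spec.singleton if hasattr(spec, "singleton") else False
    entities = list(spec.entities) if hasattr(spec, "entities") else []

    assert singleton is True
    assert entities == []  # the absent field falls back to its default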
 
-        if on_demand_feature_view_proto.meta.HasField("created_timestamp"):
-            on_demand_feature_view_obj.created_timestamp = (
-                on_demand_feature_view_proto.meta.created_timestamp.ToDatetime()
-            )
-        if on_demand_feature_view_proto.meta.HasField("last_updated_timestamp"):
-            on_demand_feature_view_obj.last_updated_timestamp = (
-                on_demand_feature_view_proto.meta.last_updated_timestamp.ToDatetime()
-            )
+    @classmethod
+    def _set_timestamps_from_proto(
+        cls, proto: OnDemandFeatureViewProto, obj: "OnDemandFeatureView"
+    ) -> None:
+        """Set timestamp fields on the object if they exist in the proto."""
+        if proto.meta.HasField("created_timestamp"):
+            obj.created_timestamp = proto.meta.created_timestamp.ToDatetime()
 
-        return on_demand_feature_view_obj
+        if proto.meta.HasField("last_updated_timestamp"):
+            obj.last_updated_timestamp = proto.meta.last_updated_timestamp.ToDatetime()
 
     def get_request_data_schema(self) -> dict[str, ValueType]:
         schema: dict[str, ValueType] = {}
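`HasField` is the standard protobuf presence check, so `_set_timestamps_from_proto` only copies timestamps that were actually serialized. A stand-in sketch of the guard, with plain Python objects in place of real proto messages:

    from datetime import datetime

    class FakeMeta:  # mimics the shape of proto.meta for this sketch
        def __init__(self, created=None):
            self.created_timestamp = created

        def HasField(self, name):
            return name == "created_timestamp" and self.created_timestamp is not None

    class Obj:
        created_timestamp = None

    meta, obj = FakeMeta(created=datetime(2024, 1, 1)), Obj()
    if meta.HasField("created_timestamp"):
        obj.created_timestamp = meta.created_timestamp
    assert obj.created_timestamp == datetime(2024, 1, 1)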
+ """ rename_columns: dict[str, str] = {} for feature in self.features: short_name = feature.name long_name = self._get_projected_feature_name(feature.name) + if short_name in transformed_table.columns and full_feature_names: rename_columns[short_name] = long_name - elif not full_feature_names: + elif long_name in transformed_table.columns and not full_feature_names: rename_columns[long_name] = short_name + # Apply renamings for rename_from, rename_to in rename_columns.items(): if rename_from in transformed_table.columns: transformed_table = transformed_table.rename(**{rename_to: rename_from}) @@ -583,10 +823,30 @@ def transform_arrow( ) -> pyarrow.Table: if not isinstance(pa_table, pyarrow.Table): raise TypeError("transform_arrow only accepts pyarrow.Table") + + # Apply common preprocessing to ensure both full and short feature names exist + pa_table, columns_to_cleanup = self._preprocess_arrow_table(pa_table) + + # Apply the transformation + transformed_table = self.feature_transformation.transform_arrow( + pa_table, self.features + ) + + # Clean up temporary columns and apply final renaming + return self._postprocess_arrow_table( + transformed_table, columns_to_cleanup, full_feature_names + ) + + def _preprocess_arrow_table(self, pa_table: pyarrow.Table): + """ + Preprocess pyarrow table to ensure both full and short feature names exist. + Returns the modified table and columns that need cleanup. + """ columns_to_cleanup = [] for source_fv_projection in self.source_feature_view_projections.values(): for feature in source_fv_projection.features: full_feature_ref = f"{source_fv_projection.name}__{feature.name}" + if full_feature_ref in pa_table.column_names: # Make sure the partial feature name is always present pa_table = pa_table.append_column( @@ -600,64 +860,99 @@ def transform_arrow( ) columns_to_cleanup.append(full_feature_ref) - df_with_transformed_features: pyarrow.Table = ( - self.feature_transformation.transform_arrow(pa_table, self.features) - ) + return pa_table, columns_to_cleanup - # Work out whether the correct columns names are used. + def _postprocess_arrow_table( + self, + transformed_table: pyarrow.Table, + columns_to_cleanup: list[str], + full_feature_names: bool, + ) -> pyarrow.Table: + """ + Clean up temporary columns and apply final column renaming. 
+ """ + # Determine final column names rename_columns: dict[str, str] = {} for feature in self.features: short_name = feature.name long_name = self._get_projected_feature_name(feature.name) - if ( - short_name in df_with_transformed_features.column_names - and full_feature_names - ): + + if short_name in transformed_table.column_names and full_feature_names: rename_columns[short_name] = long_name - elif not full_feature_names: + elif long_name in transformed_table.column_names and not full_feature_names: rename_columns[long_name] = short_name - # Cleanup extra columns used for transformation + # Clean up temporary columns for col in columns_to_cleanup: - if col in df_with_transformed_features.column_names: - df_with_transformed_features = df_with_transformed_features.drop(col) - return df_with_transformed_features.rename_columns( - [ - rename_columns.get(c, c) - for c in df_with_transformed_features.column_names - ] - ) + if col in transformed_table.column_names: + transformed_table = transformed_table.drop(col) + + # Apply column renaming + final_column_names = [ + rename_columns.get(c, c) for c in transformed_table.column_names + ] + return transformed_table.rename_columns(final_column_names) def transform_dict( self, feature_dict: dict[str, Any], # type: ignore ) -> dict[str, Any]: - # we need a mapping from full feature name to short and back to do a renaming - # The simplest thing to do is to make the full reference, copy the columns with the short reference - # and rerun - columns_to_cleanup: list[str] = [] - for source_fv_projection in self.source_feature_view_projections.values(): - for feature in source_fv_projection.features: - full_feature_ref = f"{source_fv_projection.name}__{feature.name}" - if full_feature_ref in feature_dict.keys(): - # Make sure the partial feature name is always present - feature_dict[feature.name] = feature_dict[full_feature_ref] - columns_to_cleanup.append(str(feature.name)) - elif feature.name in feature_dict.keys(): - # Make sure the full feature name is always present - feature_dict[full_feature_ref] = feature_dict[feature.name] - columns_to_cleanup.append(str(full_feature_ref)) + """ + Transform a dictionary of features using the configured transformation. + Handles both singleton and batch transformations. + + Args: + feature_dict: Dictionary containing input features + Returns: + Dictionary with transformed features + """ + # Preprocess to ensure both full and short feature names exist + preprocessed_dict, columns_to_cleanup = self._preprocess_feature_dict( + feature_dict + ) + + # Apply the appropriate transformation based on mode if self.singleton and self.mode == "python": - output_dict: dict[str, Any] = ( - self.feature_transformation.transform_singleton(feature_dict) + output_dict = self.feature_transformation.transform_singleton( + preprocessed_dict ) else: - output_dict = self.feature_transformation.transform(feature_dict) + output_dict = self.feature_transformation.transform(preprocessed_dict) + + # Clean up temporary columns for feature_name in columns_to_cleanup: - del output_dict[feature_name] + if feature_name in output_dict: + del output_dict[feature_name] + return output_dict + def _preprocess_feature_dict( + self, feature_dict: dict[str, Any] + ) -> tuple[dict[str, Any], list[str]]: + """ + Preprocess feature dictionary to ensure both full and short feature names exist. + Returns the modified dictionary and columns that need cleanup. 
+ """ + # Create a copy to avoid modifying the original + preprocessed_dict = feature_dict.copy() + columns_to_cleanup = [] + + for source_fv_projection in self.source_feature_view_projections.values(): + for feature in source_fv_projection.features: + full_feature_ref = f"{source_fv_projection.name}__{feature.name}" + + if full_feature_ref in feature_dict: + # Make sure the partial feature name is always present + preprocessed_dict[feature.name] = feature_dict[full_feature_ref] + columns_to_cleanup.append(feature.name) + elif feature.name in feature_dict: + # Make sure the full feature name is always present + preprocessed_dict[full_feature_ref] = feature_dict[feature.name] + columns_to_cleanup.append(full_feature_ref) + + return preprocessed_dict, columns_to_cleanup + def infer_features(self) -> None: random_input = self._construct_random_input(singleton=self.singleton) inferred_features = self.feature_transformation.infer_features( @@ -667,18 +962,10 @@ def infer_features(self) -> None: if self.features: missing_features = [] for specified_feature in self.features: - if ( - specified_feature not in inferred_features - and "Array" not in specified_feature.dtype.__str__() + if not self._feature_exists_in_inferred( + specified_feature, inferred_features ): missing_features.append(specified_feature) - elif "Array" in specified_feature.dtype.__str__(): - if specified_feature.name not in [ - f.name for f in inferred_features - ]: - missing_features.append(specified_feature) - else: - pass if missing_features: raise SpecifiedFeaturesNotPresentError( missing_features, inferred_features, self.name @@ -692,17 +979,110 @@ def infer_features(self) -> None: f"Could not infer Features for the feature view '{self.name}'.", ) + def _feature_exists_in_inferred( + self, specified_feature: Field, inferred_features: List[Field] + ) -> bool: + """ + Check if a specified feature exists in the inferred features list. + Handles both regular features and array types properly. 
+
+    def _preprocess_feature_dict(
+        self, feature_dict: dict[str, Any]
+    ) -> tuple[dict[str, Any], list[str]]:
+        """
+        Preprocess feature dictionary to ensure both full and short feature names exist.
+        Returns the modified dictionary and columns that need cleanup.
+        """
+        # Create a copy to avoid modifying the original
+        preprocessed_dict = feature_dict.copy()
+        columns_to_cleanup = []
+
+        for source_fv_projection in self.source_feature_view_projections.values():
+            for feature in source_fv_projection.features:
+                full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
+
+                if full_feature_ref in feature_dict:
+                    # Make sure the partial feature name is always present
+                    preprocessed_dict[feature.name] = feature_dict[full_feature_ref]
+                    columns_to_cleanup.append(feature.name)
+                elif feature.name in feature_dict:
+                    # Make sure the full feature name is always present
+                    preprocessed_dict[full_feature_ref] = feature_dict[feature.name]
+                    columns_to_cleanup.append(full_feature_ref)
+
+        return preprocessed_dict, columns_to_cleanup
+
     def infer_features(self) -> None:
         random_input = self._construct_random_input(singleton=self.singleton)
         inferred_features = self.feature_transformation.infer_features(
@@ -667,18 +962,10 @@
         if self.features:
             missing_features = []
             for specified_feature in self.features:
-                if (
-                    specified_feature not in inferred_features
-                    and "Array" not in specified_feature.dtype.__str__()
+                if not self._feature_exists_in_inferred(
+                    specified_feature, inferred_features
                 ):
                     missing_features.append(specified_feature)
-                elif "Array" in specified_feature.dtype.__str__():
-                    if specified_feature.name not in [
-                        f.name for f in inferred_features
-                    ]:
-                        missing_features.append(specified_feature)
-                else:
-                    pass
             if missing_features:
                 raise SpecifiedFeaturesNotPresentError(
                     missing_features, inferred_features, self.name
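For array dtypes the new helper falls back to matching by name, since the dtype representation can drift between specified and inferred features. A stubbed sketch of that rule (dtype strings mimic Feast's rendering, e.g. `Array(Float32)`):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class FakeField:  # stand-in for feast.Field
        name: str
        dtype: str

    specified = FakeField("embedding", "Array(Float32)")
    inferred = [FakeField("embedding", "Array(Float64)")]  # representation drifted

    exists = specified in inferred or (
        "Array" in specified.dtype and specified.name in {f.name for f in inferred}
    )
    assert exists  # the name match rescues the array-typed feature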
@@ -692,17 +979,110 @@
                     f"Could not infer Features for the feature view '{self.name}'.",
                 )
 
+    def _feature_exists_in_inferred(
+        self, specified_feature: Field, inferred_features: List[Field]
+    ) -> bool:
+        """
+        Check if a specified feature exists in the inferred features list.
+        Handles both regular features and array types properly.
+
+        Args:
+            specified_feature: The feature to check for
+            inferred_features: List of inferred features
+
+        Returns:
+            True if the feature exists in the inferred features, False otherwise
+        """
+        # Check for exact feature match first
+        if specified_feature in inferred_features:
+            return True
+
+        # For array types, we need to check by name since array types
+        # might have different representations between specified and inferred
+        if self._is_array_type(specified_feature.dtype):
+            inferred_feature_names = {f.name for f in inferred_features}
+            return specified_feature.name in inferred_feature_names
+
+        return False
+
+    def _is_array_type(self, dtype) -> bool:
+        """Check if the dtype represents an array type."""
+        # Inspect the dtype's string form, since array dtypes may render
+        # as either "Array(...)" or "List(...)"
+        dtype_str = str(dtype)
+        return "Array" in dtype_str or "List" in dtype_str
+
     def _construct_random_input(
         self, singleton: bool = False
     ) -> dict[str, Union[list[Any], Any]]:
-        rand_dict_value: dict[ValueType, Union[list[Any], Any]] = {
-            ValueType.BYTES: [str.encode("hello world")],
-            ValueType.PDF_BYTES: [
-                b"%PDF-1.3\n3 0 obj\n<>\nendobj\n4 0 obj\n<>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<>\nendobj\n5 0 obj\n<>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
-            ],
-            ValueType.IMAGE_BYTES: [
-                b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c\x1c $.' \",#\x1c\x1c(7),01444\x1f'9=82<.342\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x01\x01\x11\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x14\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00\x3f\x00\xaa\xff\xd9"
-            ],
+        """
+        Construct random input data for feature inference.
+
+        Args:
+            singleton: Whether to use singleton values (single values instead of lists)
+
+        Returns:
+            Dictionary with random sample data for each source feature
+        """
+        # Get sample values for each ValueType
+        sample_values = self._get_sample_values_by_type()
+
+        # Convert to singleton values if needed
+        if singleton:
+            sample_values = {k: v[0] for k, v in sample_values.items()}
+
+        # Default value for missing types
+        default_value = None if not singleton else [None]
+
+        feature_dict = {}
+
+        # Add feature view projection features
+        for feature_view_projection in self.source_feature_view_projections.values():
+            for feature in feature_view_projection.features:
+                value_type = feature.dtype.to_value_type()
+                sample_value = sample_values.get(value_type, default_value)
+
+                # Add both full and short feature references
+                feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
+                    sample_value
+                )
+                feature_dict[feature.name] = sample_value
+
+        # Add request source features
+        for request_data in self.source_request_sources.values():
+            for field in request_data.schema:
+                value_type = field.dtype.to_value_type()
+                sample_value = sample_values.get(value_type, default_value)
+                feature_dict[field.name] = sample_value
+
+        return feature_dict
+
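The singleton conversion above just unwraps the one-element sample lists. In isolation (keys are illustrative stand-ins for ValueType members):

    sample_values = {"INT64": [1], "STRING": ["hello world"]}

    singleton = True
    if singleton:
        sample_values = {k: v[0] for k, v in sample_values.items()}

    assert sample_values == {"INT64": 1, "STRING": "hello world"}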
\",#\x1c\x1c(7),01444\x1f'9=82<.342\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x01\x01\x11\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x14\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00\x3f\x00\xaa\xff\xd9" + + return { + # Basic types + ValueType.BYTES: [b"hello world"], ValueType.STRING: ["hello world"], ValueType.INT32: [1], ValueType.INT64: [1], @@ -710,7 +1090,11 @@ def _construct_random_input( ValueType.FLOAT: [1.0], ValueType.BOOL: [True], ValueType.UNIX_TIMESTAMP: [_utc_now()], - ValueType.BYTES_LIST: [[str.encode("hello world")]], + # Special binary types + ValueType.PDF_BYTES: [pdf_sample], + ValueType.IMAGE_BYTES: [image_sample], + # List types + ValueType.BYTES_LIST: [[b"hello world"]], ValueType.STRING_LIST: [["hello world"]], ValueType.INT32_LIST: [[1]], ValueType.INT64_LIST: [[1]], @@ -719,28 +1103,6 @@ def _construct_random_input( ValueType.BOOL_LIST: [[True]], ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]], } - if singleton: - rand_dict_value = {k: rand_dict_value[k][0] for k in rand_dict_value} - - rand_missing_value = [None] if singleton else None - feature_dict = {} - for feature_view_projection in self.source_feature_view_projections.values(): - for feature in feature_view_projection.features: - feature_dict[f"{feature_view_projection.name}__{feature.name}"] = ( - rand_dict_value.get( - feature.dtype.to_value_type(), rand_missing_value - ) - ) - feature_dict[f"{feature.name}"] = rand_dict_value.get( - feature.dtype.to_value_type(), rand_missing_value - ) - for request_data in self.source_request_sources.values(): - for field in request_data.schema: - feature_dict[f"{field.name}"] = rand_dict_value.get( - field.dtype.to_value_type(), rand_missing_value - ) - - return feature_dict @staticmethod def get_requested_odfvs(