diff --git a/docs/getting-started/concepts/batch-feature-view.md b/docs/getting-started/concepts/batch-feature-view.md index 9d0eb86389c..d9560ddcbc8 100644 --- a/docs/getting-started/concepts/batch-feature-view.md +++ b/docs/getting-started/concepts/batch-feature-view.md @@ -27,7 +27,7 @@ class BatchFeatureView(FeatureView): def __init__( *, name: str, - source: Union[DataSource, FeatureView, List[FeatureView]], + source: Optional[Union[DataSource, FeatureView, List[FeatureView]]] = None, sink_source: Optional[DataSource] = None, schema: Optional[List[Field]] = None, entities: Optional[List[Entity]] = None, @@ -142,6 +142,7 @@ See: ## 🛑 Gotchas - `sink_source` is **required** when chaining views (i.e., `source` is another FeatureView or list of them). +- `source` is optional; if omitted (`None`), the feature view has no associated batch data source. - Schema fields must be consistent with `sink_source`, `batch_source.field_mapping` if field mappings exist. - Aggregation logic must reference columns present in the raw source or transformed inputs. diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index b0a62a1c854..66dc4c3de6f 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -61,6 +61,7 @@ message FeatureViewSpec { google.protobuf.Duration ttl = 6; // Batch/Offline DataSource where this view can retrieve offline feature data. + // Optional: if not set, the feature view has no associated batch data source (e.g. purely derived views). DataSource batch_source = 7; // Whether these features should be served online or not @@ -68,6 +69,7 @@ message FeatureViewSpec { bool online = 8; // Streaming DataSource from where this view can consume "online" feature data. + // Optional: only required for streaming feature views. DataSource stream_source = 9; // Description of the feature view. diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index d7dc2237bd3..478058c89b3 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -93,8 +93,7 @@ def __init__( self.created_timestamp = None self.last_updated_timestamp = None - if source: - self.source = source + self.source = source @property @abstractmethod diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py index e2a1f78441a..925d70e58ab 100644 --- a/sdk/python/feast/batch_feature_view.py +++ b/sdk/python/feast/batch_feature_view.py @@ -82,7 +82,9 @@ def __init__( *, name: str, mode: Union[TransformationMode, str] = TransformationMode.PYTHON, - source: Union[DataSource, "BatchFeatureView", List["BatchFeatureView"]], + source: Optional[ + Union[DataSource, "BatchFeatureView", List["BatchFeatureView"]] + ] = None, sink_source: Optional[DataSource] = None, entities: Optional[List[Entity]] = None, ttl: Optional[timedelta] = None, @@ -115,6 +117,21 @@ def __init__( f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " ) + if source is None and aggregations: + raise ValueError( + "BatchFeatureView with aggregations requires a source to aggregate from." + ) + + if ( + source is None + and not udf + and not feature_transformation + and not aggregations + ): + raise ValueError( + "BatchFeatureView requires at least one of: source, udf, feature_transformation, or aggregations." + ) + self.mode = mode self.udf = udf self.udf_string = udf_string diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fdc790b71e1..fe0e7967345 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -702,11 +702,21 @@ def _make_inferences( ) update_data_sources_with_inferred_event_timestamp_col( - [view.batch_source for view in views_to_update], self.config + [ + view.batch_source + for view in views_to_update + if view.batch_source is not None + ], + self.config, ) update_data_sources_with_inferred_event_timestamp_col( - [view.batch_source for view in sfvs_to_update], self.config + [ + view.batch_source + for view in sfvs_to_update + if view.batch_source is not None + ], + self.config, ) # New feature views may reference previously applied entities. @@ -2416,6 +2426,8 @@ def write_to_offline_store( provider = self._get_provider() # Get columns of the batch source and the input dataframe. + if feature_view.batch_source is None: + raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.") column_names_and_types = ( provider.get_table_column_names_and_types_from_data_source( self.config, feature_view.batch_source diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index b6774170fbe..94e95da545f 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -73,9 +73,8 @@ class FeatureView(BaseFeatureView): ttl: The amount of time this group of features lives. A ttl of 0 indicates that this group of features lives forever. Note that large ttl's or a ttl of 0 can result in extremely computationally intensive queries. - batch_source: The batch source of data where this group of features - is stored. This is optional ONLY if a push source is specified as the - stream_source, since push sources contain their own batch sources. + batch_source: Optional batch source of data where this group of features + is stored. If no source is provided, this will be None. stream_source: The stream source of data where this group of features is stored. schema: The schema of the feature view, including feature, timestamp, and entity columns. If not specified, can be inferred from the underlying data source. @@ -97,7 +96,7 @@ class FeatureView(BaseFeatureView): name: str entities: List[str] ttl: Optional[timedelta] - batch_source: DataSource + batch_source: Optional[DataSource] stream_source: Optional[DataSource] source_views: Optional[List["FeatureView"]] entity_columns: List[Field] @@ -115,7 +114,7 @@ def __init__( self, *, name: str, - source: Union[DataSource, "FeatureView", List["FeatureView"]], + source: Optional[Union[DataSource, "FeatureView", List["FeatureView"]]] = None, sink_source: Optional[DataSource] = None, schema: Optional[List[Field]] = None, entities: Optional[List[Entity]] = None, @@ -133,8 +132,9 @@ def __init__( Args: name: The unique name of the feature view. - source: The source of data for this group of features. May be a stream source, or a batch source. - If a stream source, the source should contain a batch_source for backfills & batch materialization. + source (optional): The source of data for this group of features. May be a stream source, + a batch source, a FeatureView, or a list of FeatureViews. If None, the feature view + has no associated data source. schema (optional): The schema of the feature view, including feature, timestamp, and entity columns. # TODO: clarify that schema is only useful here... @@ -170,7 +170,9 @@ def __init__( self.data_source: Optional[DataSource] = None self.source_views: List[FeatureView] = [] - if isinstance(source, DataSource): + if source is None: + pass # data_source remains None, source_views remains [] + elif isinstance(source, DataSource): self.data_source = source elif isinstance(source, FeatureView): self.source_views = [source] @@ -199,11 +201,14 @@ def __init__( elif self.data_source: # Batch source definition self.batch_source = self.data_source - else: + elif self.source_views: # Derived view source definition if not sink_source: raise ValueError("Derived FeatureView must specify `sink_source`.") self.batch_source = sink_source + else: + # source=None - no batch source + self.batch_source = None # Initialize features and entity columns. features: List[Field] = [] diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index 70415e9ed3a..530194ec6a8 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -98,19 +98,17 @@ def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection": @staticmethod def from_feature_view_definition(feature_view: "FeatureView"): # TODO need to implement this for StreamFeatureViews - if getattr(feature_view, "batch_source", None): + batch_source = getattr(feature_view, "batch_source", None) + if batch_source: return FeatureViewProjection( name=feature_view.name, name_alias=None, features=feature_view.features, desired_features=[], - timestamp_field=feature_view.batch_source.created_timestamp_column - or None, - created_timestamp_column=feature_view.batch_source.created_timestamp_column - or None, - date_partition_column=feature_view.batch_source.date_partition_column - or None, - batch_source=feature_view.batch_source or None, + timestamp_field=batch_source.created_timestamp_column or None, + created_timestamp_column=batch_source.created_timestamp_column or None, + date_partition_column=batch_source.date_partition_column or None, + batch_source=batch_source or None, ) else: return FeatureViewProjection( diff --git a/sdk/python/feast/feature_view_utils.py b/sdk/python/feast/feature_view_utils.py index 704a5a3d7b9..0b599f4777c 100644 --- a/sdk/python/feast/feature_view_utils.py +++ b/sdk/python/feast/feature_view_utils.py @@ -136,6 +136,8 @@ def resolve_feature_view_source( if not is_derived_view: # Regular feature view - use its batch_source directly + if feature_view.batch_source is None: + raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.") return FeatureViewSourceInfo( data_source=feature_view.batch_source, source_type="batch_source", @@ -178,8 +180,13 @@ def resolve_feature_view_source( if hasattr(parent_view, "source_views") and parent_view.source_views: # Parent is also a derived view - recursively find original source original_source_view = find_original_source_view(parent_view) + original_batch_source = original_source_view.batch_source + if original_batch_source is None: + raise ValueError( + f"Original source view '{original_source_view.name}' has no batch_source." + ) return FeatureViewSourceInfo( - data_source=original_source_view.batch_source, + data_source=original_batch_source, source_type="original_source", has_transformation=view_has_transformation, transformation_func=transformation_func, @@ -229,8 +236,13 @@ def resolve_feature_view_source_with_fallback( elif hasattr(feature_view, "source_views") and feature_view.source_views: # Try the original source view as last resort original_view = find_original_source_view(feature_view) + original_view_batch_source = original_view.batch_source + if original_view_batch_source is None: + raise ValueError( + f"Original source view '{original_view.name}' has no batch_source." + ) return FeatureViewSourceInfo( - data_source=original_view.batch_source, + data_source=original_view_batch_source, source_type="fallback_original_source", has_transformation=has_transformation(feature_view), transformation_func=get_transformation_function(feature_view), diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index f5f234b7301..16023e3dac6 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -221,6 +221,9 @@ def _infer_features_and_entities( fv, join_keys, run_inference_for_features, config ) + if fv.batch_source is None: + return + entity_columns: List[Field] = fv.entity_columns if fv.entity_columns else [] columns_to_exclude = { fv.batch_source.timestamp_field, diff --git a/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py index 9e2d217875b..b223328893d 100644 --- a/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py +++ b/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py @@ -193,7 +193,7 @@ def _materialize_one( offline_job = self.offline_store.pull_latest_from_table_or_query( config=self.repo_config, - data_source=feature_view.batch_source, + data_source=feature_view.batch_source, # type: ignore[arg-type] join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, diff --git a/sdk/python/feast/infra/compute_engines/kubernetes/k8s_engine.py b/sdk/python/feast/infra/compute_engines/kubernetes/k8s_engine.py index 0dcff09f027..2f041301be7 100644 --- a/sdk/python/feast/infra/compute_engines/kubernetes/k8s_engine.py +++ b/sdk/python/feast/infra/compute_engines/kubernetes/k8s_engine.py @@ -145,7 +145,7 @@ def _materialize_one( offline_job = self.offline_store.pull_latest_from_table_or_query( config=self.repo_config, - data_source=feature_view.batch_source, + data_source=feature_view.batch_source, # type: ignore[arg-type] join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, diff --git a/sdk/python/feast/infra/compute_engines/ray/compute.py b/sdk/python/feast/infra/compute_engines/ray/compute.py index a5c1b3caab5..fa8f9747f3b 100644 --- a/sdk/python/feast/infra/compute_engines/ray/compute.py +++ b/sdk/python/feast/infra/compute_engines/ray/compute.py @@ -163,7 +163,7 @@ def _materialize_from_offline_store( # Pull data from offline store retrieval_job = self.offline_store.pull_latest_from_table_or_query( config=self.repo_config, - data_source=feature_view.batch_source, + data_source=feature_view.batch_source, # type: ignore[arg-type] join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, diff --git a/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py b/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py index 7441cb0f18c..d0a1152eb55 100644 --- a/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py +++ b/sdk/python/feast/infra/compute_engines/snowflake/snowflake_engine.py @@ -226,13 +226,14 @@ def _materialize_one( timestamp_field, created_timestamp_column, ) = _get_column_names(feature_view, entities) + assert feature_view.batch_source is not None # guaranteed by _get_column_names job_id = f"{feature_view.name}-{start_date}-{end_date}" try: offline_job = self.offline_store.pull_latest_from_table_or_query( config=self.repo_config, - data_source=feature_view.batch_source, + data_source=feature_view.batch_source, # type: ignore[arg-type] join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, @@ -341,6 +342,7 @@ def generate_snowflake_materialization_query( feature_batch: list, project: str, ) -> str: + assert feature_view.batch_source is not None if feature_view.batch_source.created_timestamp_column: fv_created_str = f',"{feature_view.batch_source.created_timestamp_column}"' else: @@ -406,6 +408,7 @@ def materialize_to_snowflake_online_store( project: str, ) -> None: assert_snowflake_feature_names(feature_view) + assert feature_view.batch_source is not None feature_names_str = '", "'.join( [feature.name for feature in feature_view.features] @@ -467,6 +470,7 @@ def materialize_to_external_online_store( feature_view: Union[StreamFeatureView, FeatureView], pbar: tqdm, ) -> None: + assert feature_view.batch_source is not None feature_names = [feature.name for feature in feature_view.features] with GetSnowflakeConnection(repo_config.batch_engine) as conn: diff --git a/sdk/python/feast/infra/compute_engines/spark/compute.py b/sdk/python/feast/infra/compute_engines/spark/compute.py index e6ec58dd74d..b6c7dc30d55 100644 --- a/sdk/python/feast/infra/compute_engines/spark/compute.py +++ b/sdk/python/feast/infra/compute_engines/spark/compute.py @@ -162,7 +162,7 @@ def _materialize_from_offline_store( SparkRetrievalJob, self.offline_store.pull_latest_from_table_or_query( config=self.repo_config, - data_source=feature_view.batch_source, + data_source=feature_view.batch_source, # type: ignore[arg-type] join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, timestamp_field=timestamp_field, diff --git a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py index bc7c60733b4..e4dbd67666d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py +++ b/sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py @@ -2168,7 +2168,7 @@ def get_historical_features( # Build reverse field mapping to get actual source column names reverse_field_mapping = {} - if fv.batch_source.field_mapping: + if fv.batch_source is not None and fv.batch_source.field_mapping: reverse_field_mapping = { v: k for k, v in fv.batch_source.field_mapping.items() } diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 809fbf4091d..ddb1efa9262 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -656,7 +656,10 @@ def _field_mapping( full_feature_names: bool, ) -> Tuple[dd.DataFrame, str]: # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping: + if ( + feature_view.batch_source is not None + and feature_view.batch_source.field_mapping + ): df_to_join = _run_dask_field_mapping( df_to_join, feature_view.batch_source.field_mapping ) diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py index b37877276c9..a52f560952a 100644 --- a/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store.py @@ -72,6 +72,8 @@ def _get_offline_store_for_feature_view( self, feature_view: FeatureView, config: RepoConfig ) -> OfflineStore: self._initialize_offline_stores(config) + if feature_view.batch_source is None: + raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.") source_type = feature_view.batch_source.source_type() store_key = self.get_source_key_from_type(source_type) if store_key is None: diff --git a/sdk/python/feast/infra/offline_stores/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py index 9d8891036fe..e25463d0081 100644 --- a/sdk/python/feast/infra/offline_stores/ibis.py +++ b/sdk/python/feast/infra/offline_stores/ibis.py @@ -174,6 +174,10 @@ def get_historical_features_ibis( def read_fv( feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool ) -> Tuple: + if feature_view.batch_source is None: + raise ValueError( + f"Feature view '{feature_view.name}' has no batch_source and cannot be queried." + ) fv_table: Table = data_source_reader( feature_view.batch_source, str(config.repo_path) ) @@ -335,6 +339,8 @@ def offline_write_batch_ibis( progress: Optional[Callable[[int], Any]], data_source_writer: Callable[[pyarrow.Table, DataSource, str], None], ): + if feature_view.batch_source is None: + raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.") pa_schema, column_names = get_pyarrow_schema_from_batch_source( config, feature_view.batch_source ) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 9825453d6a6..0c478adb2c4 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -118,6 +118,10 @@ def get_feature_view_query_context( query_context = [] for feature_view, features in feature_views_to_feature_map.items(): + if feature_view.batch_source is None: + raise ValueError( + f"Feature view '{feature_view.name}' has no batch_source and cannot be queried." + ) reverse_field_mapping = { v: k for k, v in feature_view.batch_source.field_mapping.items() } diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 2f960a02822..6830929e776 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -364,7 +364,10 @@ def _prep_rows_to_write_for_ingestion( # Note: A dictionary mapping of column names in this data # source to feature names in a feature table or view. Only used for feature # columns, not entity or timestamp columns. - if hasattr(feature_view, "batch_source"): + if ( + hasattr(feature_view, "batch_source") + and feature_view.batch_source is not None + ): if feature_view.batch_source.field_mapping is not None: table = _run_pyarrow_field_mapping( table, feature_view.batch_source.field_mapping @@ -410,7 +413,10 @@ async def ingest_df_async( ) def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table): - if feature_view.batch_source.field_mapping is not None: + if ( + feature_view.batch_source is not None + and feature_view.batch_source.field_mapping is not None + ): table = _run_pyarrow_field_mapping( table, feature_view.batch_source.field_mapping ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index fa5d297752a..1a6de75b2e4 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -164,8 +164,9 @@ def parse_repo(repo_root: Path) -> RepoContents: # Handle batch sources defined with feature views. batch_source = obj.batch_source - assert batch_source - if not any((batch_source is ds) for ds in res.data_sources): + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): res.data_sources.append(batch_source) # Handle stream sources defined with feature views. @@ -180,10 +181,10 @@ def parse_repo(repo_root: Path) -> RepoContents: # Handle batch sources defined with feature views. batch_source = obj.batch_source - if not any((batch_source is ds) for ds in res.data_sources): + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): res.data_sources.append(batch_source) - - # Handle stream sources defined with feature views. assert obj.stream_source stream_source = obj.stream_source if not any((stream_source is ds) for ds in res.data_sources): @@ -195,7 +196,9 @@ def parse_repo(repo_root: Path) -> RepoContents: # Handle batch sources defined with feature views. batch_source = obj.batch_source - if not any((batch_source is ds) for ds in res.data_sources): + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): res.data_sources.append(batch_source) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities @@ -234,7 +237,9 @@ def plan( # TODO: When we support multiple projects in a single repo, we should filter repo contents by project if not skip_source_validation: provider = store._get_provider() - data_sources = [t.batch_source for t in repo.feature_views] + data_sources = [ + t.batch_source for t in repo.feature_views if t.batch_source is not None + ] # Make sure the data source used by this feature view is supported by Feast for data_source in data_sources: provider.validate_data_source(store.config, data_source) @@ -345,7 +350,9 @@ def apply_total_with_repo_instance( ): if not skip_source_validation: provider = store._get_provider() - data_sources = [t.batch_source for t in repo.feature_views] + data_sources = [ + t.batch_source for t in repo.feature_views if t.batch_source is not None + ] # Make sure the data source used by this feature view is supported by Feast for data_source in data_sources: provider.validate_data_source(store.config, data_source) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 69353b9c1d9..511186066c6 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -152,6 +152,11 @@ def _get_column_names( and reverse-mapped created timestamp column that will be passed into the query to the offline store. """ + if feature_view.batch_source is None: + raise ValueError( + f"Feature view '{feature_view.name}' has no batch_source and cannot be used for offline retrieval." + ) + # if we have mapped fields, use the original field names in the call to the offline store timestamp_field = feature_view.batch_source.timestamp_field @@ -342,6 +347,11 @@ def _convert_arrow_fv_to_proto( if isinstance(table, pyarrow.Table): table = table.to_batches()[0] + if feature_view.batch_source is None: + raise ValueError( + f"Feature view '{feature_view.name}' has no batch_source and cannot be converted to proto." + ) + # TODO: This will break if the feature view has aggregations or transformations columns = [ (field.name, field.dtype.to_value_type()) for field in feature_view.features diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini index 31640e509d7..1ad76b978e4 100644 --- a/sdk/python/pytest.ini +++ b/sdk/python/pytest.ini @@ -20,6 +20,7 @@ markers = slow: Tests taking >30 seconds cloud: Tests requiring cloud credentials local_only: Tests that run entirely locally + xdist_group: Group tests to run in the same xdist worker timeout = 300 timeout_method = thread diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 1c036367895..fb09395d789 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -619,6 +619,81 @@ def test_apply_feature_view_success(test_registry: BaseRegistry): test_registry.teardown() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_apply_feature_view_without_source_success(test_registry: BaseRegistry): + """Test that a FeatureView with no source can be applied, retrieved, updated, and deleted.""" + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + + fv1 = FeatureView( + name="my_feature_view_no_source", + schema=[ + Field(name="test", dtype=Int64), + Field(name="fs1_my_feature_1", dtype=Int64), + Field(name="fs1_my_feature_2", dtype=String), + Field(name="fs1_my_feature_3", dtype=Array(String)), + ], + entities=[entity], + tags={"team": "matchmaking"}, + source=None, + ttl=timedelta(minutes=5), + ) + + project = "project" + + # Register Feature View + test_registry.apply_feature_view(fv1, project) + + feature_views = test_registry.list_feature_views(project, tags=fv1.tags) + + assert len(feature_views) == 1 + assert feature_views[0].name == "my_feature_view_no_source" + assert feature_views[0].batch_source is None + assert feature_views[0].stream_source is None + assert feature_views[0].features[0].name == "fs1_my_feature_1" + assert feature_views[0].features[0].dtype == Int64 + assert feature_views[0].features[1].name == "fs1_my_feature_2" + assert feature_views[0].features[1].dtype == String + assert feature_views[0].features[2].name == "fs1_my_feature_3" + assert feature_views[0].features[2].dtype == Array(String) + + feature_view = test_registry.get_feature_view("my_feature_view_no_source", project) + any_feature_view = test_registry.get_any_feature_view( + "my_feature_view_no_source", project + ) + + assert feature_view.name == "my_feature_view_no_source" + assert feature_view.batch_source is None + assert feature_view.stream_source is None + assert feature_view.ttl == timedelta(minutes=5) + assert feature_view == any_feature_view + + # After the first apply, created_timestamp should equal last_updated_timestamp. + assert feature_view.created_timestamp == feature_view.last_updated_timestamp + + # Update the feature view and verify created_timestamp is preserved. + fv1.ttl = timedelta(minutes=10) + test_registry.apply_feature_view(fv1, project) + feature_views = test_registry.list_feature_views(project) + assert len(feature_views) == 1 + updated_feature_view = test_registry.get_feature_view( + "my_feature_view_no_source", project + ) + assert updated_feature_view.ttl == timedelta(minutes=10) + assert updated_feature_view.batch_source is None + assert updated_feature_view.created_timestamp == feature_view.created_timestamp + + # Delete the feature view. + test_registry.delete_feature_view("my_feature_view_no_source", project) + feature_views = test_registry.list_feature_views(project) + assert len(feature_views) == 0 + + test_registry.teardown() + + @pytest.mark.integration @pytest.mark.parametrize( "test_registry", diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 9030e6e0c69..3427cfdfc4b 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -23,6 +23,18 @@ from feast.utils import _utc_now, make_tzaware +def test_create_feature_view_without_source(): + fv = FeatureView(name="test_no_source", ttl=timedelta(days=1)) + assert fv.batch_source is None + assert fv.stream_source is None + + proto = fv.to_proto() + assert not proto.spec.HasField("batch_source") + + fv_roundtrip = FeatureView.from_proto(proto) + assert fv_roundtrip.batch_source is None + + def test_create_feature_view_with_conflicting_entities(): user1 = Entity(name="user1", join_keys=["user_id"]) user2 = Entity(name="user2", join_keys=["user_id"]) @@ -48,7 +60,7 @@ def test_create_batch_feature_view(): udf=lambda x: x, ) - with pytest.raises(TypeError): + with pytest.raises(ValueError): BatchFeatureView( name="test batch feature view", entities=[], ttl=timedelta(days=30) )