From 0f9dfb57a9065d674f9473f90e6a2a761783e019 Mon Sep 17 00:00:00 2001 From: tokoko Date: Thu, 22 Aug 2024 09:16:15 +0000 Subject: [PATCH 1/4] add new registry method for working with any fv type Signed-off-by: tokoko --- protos/feast/registry/RegistryServer.proto | 35 ++++++++- sdk/python/feast/cli_utils.py | 5 +- sdk/python/feast/feature_store.py | 62 +++++----------- .../feast/infra/registry/base_registry.py | 38 ++++++++++ .../feast/infra/registry/caching_registry.py | 34 +++++++++ .../infra/registry/proto_registry_utils.py | 39 ++++++++++ sdk/python/feast/infra/registry/registry.py | 21 +++++- sdk/python/feast/infra/registry/remote.py | 49 +++++++++++++ sdk/python/feast/infra/registry/snowflake.py | 72 ++++++++++++++++++- sdk/python/feast/infra/registry/sql.py | 57 ++++++++++++++- sdk/python/feast/registry_server.py | 64 +++++++++++++++++ sdk/python/feast/utils.py | 5 -- .../online_store/test_universal_online.py | 5 +- .../registration/test_universal_registry.py | 37 +++++++--- 14 files changed, 455 insertions(+), 68 deletions(-) diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 928354077b5..2caf1bdec79 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -31,9 +31,13 @@ service RegistryServer{ // FeatureView RPCs rpc ApplyFeatureView (ApplyFeatureViewRequest) returns (google.protobuf.Empty) {} + rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {} + rpc GetAnyFeatureView (GetAnyFeatureViewRequest) returns (GetAnyFeatureViewResponse) {} + rpc ListAllFeatureViews (ListAllFeatureViewsRequest) returns (ListAllFeatureViewsResponse) {} + + // plain FeatureView RPCs rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {} rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {} - rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {} // StreamFeatureView RPCs rpc GetStreamFeatureView (GetStreamFeatureViewRequest) returns (feast.core.StreamFeatureView) {} @@ -201,6 +205,35 @@ message DeleteFeatureViewRequest { bool commit = 3; } +message AnyFeatureView { + oneof any_feature_view { + feast.core.FeatureView feature_view = 1; + feast.core.OnDemandFeatureView on_demand_feature_view = 2; + feast.core.StreamFeatureView stream_feature_view = 3; + } +} + +message GetAnyFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message GetAnyFeatureViewResponse { + AnyFeatureView any_feature_view = 1; +} + +message ListAllFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; + map tags = 3; +} + +message ListAllFeatureViewsResponse { + repeated AnyFeatureView feature_views = 1; +} + + // StreamFeatureView message GetStreamFeatureViewRequest { diff --git a/sdk/python/feast/cli_utils.py b/sdk/python/feast/cli_utils.py index edfdab93e30..531efc7ab5e 100644 --- a/sdk/python/feast/cli_utils.py +++ b/sdk/python/feast/cli_utils.py @@ -175,7 +175,7 @@ def handle_fv_verbose_permissions_command( tags=tags_filter # type: ignore[assignment] ) for fv in feature_views: - if p.match_resource(fv): + if p.match_resource(fv): # type: ignore[arg-type] feature_views_names.add(fv.name) if len(feature_views_names) > 0: Node( @@ -207,8 +207,7 @@ def handle_not_verbose_permissions_command( def fetch_all_feast_objects(store: FeatureStore) -> list[FeastObject]: objects: list[FeastObject] = [] objects.extend(store.list_entities()) - objects.extend(store.list_all_feature_views()) - objects.extend(store.list_batch_feature_views()) + objects.extend(store.list_all_feature_views()) # type: ignore[arg-type] objects.extend(store.list_feature_services()) objects.extend(store.list_data_sources()) objects.extend(store.list_validation_references()) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a03706e56f6..844c88d11cc 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import itertools -import logging import os import warnings from datetime import datetime, timedelta @@ -260,9 +259,26 @@ def list_feature_services( """ return self._registry.list_feature_services(self.project, tags=tags) + def _list_all_feature_views( + self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None + ) -> List[BaseFeatureView]: + feature_views = [] + for fv in self.registry.list_all_feature_views( + self.project, allow_cache=allow_cache, tags=tags + ): + if ( + isinstance(fv, FeatureView) + and fv.entities + and fv.entities[0] == DUMMY_ENTITY_NAME + ): + fv.entities = [] + fv.entity_columns = [] + feature_views.append(fv) + return feature_views + def list_all_feature_views( self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None - ) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]: + ) -> List[BaseFeatureView]: """ Retrieves the list of feature views from the registry. @@ -287,10 +303,6 @@ def list_feature_views( Returns: A list of feature views. """ - logging.warning( - "list_feature_views will make breaking changes. Please use list_batch_feature_views instead. " - "list_feature_views will behave like list_all_feature_views in the future." - ) return utils._list_feature_views( self._registry, self.project, allow_cache, tags=tags ) @@ -310,44 +322,6 @@ def list_batch_feature_views( """ return self._list_batch_feature_views(allow_cache=allow_cache, tags=tags) - def _list_all_feature_views( - self, - allow_cache: bool = False, - tags: Optional[dict[str, str]] = None, - ) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]: - all_feature_views = ( - utils._list_feature_views( - self._registry, self.project, allow_cache, tags=tags - ) - + self._list_stream_feature_views(allow_cache, tags=tags) - + self.list_on_demand_feature_views(allow_cache, tags=tags) - ) - return all_feature_views - - def _list_feature_views( - self, - allow_cache: bool = False, - hide_dummy_entity: bool = True, - tags: Optional[dict[str, str]] = None, - ) -> List[FeatureView]: - logging.warning( - "_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. " - "_list_feature_views will behave like _list_all_feature_views in the future." - ) - feature_views = [] - for fv in self._registry.list_feature_views( - self.project, allow_cache=allow_cache, tags=tags - ): - if ( - hide_dummy_entity - and fv.entities - and fv.entities[0] == DUMMY_ENTITY_NAME - ): - fv.entities = [] - fv.entity_columns = [] - feature_views.append(fv) - return feature_views - def _list_batch_feature_views( self, allow_cache: bool = False, diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 33adb6b7c95..e7bfc4506fa 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -389,6 +389,44 @@ def list_feature_views( """ raise NotImplementedError + @abstractmethod + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + """ + Retrieves a feature view of any type. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + raise NotImplementedError + + @abstractmethod + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + """ + Retrieve a list of feature views of all types from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + tags: Filter by tags + + Returns: + List of feature views + """ + raise NotImplementedError + @abstractmethod def apply_materialization( self, diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 611d67de96d..ff9d4b8947e 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -6,6 +6,7 @@ from threading import Lock from typing import List, Optional +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.feature_service import FeatureService @@ -101,6 +102,39 @@ def list_entities( ) return self._list_entities(project, tags) + @abstractmethod + def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView: + pass + + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_any_feature_view( + self.cached_registry_proto, name, project + ) + return self._get_any_feature_view(name, project) + + @abstractmethod + def _list_all_feature_views( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[BaseFeatureView]: + pass + + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_all_feature_views( + self.cached_registry_proto, project, tags + ) + return self._list_all_feature_views(project, tags) + @abstractmethod def _get_feature_view(self, name: str, project: str) -> FeatureView: pass diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index f67808aab55..31bbbd9a97f 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -3,6 +3,7 @@ from typing import List, Optional from feast import utils +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.errors import ( @@ -99,6 +100,33 @@ def get_feature_service( raise FeatureServiceNotFoundException(name, project=project) +def get_any_feature_view( + registry_proto: RegistryProto, name: str, project: str +) -> BaseFeatureView: + for feature_view_proto in registry_proto.feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return FeatureView.from_proto(feature_view_proto) + + for feature_view_proto in registry_proto.stream_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return StreamFeatureView.from_proto(feature_view_proto) + + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if ( + on_demand_feature_view.spec.project == project + and on_demand_feature_view.spec.name == name + ): + return OnDemandFeatureView.from_proto(on_demand_feature_view) + + raise FeatureViewNotFoundException(name, project) + + def get_feature_view( registry_proto: RegistryProto, name: str, project: str ) -> FeatureView: @@ -185,6 +213,17 @@ def list_feature_services( return feature_services +@registry_proto_cache_with_tags +def list_all_feature_views( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[BaseFeatureView]: + return ( + list_feature_views(registry_proto, project, tags) + + list_stream_feature_views(registry_proto, project, tags) + + list_on_demand_feature_views(registry_proto, project, tags) + ) + + @registry_proto_cache_with_tags def list_feature_views( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 366f3aacaad..77080fa43b1 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -557,7 +557,26 @@ def apply_materialization( self.commit() return - raise FeatureViewNotFoundException(feature_view.name, project) + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.list_all_feature_views( + registry_proto, project, tags + ) + + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.get_any_feature_view(registry_proto, name, project) def list_feature_views( self, diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 618628bc071..964c233e8da 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -32,6 +32,24 @@ from feast.stream_feature_view import StreamFeatureView +def extract_base_feature_view( + any_feature_view: RegistryServer_pb2.AnyFeatureView, +) -> BaseFeatureView: + feature_view_type = any_feature_view.WhichOneof("any_feature_view") + if feature_view_type == "feature_view": + feature_view = FeatureView.from_proto(any_feature_view.feature_view) + elif feature_view_type == "on_demand_feature_view": + feature_view = OnDemandFeatureView.from_proto( + any_feature_view.on_demand_feature_view + ) + elif feature_view_type == "stream_feature_view": + feature_view = StreamFeatureView.from_proto( + any_feature_view.stream_feature_view + ) + + return feature_view + + class RemoteRegistryConfig(RegistryConfig): registry_type: StrictStr = "remote" """ str: Provider name or a class name that implements Registry.""" @@ -242,6 +260,37 @@ def list_on_demand_feature_views( for on_demand_feature_view in response.on_demand_feature_views ] + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + request = RegistryServer_pb2.GetAnyFeatureViewRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response: RegistryServer_pb2.GetAnyFeatureViewResponse = ( + self.stub.GetAnyFeatureView(request) + ) + any_feature_view = response.any_feature_view + return extract_base_feature_view(any_feature_view) + + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + request = RegistryServer_pb2.ListAllFeatureViewsRequest( + project=project, allow_cache=allow_cache, tags=tags + ) + + response: RegistryServer_pb2.ListAllFeatureViewsResponse = ( + self.stub.ListAllFeatureViews(request) + ) + return [ + extract_base_feature_view(any_feature_view) + for any_feature_view in response.feature_views + ] + def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index 801b90afe38..89d771e9cc1 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta, timezone from enum import Enum from threading import Lock -from typing import Any, Callable, List, Literal, Optional, Set, Union +from typing import Any, Callable, List, Literal, Optional, Set, Union, cast from pydantic import ConfigDict, Field, StrictStr @@ -519,6 +519,76 @@ def get_feature_view( FeatureViewNotFoundException, ) + def get_any_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> BaseFeatureView: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_any_feature_view( + self.cached_registry_proto, name, project + ) + fv = self._get_object( + "FEATURE_VIEWS", + name, + project, + FeatureViewProto, + FeatureView, + "FEATURE_VIEW_NAME", + "FEATURE_VIEW_PROTO", + None, + ) + + if not fv: + fv = self._get_object( + "STREAM_FEATURE_VIEWS", + name, + project, + StreamFeatureViewProto, + StreamFeatureView, + "STREAM_FEATURE_VIEW_NAME", + "STREAM_FEATURE_VIEW_PROTO", + None, + ) + if not fv: + fv = self._get_object( + "ON_DEMAND_FEATURE_VIEWS", + name, + project, + OnDemandFeatureViewProto, + OnDemandFeatureView, + "ON_DEMAND_FEATURE_VIEW_NAME", + "ON_DEMAND_FEATURE_VIEW_PROTO", + FeatureViewNotFoundException, + ) + return fv + + def list_all_feature_views( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[BaseFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_all_feature_views( + self.cached_registry_proto, project, tags + ) + + return ( + cast( + list[BaseFeatureView], + self.list_feature_views(project, allow_cache, tags), + ) + + cast( + list[BaseFeatureView], + self.list_stream_feature_views(project, allow_cache, tags), + ) + + cast( + list[BaseFeatureView], + self.list_on_demand_feature_views(project, allow_cache, tags), + ) + ) + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: infra_object = self._get_object( "MANAGED_INFRA", diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 90c6e82e7d9..474249dc4cd 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -3,7 +3,7 @@ from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Set, Union +from typing import Any, Callable, Dict, List, Optional, Set, Union, cast from pydantic import StrictStr from sqlalchemy import ( # type: ignore @@ -270,6 +270,61 @@ def _get_entity(self, name: str, project: str) -> Entity: not_found_exception=EntityNotFoundException, ) + def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView: + fv = self._get_object( + table=feature_views, + name=name, + project=project, + proto_class=FeatureViewProto, + python_class=FeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=None, + ) + + if not fv: + fv = self._get_object( + table=on_demand_feature_views, + name=name, + project=project, + proto_class=OnDemandFeatureViewProto, + python_class=OnDemandFeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=None, + ) + + if not fv: + fv = self._get_object( + table=stream_feature_views, + name=name, + project=project, + proto_class=StreamFeatureViewProto, + python_class=StreamFeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=FeatureViewNotFoundException, + ) + return fv + + def _list_all_feature_views( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[BaseFeatureView]: + return ( + cast( + list[BaseFeatureView], + self._list_feature_views(project=project, tags=tags), + ) + + cast( + list[BaseFeatureView], + self._list_stream_feature_views(project=project, tags=tags), + ) + + cast( + list[BaseFeatureView], + self._list_on_demand_feature_views(project=project, tags=tags), + ) + ) + def _get_feature_view(self, name: str, project: str) -> FeatureView: return self._get_object( table=feature_views, diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 6b37aba08d6..e4cb6ae8530 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -8,6 +8,7 @@ from grpc_reflection.v1alpha import reflection from feast import FeatureService, FeatureStore +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.errors import FeatureViewNotFoundException @@ -31,6 +32,28 @@ from feast.stream_feature_view import StreamFeatureView +def _build_any_feature_view_proto(feature_view: BaseFeatureView): + if isinstance(feature_view, StreamFeatureView): + arg_name = "stream_feature_view" + feature_view_proto = feature_view.to_proto() + elif isinstance(feature_view, FeatureView): + arg_name = "feature_view" + feature_view_proto = feature_view.to_proto() + elif isinstance(feature_view, OnDemandFeatureView): + arg_name = "on_demand_feature_view" + feature_view_proto = feature_view.to_proto() + + return RegistryServer_pb2.AnyFeatureView( + feature_view=feature_view_proto if arg_name == "feature_view" else None, + stream_feature_view=feature_view_proto + if arg_name == "stream_feature_view" + else None, + on_demand_feature_view=feature_view_proto + if arg_name == "on_demand_feature_view" + else None, + ) + + class RegistryServer(RegistryServer_pb2_grpc.RegistryServerServicer): def __init__(self, registry: BaseRegistry) -> None: super().__init__() @@ -169,6 +192,27 @@ def GetFeatureView( actions=[AuthzedAction.DESCRIBE], ).to_proto() + def GetAnyFeatureView( + self, request: RegistryServer_pb2.GetAnyFeatureViewRequest, context + ): + feature_view = assert_permissions( + cast( + FeastObject, + self.proxied_registry.get_any_feature_view( + name=request.name, + project=request.project, + allow_cache=request.allow_cache, + ), + ), + actions=[AuthzedAction.DESCRIBE], + ) + + return RegistryServer_pb2.GetAnyFeatureViewResponse( + any_feature_view=_build_any_feature_view_proto( + cast(BaseFeatureView, feature_view) + ) + ) + def ApplyFeatureView( self, request: RegistryServer_pb2.ApplyFeatureViewRequest, context ): @@ -215,6 +259,26 @@ def ListFeatureViews( ] ) + def ListAllFeatureViews( + self, request: RegistryServer_pb2.ListAllFeatureViewsRequest, context + ): + return RegistryServer_pb2.ListAllFeatureViewsResponse( + feature_views=[ + _build_any_feature_view_proto(cast(BaseFeatureView, feature_view)) + for feature_view in permitted_resources( + resources=cast( + list[FeastObject], + self.proxied_registry.list_all_feature_views( + project=request.project, + allow_cache=request.allow_cache, + tags=dict(request.tags), + ), + ), + actions=AuthzedAction.DESCRIBE, + ) + ] + ) + def DeleteFeatureView( self, request: RegistryServer_pb2.DeleteFeatureViewRequest, context ): diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 5862cd46304..992869557af 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1,6 +1,5 @@ import copy import itertools -import logging import os import typing import warnings @@ -746,10 +745,6 @@ def _list_feature_views( ) -> List["FeatureView"]: from feast.feature_view import DUMMY_ENTITY_NAME - logging.warning( - "_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. " - "_list_feature_views will behave like _list_all_feature_views in the future." - ) feature_views = [] for fv in registry.list_feature_views(project, allow_cache=allow_cache, tags=tags): if hide_dummy_entity and fv.entities and fv.entities[0] == DUMMY_ENTITY_NAME: diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 2ffe869ef50..308201590df 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -163,7 +163,6 @@ def test_write_to_online_store_event_check(environment): fs.apply([fv1, e]) assert len(fs.list_all_feature_views(tags=TAGS)) == 1 assert len(fs.list_feature_views(tags=TAGS)) == 1 - assert len(fs.list_batch_feature_views(tags=TAGS)) == 1 # data to ingest into Online Store (recent) data = { @@ -421,7 +420,7 @@ def setup_feature_store_universal_feature_views( feature_views = construct_universal_feature_views(data_sources) fs.apply([driver(), feature_views.driver, feature_views.global_fv]) - assert len(fs.list_batch_feature_views(TAGS)) == 2 + assert len(fs.list_all_feature_views(TAGS)) == 2 data = { "driver_id": [1, 2], @@ -518,7 +517,7 @@ def test_online_list_retrieval(environment, universal_data_sources): environment, universal_data_sources ) - assert len(fs.list_batch_feature_views(tags=TAGS)) == 2 + assert len(fs.list_all_feature_views(tags=TAGS)) == 2 @pytest.mark.integration diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index c528cee4a84..e3e746711a6 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -36,6 +36,7 @@ from feast.field import Field from feast.infra.infra_object import Infra from feast.infra.online_stores.sqlite import SqliteTable +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.registry.registry import Registry from feast.infra.registry.remote import RemoteRegistry, RemoteRegistryConfig from feast.infra.registry.sql import SqlRegistry @@ -92,7 +93,7 @@ def s3_registry() -> Registry: @pytest.fixture(scope="session") -def minio_registry() -> Registry: +def minio_registry(): bucket_name = "test-bucket" container = MinioContainer() @@ -127,7 +128,7 @@ def minio_registry() -> Registry: logger = logging.getLogger(__name__) -@pytest.fixture(scope="function") +@pytest.fixture(scope="session") def pg_registry(): container = ( DockerContainer("postgres:latest") @@ -146,7 +147,7 @@ def pg_registry(): container.stop() -@pytest.fixture(scope="function") +@pytest.fixture(scope="session") def pg_registry_async(): container = ( DockerContainer("postgres:latest") @@ -190,7 +191,7 @@ def _given_registry_config_for_pg_sql( ) -@pytest.fixture(scope="function") +@pytest.fixture(scope="session") def mysql_registry(): container = MySqlContainer("mysql:latest") container.start() @@ -202,7 +203,7 @@ def mysql_registry(): container.stop() -@pytest.fixture(scope="function") +@pytest.fixture(scope="session") def mysql_registry_async(): container = MySqlContainer("mysql:latest") container.start() @@ -407,7 +408,7 @@ def assert_project_uuid(project, project_uuid, test_registry): "test_registry", all_fixtures, ) -def test_apply_feature_view_success(test_registry): +def test_apply_feature_view_success(test_registry: BaseRegistry): # Create Feature Views batch_source = FileSource( file_format=ParquetFormat(), @@ -456,6 +457,8 @@ def test_apply_feature_view_success(test_registry): ) feature_view = test_registry.get_feature_view("my_feature_view_1", project) + any_feature_view = test_registry.get_any_feature_view("my_feature_view_1", project) + assert ( feature_view.name == "my_feature_view_1" and feature_view.features[0].name == "fs1_my_feature_1" @@ -467,6 +470,7 @@ def test_apply_feature_view_success(test_registry): and feature_view.features[3].name == "fs1_my_feature_4" and feature_view.features[3].dtype == Array(Bytes) and feature_view.entities[0] == "fs1_my_entity_1" + and feature_view == any_feature_view ) assert feature_view.ttl == timedelta(minutes=5) @@ -494,7 +498,7 @@ def test_apply_feature_view_success(test_registry): "test_registry", sql_fixtures, ) -def test_apply_on_demand_feature_view_success(test_registry): +def test_apply_on_demand_feature_view_success(test_registry: BaseRegistry): # Create Feature Views driver_stats = FileSource( name="driver_stats_source", @@ -537,6 +541,7 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: test_registry.get_user_metadata(project, location_features_from_push) # Register Feature View + test_registry.apply_feature_view(driver_daily_features_view, project) test_registry.apply_feature_view(location_features_from_push, project) assert not test_registry.get_user_metadata(project, location_features_from_push) @@ -555,13 +560,21 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: and feature_views[0].features[0].dtype == String ) + all_feature_views = test_registry.list_all_feature_views(project) + + assert len(all_feature_views) == 2 + feature_view = test_registry.get_on_demand_feature_view( "location_features_from_push", project ) + any_feature_view = test_registry.get_any_feature_view( + "location_features_from_push", project + ) assert ( feature_view.name == "location_features_from_push" and feature_view.features[0].name == "first_char" and feature_view.features[0].dtype == String + and feature_view == any_feature_view ) test_registry.delete_feature_view("location_features_from_push", project) @@ -1101,7 +1114,7 @@ def test_registry_cache_thread_async(test_registry): "test_registry", all_fixtures, ) -def test_apply_stream_feature_view_success(test_registry): +def test_apply_stream_feature_view_success(test_registry: BaseRegistry): # Create Feature Views def simple_udf(x: int): return x + 3 @@ -1145,7 +1158,7 @@ def simple_udf(x: int): tags={"team": "matchmaking"}, ) - project = "project" + project = "test_apply_stream_feature_view_success" # Register Stream Feature View test_registry.apply_feature_view(sfv, project) @@ -1154,8 +1167,14 @@ def simple_udf(x: int): project, tags=sfv.tags ) + all_feature_views = test_registry.list_all_feature_views(project, tags=sfv.tags) + + print(stream_feature_views) + print(all_feature_views) + # List Feature Views assert len(stream_feature_views) == 1 + assert len(all_feature_views) == 1 assert stream_feature_views[0] == sfv test_registry.delete_feature_view("test kafka stream feature view", project) From 9b84ce7a5a3423448db9707b187b6a9ad9962185 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 24 Aug 2024 09:41:06 +0000 Subject: [PATCH 2/4] fix: different project for each test in test_universal_registry Signed-off-by: tokoko --- .../registration/test_universal_registry.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index e3e746711a6..7e983b40f95 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -339,7 +339,7 @@ def test_apply_entity_success(test_registry): tags={"team": "matchmaking"}, ) - project = "project" + project = "test_apply_entity_success" # Register Entity test_registry.apply_entity(entity, project) @@ -434,7 +434,7 @@ def test_apply_feature_view_success(test_registry: BaseRegistry): ttl=timedelta(minutes=5), ) - project = "project" + project = "test_apply_feature_view_success" # Register Feature View test_registry.apply_feature_view(fv1, project) @@ -535,7 +535,7 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: df["first_char"] = inputs["string_feature"].str[:1].astype("string") return df - project = "project" + project = "test_apply_on_demand_feature_view_success" with pytest.raises(FeatureViewNotFoundException): test_registry.get_user_metadata(project, location_features_from_push) @@ -616,7 +616,7 @@ def test_apply_data_source(test_registry): ttl=timedelta(minutes=5), ) - project = "project" + project = "test_apply_data_source" # Register data source and feature view test_registry.apply_data_source(batch_source, project, commit=False) @@ -735,7 +735,7 @@ def simple_udf(x: int): tags={}, ) - project = "project" + project = "test_modify_feature_views_success" # Register Feature Views test_registry.apply_feature_view(odfv1, project) @@ -982,7 +982,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) def test_update_infra(test_registry): # Create infra object - project = "project" + project = "test_update_infra" infra = test_registry.get_infra(project=project) assert len(infra.infra_objects) == 0 @@ -1039,7 +1039,7 @@ def test_registry_cache(test_registry): ttl=timedelta(minutes=5), ) - project = "project" + project = "test_registry_cache" # Register data source and feature view test_registry.apply_data_source(batch_source, project) @@ -1086,7 +1086,7 @@ def test_registry_cache_thread_async(test_registry): created_timestamp_column="timestamp", ) - project = "project" + project = "test_registry_cache_thread_async" # Register data source test_registry.apply_data_source(batch_source, project) @@ -1206,7 +1206,7 @@ def test_apply_feature_service_success(test_registry): fs = FeatureService( name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] ) - project = "project" + project = "test_apply_feature_service_success" # Register Feature Service test_registry.apply_feature_service(fs, project) @@ -1245,7 +1245,7 @@ def test_modify_feature_service_success(test_registry): fs = FeatureService( name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] ) - project = "project" + project = "test_modify_feature_service_success" # Register Feature service test_registry.apply_feature_service(fs, project) @@ -1292,7 +1292,7 @@ def test_commit(): tags={"team": "matchmaking"}, ) - project = "project" + project = "test_commit" # Register Entity without commiting test_registry.apply_entity(entity, project, commit=False) @@ -1381,7 +1381,7 @@ def test_apply_permission_success(test_registry): types=FeatureView, ) - project = "project" + project = "test_apply_permission_success" # Register Permission test_registry.apply_permission(permission, project) From b365afc4ef2c9a679f0d9574fb3a1674a913b635 Mon Sep 17 00:00:00 2001 From: tokoko Date: Fri, 6 Sep 2024 10:03:09 +0000 Subject: [PATCH 3/4] revert project names to project in test_universal_registry Signed-off-by: tokoko --- .../registration/test_universal_registry.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index b36a0f83318..0a256760926 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -358,7 +358,7 @@ def test_apply_entity_success(test_registry): tags={"team": "matchmaking"}, ) - project = "test_apply_entity_success" + project = "project" # Register Entity test_registry.apply_entity(entity, project) @@ -464,7 +464,7 @@ def test_apply_feature_view_success(test_registry: BaseRegistry): ttl=timedelta(minutes=5), ) - project = "test_apply_feature_view_success" + project = "project" # Register Feature View test_registry.apply_feature_view(fv1, project) @@ -565,7 +565,7 @@ def location_features_from_push(inputs: pd.DataFrame) -> pd.DataFrame: df["first_char"] = inputs["string_feature"].str[:1].astype("string") return df - project = "test_apply_on_demand_feature_view_success" + project = "project" with pytest.raises(FeatureViewNotFoundException): test_registry.get_user_metadata(project, location_features_from_push) @@ -646,7 +646,7 @@ def test_apply_data_source(test_registry): ttl=timedelta(minutes=5), ) - project = "test_apply_data_source" + project = "project" # Register data source and feature view test_registry.apply_data_source(batch_source, project, commit=False) @@ -765,7 +765,7 @@ def simple_udf(x: int): tags={}, ) - project = "test_modify_feature_views_success" + project = "project" # Register Feature Views test_registry.apply_feature_view(odfv1, project, False) @@ -1013,7 +1013,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) def test_update_infra(test_registry): # Create infra object - project = "test_update_infra" + project = "project" infra = test_registry.get_infra(project=project) assert len(infra.infra_objects) == 0 @@ -1070,7 +1070,7 @@ def test_registry_cache(test_registry): ttl=timedelta(minutes=5), ) - project = "test_registry_cache" + project = "project" # Register data source and feature view test_registry.apply_data_source(batch_source, project) @@ -1117,7 +1117,7 @@ def test_registry_cache_thread_async(test_registry): created_timestamp_column="timestamp", ) - project = "test_registry_cache_thread_async" + project = "project" # Register data source test_registry.apply_data_source(batch_source, project) @@ -1189,7 +1189,7 @@ def simple_udf(x: int): tags={"team": "matchmaking"}, ) - project = "test_apply_stream_feature_view_success" + project = "project" # Register Stream Feature View test_registry.apply_feature_view(sfv, project) @@ -1237,7 +1237,7 @@ def test_apply_feature_service_success(test_registry): fs = FeatureService( name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] ) - project = "test_apply_feature_service_success" + project = "project" # Register Feature Service test_registry.apply_feature_service(fs, project) @@ -1276,7 +1276,7 @@ def test_modify_feature_service_success(test_registry): fs = FeatureService( name="my_feature_service_1", features=[feature_view[["feature1", "feature2"]]] ) - project = "test_modify_feature_service_success" + project = "project" # Register Feature service test_registry.apply_feature_service(fs, project) @@ -1323,7 +1323,7 @@ def test_commit(): tags={"team": "matchmaking"}, ) - project = "test_commit" + project = "project" # Register Entity without commiting test_registry.apply_entity(entity, project, commit=False) @@ -1420,7 +1420,7 @@ def test_apply_permission_success(test_registry): types=FeatureView, ) - project = "test_apply_permission_success" + project = "project" # Register Permission test_registry.apply_permission(permission, project) From 48cf968524c52674d1706062fb5cc375fb78b940 Mon Sep 17 00:00:00 2001 From: tokoko Date: Fri, 6 Sep 2024 10:08:35 +0000 Subject: [PATCH 4/4] remove print statements from test_universal_registry Signed-off-by: tokoko --- .../tests/integration/registration/test_universal_registry.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index 0a256760926..ab5c011f6f0 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -1200,9 +1200,6 @@ def simple_udf(x: int): all_feature_views = test_registry.list_all_feature_views(project, tags=sfv.tags) - print(stream_feature_views) - print(all_feature_views) - # List Feature Views assert len(stream_feature_views) == 1 assert len(all_feature_views) == 1