diff --git a/.secrets.baseline b/.secrets.baseline index 510ca5cad38..6aa1a51af90 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1510,7 +1510,7 @@ "filename": "sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py", "hashed_secret": "95433727ea51026e1e0dc8deadaabd4a3baaaaf4", "is_verified": false, - "line_number": 19 + "line_number": 21 } ], "sdk/python/tests/universal/feature_repos/universal/online_store/singlestore.py": [ diff --git a/infra/scripts/compile-templates.py b/infra/scripts/compile-templates.py index e3130ab419a..6c582a90c94 100644 --- a/infra/scripts/compile-templates.py +++ b/infra/scripts/compile-templates.py @@ -28,7 +28,7 @@ def find_repo(path): # Template README.md ############################ roadmap_path = repo_root / "docs" / "roadmap.md" -with open(roadmap_path, "r") as f: +with open(roadmap_path, "r", encoding="utf-8") as f: # skip first lines since it has the title roadmap_contents_lines = f.readlines()[2:] @@ -36,7 +36,7 @@ def find_repo(path): roadmap_contents = "".join(roadmap_contents_lines) template_path = repo_root / "infra" / "templates" / "README.md.jinja2" -with open(template_path) as f: +with open(template_path, encoding="utf-8") as f: template = Template(f.read()) # Compile template @@ -49,5 +49,5 @@ def find_repo(path): ) readme_path = repo_root / "README.md" -with open(readme_path, "w") as f: +with open(readme_path, "w", encoding="utf-8") as f: f.write(readme_md) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index f2dd5687ecb..6ae7fd087f2 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -142,7 +142,6 @@ class VersionedOnlineReadNotSupported(FeastError): def __init__(self, store_name: str, version: int): super().__init__( f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. " - f"Currently only SQLite supports version-qualified feature references. " ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f95bbf10c03..ed4be94a7c4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1335,7 +1335,9 @@ def teardown(self): entities = self.list_entities() - self._get_provider().teardown_infra(self.project, tables, entities) + self._get_provider().teardown_infra( + self.project, tables, entities, registry=self.registry + ) self.registry.teardown() def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/bigtable.py b/sdk/python/feast/infra/online_stores/bigtable.py index 3479f7f289a..4d35faf626a 100644 --- a/sdk/python/feast/infra/online_stores/bigtable.py +++ b/sdk/python/feast/infra/online_stores/bigtable.py @@ -13,6 +13,7 @@ from feast.feature_view import DUMMY_ENTITY_NAME from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -306,6 +307,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): # Because of historical reasons, Feast calls them tables. We use this alias for # readability. diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 0870bc709db..210f2515ac8 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -460,6 +460,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the database. diff --git a/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py b/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py index c80f9e1285c..086e5cbfb6f 100644 --- a/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py +++ b/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py @@ -274,6 +274,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the database. diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 9ae10792f5a..0c577521d9b 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -139,6 +139,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 814058c77e5..3fbaa9a61e5 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -359,6 +359,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the DynamoDB Online Store. diff --git a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py index b78d003ac25..c5a799be248 100644 --- a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py @@ -263,6 +263,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/faiss_online_store.py b/sdk/python/feast/infra/online_stores/faiss_online_store.py index 3e3d92cde6d..5ef801f9976 100644 --- a/sdk/python/feast/infra/online_stores/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/faiss_online_store.py @@ -86,6 +86,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): self._index = None self._in_memory_store.teardown() diff --git a/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py b/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py index 21359b45bca..16a6a2ea9db 100644 --- a/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py +++ b/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py @@ -299,6 +299,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): online_store_config = config.online_store if not isinstance(online_store_config, HazelcastOnlineStoreConfig): diff --git a/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py index dc48d2c4efc..5942874a0b9 100644 --- a/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py @@ -212,6 +212,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the Hbase Online Store. diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index b657bd44d00..0831c2b87ae 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,9 +1,10 @@ import struct from datetime import datetime, timezone -from typing import Any, List +from typing import Any, List, Optional import mmh3 +from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.key_encoding_utils import ( serialize_entity_key, @@ -70,3 +71,21 @@ def _to_naive_utc(ts: datetime) -> datetime: return ts else: return ts.astimezone(tz=timezone.utc).replace(tzinfo=None) + + +def online_store_table_id( + project: str, + table: FeatureView, + enable_versioning: bool = False, + version: Optional[int] = None, +) -> str: + name = table.name + if enable_versioning: + resolved_version = version + if resolved_version is None: + resolved_version = getattr(table.projection, "version_tag", None) + if resolved_version is None: + resolved_version = getattr(table, "current_version_number", None) + if resolved_version is not None and resolved_version > 0: + name = f"{table.name}_v{resolved_version}" + return f"{project}_{name}" diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py index 8faefdbd344..575be6ab8f8 100644 --- a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -294,34 +294,32 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): - """ - Teardown all managed online stores for the given FeatureViews and Entities. + """Teardown all managed online stores for the given FeatureViews and Entities.""" - Args: - config: Feast RepoConfig. - tables: Sequence of FeatureViews to teardown. - entities: Sequence of Entities to teardown. - """ - # Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance - tribes_seen = set() - online_stores_cfg = getattr(config.online_store, "online_stores", []) - tag_name = getattr(config.online_store, "routing_tag", "tribe") + self._initialize_online_stores(config) + tables_by_tribe: Dict[str, List[FeatureView]] = {} for table in tables: - tribe = table.tags.get(tag_name) + tribe = self._get_routing_tag_value(table, config) if not tribe: - continue - # Find all store configs matching this tribe (supporting multiple instances of the same type) - for store_cfg in online_stores_cfg: - store_type = store_cfg.type - # Use id(store_cfg.conf) to distinguish different configs of the same type - key = (tribe, store_type, id(store_cfg.conf)) - if key in tribes_seen: - continue - tribes_seen.add(key) - # Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility) - if tribe.lower() == store_type.split(".")[-1].lower(): - online_store = self._get_online_store(tribe, config) - if online_store: - config = RepoConfig(**self._prepare_repo_conf(config, tribe)) - online_store.teardown(config, tables, entities) + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + tables_by_tribe.setdefault(tribe, []).append(table) + + for tribe, tribe_tables in tables_by_tribe.items(): + online_store = self._get_online_store(tribe, config) + if not online_store: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." + ) + + tribe_config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + online_store.teardown( + tribe_config, + tribe_tables, + entities, + registry=registry, + ) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index ee2534684cc..c9c43259707 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -505,6 +505,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): self.client = self._connect(config) for table in tables: diff --git a/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py b/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py index 3e7a3db84c8..30bd50f44ae 100644 --- a/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py +++ b/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py @@ -260,6 +260,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Drop the backing collection and close the client. diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index 2172f3aa359..ff8a53c363e 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -11,6 +11,7 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel @@ -97,7 +98,7 @@ def online_write_batch( if progress: progress(1) else: - batch_size = config.online_store.bacth_size + batch_size = config.online_store.batch_size if not batch_size or batch_size < 2: raise ValueError("Batch size must be at least 2") insert_values = [] @@ -276,11 +277,11 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: + project = config.project conn = self._get_conn(config) cur = conn.cursor() - project = config.project - for table in tables: _drop_table_and_index(cur, project, table) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 60bbd749cba..f2100d0d110 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -39,6 +39,10 @@ class OnlineStore(ABC): The interface that Feast uses to interact with the storage system that handles online features. """ + @property + def supports_versioned_online_reads(self) -> bool: + return False + @property def async_supported(self) -> SupportedAsyncMethods: return SupportedAsyncMethods() @@ -189,7 +193,7 @@ def get_online_features( ) # Check for versioned reads on unsupported stores - self._check_versioned_read_support(grouped_refs) + self._check_versioned_read_support(grouped_refs, config) _track_read = False try: from feast.metrics import _config as _metrics_config @@ -253,19 +257,25 @@ def get_online_features( ) return OnlineResponse(online_features_response, feature_types=feature_types) - def _check_versioned_read_support(self, grouped_refs): + def _check_versioned_read_support(self, grouped_refs, config: RepoConfig): """Raise an error if versioned reads are attempted on unsupported stores.""" - from feast.infra.online_stores.sqlite import SqliteOnlineStore - - if isinstance(self, SqliteOnlineStore): - return for table, _ in grouped_refs: version_tag = getattr(table.projection, "version_tag", None) - if version_tag is not None: + if version_tag is None: + continue + + # Version-qualified refs (e.g. @v2) are only supported when online versioning is enabled. + if not config.registry.enable_online_feature_view_versioning: raise VersionedOnlineReadNotSupported( self.__class__.__name__, version_tag ) + # Online versioning enabled: allow stores that implement versioned routing. + if self.supports_versioned_online_reads: + continue + + raise VersionedOnlineReadNotSupported(self.__class__.__name__, version_tag) + async def get_online_features_async( self, config: RepoConfig, @@ -310,7 +320,7 @@ async def get_online_features_async( ) # Check for versioned reads on unsupported stores - self._check_versioned_read_support(grouped_refs) + self._check_versioned_read_support(grouped_refs, config) async def query_table(table, requested_features): # Get the correct set of entity values with the correct join keys. @@ -452,6 +462,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Tears down all cloud resources for the specified set of Feast objects. diff --git a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py index f7780726d12..082f6e0de0c 100644 --- a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py +++ b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py @@ -361,6 +361,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 29a6edf30ad..a221fdb7043 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -247,6 +247,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index aeeb540b910..06da85a4070 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -34,6 +34,7 @@ from feast import Entity, FeatureView, RepoConfig, utils from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel @@ -177,6 +178,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ We delete the keys in redis for tables/views being removed. diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 9bead1fcb9d..96b5c16cd09 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -24,6 +24,7 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.online_stores.helpers import _to_naive_utc from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -617,6 +618,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): pass diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index eb598ec5e7a..dc8e4b2340c 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import logging from collections import defaultdict from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple @@ -11,12 +12,15 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key -from feast.infra.online_stores.helpers import _to_naive_utc +from feast.infra.online_stores.helpers import _to_naive_utc, online_store_table_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +logger = logging.getLogger(__name__) + class SingleStoreOnlineStoreConfig(FeastConfigBaseModel): """ @@ -41,6 +45,10 @@ class SingleStoreOnlineStore(OnlineStore): _conn: Optional[Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + def _init_conn(self, config: RepoConfig) -> Connection: online_store_config = config.online_store assert isinstance(online_store_config, SingleStoreOnlineStoreConfig) @@ -80,7 +88,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = _to_naive_utc(timestamp) if created_ts is not None: @@ -102,7 +110,7 @@ def online_write_batch( current_batch = insert_values[i : i + batch_size] cur.executemany( f""" - INSERT INTO {_table_id(project, table)} + INSERT INTO {_quote_identifier(online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning))} (entity_key, feature_name, value, event_ts, created_ts) values (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE @@ -130,7 +138,7 @@ def online_read( keys.append( serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() ) @@ -138,7 +146,7 @@ def online_read( entity_key_placeholders = ",".join(["%s" for _ in keys]) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)} + SELECT entity_key, feature_name, value, event_ts FROM {_quote_identifier(online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning))} WHERE entity_key IN ({entity_key_placeholders}) ORDER BY event_ts; """, @@ -151,7 +159,7 @@ def online_read( ) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)} + SELECT entity_key, feature_name, value, event_ts FROM {_quote_identifier(online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning))} WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders}) ORDER BY event_ts; """, @@ -191,39 +199,116 @@ def update( partial: bool, ) -> None: project = config.project + versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: # We don't create any special state for the entities in this implementation. for table in tables_to_keep: + table_name = online_store_table_id(project, table, versioning) cur.execute( - f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), + f"""CREATE TABLE IF NOT EXISTS {_quote_identifier(table_name)} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp NULL DEFAULT NULL, created_ts timestamp NULL DEFAULT NULL, PRIMARY KEY(entity_key, feature_name), - INDEX {_table_id(project, table)}_ek (entity_key))""" + INDEX {_quote_identifier(table_name + "_ek")} (entity_key))""" ) for table in tables_to_delete: - _drop_table_and_index(cur, project, table) + _drop_table_and_index(cur, project, table, versioning) def teardown( self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: project = config.project + versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: for table in tables: - _drop_table_and_index(cur, project, table) + if not versioning: + _drop_table_and_index(cur, project, table, enable_versioning=False) + continue + + versions = [] + if registry is not None: + try: + versions = registry.list_feature_view_versions( + name=table.name, project=project + ) + except Exception as e: + logger.warning( + "Failed to list feature view versions for %s during teardown; will fall back to dropping discovered versioned tables. Error: %s", + table.name, + e, + ) + versions = [] + + if not versions: + _drop_table_and_index(cur, project, table, enable_versioning=False) + _drop_table_and_index(cur, project, table, enable_versioning=True) + _drop_discovered_versioned_tables(cur, project, table) + continue + + for record in versions: + version_number = record.get("version_number") + if version_number is None: + continue + _drop_table_and_index( + cur, + project, + table, + enable_versioning=True, + version=version_number, + ) + + # Always drop the base (unversioned) table as well + _drop_table_and_index(cur, project, table, enable_versioning=False) + _drop_discovered_versioned_tables(cur, project, table) -def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None: - table_name = _table_id(project, table) - cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};") - cur.execute(f"DROP TABLE IF EXISTS {table_name}") +def _drop_table_and_index( + cur: Cursor, + project: str, + table: FeatureView, + enable_versioning: bool, + version: Optional[int] = None, +) -> None: + table_name = online_store_table_id(project, table, enable_versioning, version) + table_name_quoted = _quote_identifier(table_name) + index_name_quoted = _quote_identifier(f"{table_name}_ek") + cur.execute(f"DROP INDEX IF EXISTS {index_name_quoted} ON {table_name_quoted};") + cur.execute(f"DROP TABLE IF EXISTS {table_name_quoted}") -def _table_id(project: str, table: FeatureView) -> str: - return f"{project}_{table.name}" +def _quote_identifier(identifier: str) -> str: + escaped = identifier.replace("`", "``") + return f"`{escaped}`" + + +def _drop_discovered_versioned_tables( + cur: Cursor, project: str, table: FeatureView +) -> None: + base_table_name = online_store_table_id(project, table, enable_versioning=False) + escaped_base_table_name = base_table_name.replace("\\", "\\\\") + escaped_base_table_name = escaped_base_table_name.replace("%", "\\%") + escaped_base_table_name = escaped_base_table_name.replace("_", "\\_") + like_pattern = f"{escaped_base_table_name}\\_v%" + try: + cur.execute("SHOW TABLES LIKE %s", (like_pattern,)) + rows = cur.fetchall() or [] + for row in rows: + table_name = row[0] + index_name = f"{table_name}_ek" + cur.execute( + f"DROP INDEX IF EXISTS {_quote_identifier(index_name)} ON {_quote_identifier(table_name)};" + ) + cur.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table_name)}") + except Exception as e: + logger.warning( + "Failed to discover/drop versioned tables for %s during teardown fallback. Error: %s", + table.name, + e, + ) diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py index d2df674ed94..02a3e7ca3c7 100644 --- a/sdk/python/feast/infra/online_stores/snowflake.py +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -10,6 +10,7 @@ from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils.snowflake.snowflake_utils import ( GetSnowflakeConnection, execute_snowflake_statement, @@ -253,6 +254,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index f04995c61bb..d9d734229b0 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -42,8 +42,10 @@ serialize_entity_key, serialize_f32, ) +from feast.infra.online_stores.helpers import online_store_table_id from feast.infra.online_stores.online_store import OnlineStore from feast.infra.online_stores.vector_store import VectorStoreConfig +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto @@ -124,6 +126,10 @@ class SqliteOnlineStore(OnlineStore): _conn: Optional[sqlite3.Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + @staticmethod def _get_db_path(config: RepoConfig) -> str: assert ( @@ -338,6 +344,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): try: os.unlink(self._get_db_path(config)) @@ -716,16 +723,7 @@ def _initialize_conn( def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str: - name = table.name - if enable_versioning: - # Prefer version_tag from the projection (set by version-qualified refs like @v2) - # over current_version_number (the FV's active version in metadata). - version = getattr(table.projection, "version_tag", None) - if version is None: - version = getattr(table, "current_version_number", None) - if version is not None and version > 0: - name = f"{table.name}_v{version}" - return f"{project}_{name}" + return online_store_table_id(project, table, enable_versioning) class SqliteTable(InfraObject): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 20334e53a2e..9fa93295566 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -178,9 +178,12 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: if self.online_store: - self.online_store.teardown(self.repo_config, tables, entities) + self.online_store.teardown( + self.repo_config, tables, entities, registry=registry + ) if self.batch_engine: self.batch_engine.teardown_infra(project, tables, entities) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 9bdf681fb69..c78c3956331 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -109,6 +109,7 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Tears down all cloud resources for the specified set of Feast objects. diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index 7786af6f2f5..8f415039de5 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -102,7 +102,33 @@ def _cleanup_multiprocess_dir(): atexit.register(_cleanup_multiprocess_dir) # Now safe to import prometheus_client — it will detect the env var. -from prometheus_client import Counter, Gauge, Histogram # noqa: E402 +_prometheus_available = True +try: + from prometheus_client import Counter, Gauge, Histogram # noqa: E402 +except Exception: + _prometheus_available = False + + class _NoOpMetric: + def labels(self, **kwargs): + return self + + def inc(self, amount: float = 1): + return None + + def observe(self, amount: float): + return None + + def set(self, value: float): + return None + + def Counter(*args, **kwargs): # type: ignore + return _NoOpMetric() + + def Gauge(*args, **kwargs): # type: ignore + return _NoOpMetric() + + def Histogram(*args, **kwargs): # type: ignore + return _NoOpMetric() # --------------------------------------------------------------------------- @@ -473,6 +499,12 @@ def start_metrics_server( """ global _config + if not _prometheus_available: + logger.warning( + "Prometheus metrics are unavailable because prometheus_client could not be imported." + ) + return + if metrics_config is not None: _config = metrics_config else: diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini index 1ad76b978e4..ec89cdc71bb 100644 --- a/sdk/python/pytest.ini +++ b/sdk/python/pytest.ini @@ -4,7 +4,6 @@ env = IS_TEST=True filterwarnings = error::_pytest.warning_types.PytestConfigWarning - error::_pytest.warning_types.PytestUnhandledCoroutineWarning ignore::DeprecationWarning:pyspark.sql.pandas.*: ignore::DeprecationWarning:pyspark.sql.connect.*: ignore::DeprecationWarning:httpx.*: diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 8302e313a2d..6eca8297b09 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -85,7 +85,7 @@ def pytest_configure(config): - if platform in ["darwin", "windows"]: + if platform in ["darwin"] or platform.startswith("win"): multiprocessing.set_start_method("spawn", force=True) else: multiprocessing.set_start_method("fork") diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 82cfc7fb513..b5cad523419 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -67,6 +67,7 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): pass diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index 45426c63b8d..fda5f87088c 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -31,7 +31,7 @@ def test_registration_and_retrieval_from_custom_s3_endpoint( "It may be better to deduplicate AWS configuration or use sub-processes for isolation" ) - os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" + os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" # pragma: allowlist secret os.environ["AWS_SECRET_ACCESS_KEY"] = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" with construct_test_environment(config) as environment: diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 0c27585139e..b17544cee8d 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -283,6 +283,77 @@ def test_write_to_online_store(environment, universal_data_sources): assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.85, 1e-6) +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["singlestore"]) +def test_singlestore_versioned_online_reads(environment, universal_data_sources): + fs = environment.feature_store + fs.config.registry.enable_online_feature_view_versioning = True + + entities, datasets, data_sources = universal_data_sources + driver_entity = driver() + + # Apply v0 + driver_hourly_stats_v0 = create_driver_hourly_stats_feature_view( + data_sources.driver + ) + fs.apply([driver_hourly_stats_v0, driver_entity]) + + # Write v0 data + df_v0 = pd.DataFrame( + { + "driver_id": [1], + "conv_rate": [0.1], + "acc_rate": [0.2], + "avg_daily_trips": [10], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], + } + ) + fs.write_to_online_store("driver_stats", df_v0) + + # Apply a schema change to create v1 + driver_hourly_stats_v1 = FeatureView( + name="driver_stats", + entities=[driver_entity], + schema=driver_hourly_stats_v0.schema + + [Field(name="new_feature", dtype=Float32)], + source=data_sources.driver, + ttl=driver_hourly_stats_v0.ttl, + tags=TAGS, + ) + fs.apply([driver_hourly_stats_v1, driver_entity]) + + # Write v1 data + df_v1 = pd.DataFrame( + { + "driver_id": [1], + "conv_rate": [0.1], + "acc_rate": [0.2], + "avg_daily_trips": [20], + "new_feature": [1.0], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], + } + ) + fs.write_to_online_store("driver_stats", df_v1) + + # Read v0 and v1 explicitly + df = fs.get_online_features( + features=["driver_stats@v0:avg_daily_trips", "driver_stats@v1:avg_daily_trips"], + entity_rows=[{"driver_id": 1}], + full_feature_names=True, + ).to_df() + + assertpy.assert_that(df["driver_stats@v0__avg_daily_trips"].iloc[0]).is_equal_to(10) + assertpy.assert_that(df["driver_stats@v1__avg_daily_trips"].iloc[0]).is_equal_to(20) + + def _get_online_features_dict_remotely( endpoint: str, features: Union[List[str], FeatureService], diff --git a/sdk/python/tests/unit/infra/online_store/test_online_store_base.py b/sdk/python/tests/unit/infra/online_store/test_online_store_base.py index 11195521deb..3b023c0bd0d 100644 --- a/sdk/python/tests/unit/infra/online_store/test_online_store_base.py +++ b/sdk/python/tests/unit/infra/online_store/test_online_store_base.py @@ -3,6 +3,7 @@ import pytest from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.supported_async_methods import SupportedAsyncMethods @@ -26,7 +27,7 @@ def update( ): pass - def teardown(self, config, tables, entities): + def teardown(self, config, tables, entities, registry: BaseRegistry = None): pass diff --git a/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py b/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py index b9fda20d26a..571ec342f2c 100644 --- a/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py +++ b/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py @@ -1,6 +1,8 @@ import os +import time from typing import Any, Dict +import psycopg from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.postgres import PostgresContainer @@ -55,20 +57,47 @@ def create_online_store(self) -> Dict[str, Any]: self.container.start() log_string_to_wait_for = "database system is ready to accept connections" wait_for_logs( - container=self.container, predicate=log_string_to_wait_for, timeout=10 + container=self.container, predicate=log_string_to_wait_for, timeout=60 ) init_log_string_to_wait_for = "PostgreSQL init process complete" wait_for_logs( - container=self.container, predicate=init_log_string_to_wait_for, timeout=10 + container=self.container, predicate=init_log_string_to_wait_for, timeout=60 ) + + host = "localhost" + port = int(self.container.get_exposed_port(5432)) + + deadline = time.time() + 60 + last_exc: Exception | None = None + while time.time() < deadline: + try: + conn = psycopg.connect( + host=host, + port=port, + user="root", + password="test!@#$%", + dbname="test", + connect_timeout=2, + sslmode="disable", + ) + conn.close() + last_exc = None + break + except psycopg.OperationalError as e: + last_exc = e + time.sleep(1) + + if last_exc is not None: + raise last_exc + return { - "host": "localhost", + "host": host, "type": "postgres", "user": "root", "password": "test!@#$%", "database": "test", "vector_enabled": True, - "port": self.container.get_exposed_port(5432), + "port": port, "sslmode": "disable", }