diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 08d40827435..515a6c39b11 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError): def __init__(self, store_name: str, version: int): super().__init__( f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. " - f"Currently only SQLite, PostgreSQL, MySQL, and FAISS support version-qualified feature references. " + f"Currently only SQLite, PostgreSQL, MySQL, FAISS, Redis, and DynamoDB support version-qualified feature references. " ) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 814058c77e5..1998167e4b0 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -24,7 +24,7 @@ from pydantic import StrictBool, StrictStr from feast import Entity, FeatureView, utils -from feast.infra.online_stores.helpers import compute_entity_id +from feast.infra.online_stores.helpers import compute_entity_id, compute_versioned_name from feast.infra.online_stores.online_store import OnlineStore from feast.infra.supported_async_methods import SupportedAsyncMethods from feast.infra.utils.aws_utils import dynamo_write_items_async @@ -1154,8 +1154,11 @@ def _initialize_dynamodb_resource( def _get_table_name( online_config: DynamoDBOnlineStoreConfig, config: RepoConfig, table: FeatureView ) -> str: + table_name = compute_versioned_name( + table, config.registry.enable_online_feature_view_versioning + ) return online_config.table_name_template.format( - project=config.project, table_name=table.name + project=config.project, table_name=table_name ) diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 40e0f50a62c..59ef9185c1f 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -72,8 +72,8 @@ def _to_naive_utc(ts: datetime) -> datetime: return ts.astimezone(tz=timezone.utc).replace(tzinfo=None) -def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str: - """Build the online-store table name, appending a version suffix when versioning is enabled.""" +def compute_versioned_name(table: Any, enable_versioning: bool = False) -> str: + """Return the table name with a ``_v{N}`` suffix when versioning is enabled.""" name = table.name if enable_versioning: version = getattr(table.projection, "version_tag", None) @@ -81,4 +81,9 @@ def compute_table_id(project: str, table: Any, enable_versioning: bool = False) version = getattr(table, "current_version_number", None) if version is not None and version > 0: name = f"{table.name}_v{version}" - return f"{project}_{name}" + return name + + +def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str: + """Build the online-store table name, appending a version suffix when versioning is enabled.""" + return f"{project}_{compute_versioned_name(table, enable_versioning)}" diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index cc77abf39ba..c3fda86cc5e 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -280,6 +280,18 @@ def _check_versioned_read_support(self, grouped_refs): supported_types.append(FaissOnlineStore) except ImportError: pass + try: + from feast.infra.online_stores.redis import RedisOnlineStore + + supported_types.append(RedisOnlineStore) + except Exception: + pass + try: + from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore + + supported_types.append(DynamoDBOnlineStore) + except Exception: + pass if isinstance(self, tuple(supported_types)): return diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index aeeb540b910..1868a32792d 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -32,7 +32,12 @@ from pydantic import StrictStr from feast import Entity, FeatureView, RepoConfig, utils -from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix +from feast.infra.online_stores.helpers import ( + _mmh3, + _redis_key, + _redis_key_prefix, + compute_versioned_name, +) from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -51,6 +56,13 @@ logger = logging.getLogger(__name__) +def _versioned_fv_name(table: FeatureView, config: RepoConfig) -> str: + """Return the feature view name with version suffix when versioning is enabled.""" + return compute_versioned_name( + table, config.registry.enable_online_feature_view_versioning + ) + + class RedisType(str, Enum): redis = "redis" redis_cluster = "redis_cluster" @@ -123,8 +135,9 @@ def delete_table(self, config: RepoConfig, table: FeatureView): deleted_count = 0 prefix = _redis_key_prefix(table.join_keys) - redis_hash_keys = [_mmh3(f"{table.name}:{f.name}") for f in table.features] - redis_hash_keys.append(bytes(f"_ts:{table.name}", "utf8")) + fv_name = _versioned_fv_name(table, config) + redis_hash_keys = [_mmh3(f"{fv_name}:{f.name}") for f in table.features] + redis_hash_keys.append(bytes(f"_ts:{fv_name}", "utf8")) with client.pipeline(transaction=False) as pipe: for _k in client.scan_iter( @@ -133,7 +146,7 @@ def delete_table(self, config: RepoConfig, table: FeatureView): _tables = { _hk[4:] for _hk in client.hgetall(_k) if _hk.startswith(b"_ts:") } - if bytes(table.name, "utf8") not in _tables: + if bytes(fv_name, "utf8") not in _tables: continue if len(_tables) == 1: pipe.delete(_k) @@ -142,7 +155,7 @@ def delete_table(self, config: RepoConfig, table: FeatureView): deleted_count += 1 pipe.execute() - logger.debug(f"Deleted {deleted_count} rows for feature view {table.name}") + logger.debug(f"Deleted {deleted_count} rows for feature view {fv_name}") def update( self, @@ -281,7 +294,7 @@ def online_write_batch( client = self._get_client(online_store_config) project = config.project - feature_view = table.name + feature_view = _versioned_fv_name(table, config) ts_key = f"_ts:{feature_view}" keys = [] # redis pipelining optimization: send multiple commands to redis server without waiting for every reply @@ -355,13 +368,15 @@ def _generate_hset_keys_for_features( self, feature_view: FeatureView, requested_features: Optional[List[str]] = None, + fv_name_override: Optional[str] = None, ) -> Tuple[List[str], List[str]]: if not requested_features: requested_features = [f.name for f in feature_view.features] - hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features] + fv_name = fv_name_override or feature_view.name + hset_keys = [_mmh3(f"{fv_name}:{k}") for k in requested_features] - ts_key = f"_ts:{feature_view.name}" + ts_key = f"_ts:{fv_name}" hset_keys.append(ts_key) requested_features.append(ts_key) @@ -390,9 +405,10 @@ def online_read( client = self._get_client(online_store_config) feature_view = table + fv_name = _versioned_fv_name(table, config) requested_features, hset_keys = self._generate_hset_keys_for_features( - feature_view, requested_features + feature_view, requested_features, fv_name_override=fv_name ) keys = self._generate_redis_keys_for_entities(config, entity_keys) @@ -403,7 +419,7 @@ def online_read( redis_values = pipe.execute() return self._convert_redis_values_to_protobuf( - redis_values, feature_view.name, requested_features + redis_values, fv_name, requested_features ) async def online_read_async( @@ -418,9 +434,10 @@ async def online_read_async( client = await self._get_client_async(online_store_config) feature_view = table + fv_name = _versioned_fv_name(table, config) requested_features, hset_keys = self._generate_hset_keys_for_features( - feature_view, requested_features + feature_view, requested_features, fv_name_override=fv_name ) keys = self._generate_redis_keys_for_entities(config, entity_keys) @@ -430,7 +447,7 @@ async def online_read_async( redis_values = await pipe.execute() return self._convert_redis_values_to_protobuf( - redis_values, feature_view.name, requested_features + redis_values, fv_name, requested_features ) def _get_features_for_entity( diff --git a/sdk/python/tests/integration/online_store/test_dynamodb_versioning.py b/sdk/python/tests/integration/online_store/test_dynamodb_versioning.py new file mode 100644 index 00000000000..2575e1dbb19 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_dynamodb_versioning.py @@ -0,0 +1,194 @@ +"""Integration tests for DynamoDB online store feature view versioning. + +Run with: pytest --integration sdk/python/tests/integration/online_store/test_dynamodb_versioning.py + +Uses moto to mock the DynamoDB service (no Docker required). +""" + +import os +from datetime import datetime, timedelta, timezone + +import pytest + +from feast import Entity, FeatureView +from feast.field import Field +from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RegistryConfig, RepoConfig +from feast.types import Float32, Int64 +from feast.value_type import ValueType + + +def _make_feature_view(name="driver_stats", version="latest"): + entity = Entity( + name="driver_id", + join_keys=["driver_id"], + value_type=ValueType.INT64, + ) + return FeatureView( + name=name, + entities=[entity], + ttl=timedelta(days=1), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="trips_today", dtype=Int64), + Field(name="avg_rating", dtype=Float32), + ], + version=version, + ) + + +def _make_entity_key(driver_id: int) -> EntityKeyProto: + entity_key = EntityKeyProto() + entity_key.join_keys.append("driver_id") + val = ValueProto() + val.int64_val = driver_id + entity_key.entity_values.append(val) + return entity_key + + +def _write_and_read(store, config, fv, driver_id=1001, trips=42): + entity_key = _make_entity_key(driver_id) + val = ValueProto() + val.int64_val = trips + now = datetime.now(tz=timezone.utc) + store.online_write_batch( + config, fv, [(entity_key, {"trips_today": val}, now, now)], None + ) + return store.online_read(config, fv, [entity_key], ["trips_today"]) + + +def _make_config(enable_versioning=False): + from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig + + return RepoConfig( + project="test_project", + provider="local", + online_store=DynamoDBOnlineStoreConfig( + type="dynamodb", + region="us-east-1", + ), + registry=RegistryConfig( + path="/tmp/test_dynamodb_registry.pb", + enable_online_feature_view_versioning=enable_versioning, + ), + entity_key_serialization_version=3, + ) + + +@pytest.mark.integration +class TestDynamoDBVersioningIntegration: + """Integration tests for DynamoDB versioning using moto mock.""" + + @pytest.fixture(autouse=True) + def setup_dynamodb(self): + try: + from moto import mock_dynamodb + except ImportError: + pytest.skip("moto not installed") + + # Set dummy AWS credentials for moto + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" # noqa: S105 # pragma: allowlist secret + os.environ["AWS_SECURITY_TOKEN"] = "testing" # noqa: S105 # pragma: allowlist secret + os.environ["AWS_SESSION_TOKEN"] = "testing" # noqa: S105 # pragma: allowlist secret + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + + with mock_dynamodb(): + yield + + def test_write_read_without_versioning(self): + config = _make_config(enable_versioning=False) + store = DynamoDBOnlineStore() + fv = _make_feature_view() + store.update(config, [], [fv], [], [], False) + + result = _write_and_read(store, config, fv) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 42 + + def test_write_read_with_versioning_v1(self): + config = _make_config(enable_versioning=True) + store = DynamoDBOnlineStore() + fv = _make_feature_view() + fv.current_version_number = 1 + store.update(config, [], [fv], [], [], False) + + result = _write_and_read(store, config, fv) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 42 + + def test_version_isolation(self): + """Data written to v1 is not visible from v2.""" + config = _make_config(enable_versioning=True) + store = DynamoDBOnlineStore() + + fv_v1 = _make_feature_view() + fv_v1.current_version_number = 1 + store.update(config, [], [fv_v1], [], [], False) + _write_and_read(store, config, fv_v1, driver_id=1001, trips=10) + + fv_v2 = _make_feature_view() + fv_v2.current_version_number = 2 + store.update(config, [], [fv_v2], [], [], False) + + entity_key = _make_entity_key(1001) + result = store.online_read(config, fv_v2, [entity_key], ["trips_today"]) + assert result[0] == (None, None) + + result = store.online_read(config, fv_v1, [entity_key], ["trips_today"]) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 10 + + def test_projection_version_tag_routes_to_correct_table(self): + """projection.version_tag routes reads to the correct versioned DynamoDB table.""" + config = _make_config(enable_versioning=True) + store = DynamoDBOnlineStore() + + fv_v1 = _make_feature_view() + fv_v1.current_version_number = 1 + store.update(config, [], [fv_v1], [], [], False) + _write_and_read(store, config, fv_v1, driver_id=1001, trips=100) + + fv_v2 = _make_feature_view() + fv_v2.current_version_number = 2 + store.update(config, [], [fv_v2], [], [], False) + _write_and_read(store, config, fv_v2, driver_id=1001, trips=200) + + fv_read = _make_feature_view() + fv_read.projection.version_tag = 1 + entity_key = _make_entity_key(1001) + result = store.online_read(config, fv_read, [entity_key], ["trips_today"]) + assert result[0][1]["trips_today"].int64_val == 100 + + fv_read2 = _make_feature_view() + fv_read2.projection.version_tag = 2 + result = store.online_read(config, fv_read2, [entity_key], ["trips_today"]) + assert result[0][1]["trips_today"].int64_val == 200 + + def test_teardown_versioned_table(self): + """teardown() drops the versioned DynamoDB table without error.""" + config = _make_config(enable_versioning=True) + store = DynamoDBOnlineStore() + + fv = _make_feature_view() + fv.current_version_number = 1 + store.update(config, [], [fv], [], [], False) + _write_and_read(store, config, fv) + + # Should not raise + store.teardown(config, [fv], []) + + def test_update_deletes_versioned_table(self): + """update() with tables_to_delete correctly drops versioned DynamoDB tables.""" + config = _make_config(enable_versioning=True) + store = DynamoDBOnlineStore() + + fv = _make_feature_view() + fv.current_version_number = 1 + store.update(config, [], [fv], [], [], False) + _write_and_read(store, config, fv, driver_id=1001, trips=50) + + # Delete the versioned table + store.update(config, [fv], [], [], [], False) diff --git a/sdk/python/tests/integration/online_store/test_redis_versioning.py b/sdk/python/tests/integration/online_store/test_redis_versioning.py new file mode 100644 index 00000000000..50ebe36b106 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_redis_versioning.py @@ -0,0 +1,207 @@ +"""Integration tests for Redis online store feature view versioning. + +Run with: pytest --integration sdk/python/tests/integration/online_store/test_redis_versioning.py +""" + +import shutil +from datetime import datetime, timedelta, timezone + +import pytest + +from feast import Entity, FeatureView +from feast.field import Field +from feast.infra.online_stores.redis import RedisOnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RegistryConfig, RepoConfig +from feast.types import Float32, Int64 +from feast.value_type import ValueType + + +def _make_feature_view(name="driver_stats", version="latest"): + entity = Entity( + name="driver_id", + join_keys=["driver_id"], + value_type=ValueType.INT64, + ) + return FeatureView( + name=name, + entities=[entity], + ttl=timedelta(days=1), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="trips_today", dtype=Int64), + Field(name="avg_rating", dtype=Float32), + ], + version=version, + ) + + +def _make_entity_key(driver_id: int) -> EntityKeyProto: + entity_key = EntityKeyProto() + entity_key.join_keys.append("driver_id") + val = ValueProto() + val.int64_val = driver_id + entity_key.entity_values.append(val) + return entity_key + + +def _write_and_read(store, config, fv, driver_id=1001, trips=42): + entity_key = _make_entity_key(driver_id) + val = ValueProto() + val.int64_val = trips + now = datetime.now(tz=timezone.utc) + store.online_write_batch( + config, fv, [(entity_key, {"trips_today": val}, now, now)], None + ) + return store.online_read(config, fv, [entity_key], ["trips_today"]) + + +@pytest.mark.integration +@pytest.mark.skipif( + not shutil.which("docker"), + reason="Docker not available", +) +class TestRedisVersioningIntegration: + """Integration tests for Redis versioning with a real Redis instance.""" + + @pytest.fixture(autouse=True) + def setup_redis(self): + try: + from testcontainers.redis import RedisContainer + except ImportError: + pytest.skip("testcontainers[redis] not installed") + + self.container = RedisContainer("redis:7").with_exposed_ports(6379) + self.container.start() + self.port = self.container.get_exposed_port(6379) + yield + self.container.stop() + + def _make_config(self, enable_versioning=False): + from feast.infra.online_stores.redis import RedisOnlineStoreConfig + + return RepoConfig( + project="test_project", + provider="local", + online_store=RedisOnlineStoreConfig( + type="redis", + connection_string=f"localhost:{self.port}", + ), + registry=RegistryConfig( + path="/tmp/test_redis_registry.pb", + enable_online_feature_view_versioning=enable_versioning, + ), + entity_key_serialization_version=3, + ) + + def test_write_read_without_versioning(self): + config = self._make_config(enable_versioning=False) + store = RedisOnlineStore() + fv = _make_feature_view() + store.update(config, [], [fv], [], [], False) + + result = _write_and_read(store, config, fv) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 42 + + def test_write_read_with_versioning_v1(self): + config = self._make_config(enable_versioning=True) + store = RedisOnlineStore() + fv = _make_feature_view() + fv.current_version_number = 1 + store.update(config, [], [fv], [], [], False) + + result = _write_and_read(store, config, fv) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 42 + + def test_version_isolation(self): + """Data written to v1 is not visible from v2.""" + config = self._make_config(enable_versioning=True) + store = RedisOnlineStore() + + fv_v1 = _make_feature_view() + fv_v1.current_version_number = 1 + store.update(config, [], [fv_v1], [], [], False) + _write_and_read(store, config, fv_v1, driver_id=1001, trips=10) + + fv_v2 = _make_feature_view() + fv_v2.current_version_number = 2 + store.update(config, [], [fv_v2], [], [], False) + + entity_key = _make_entity_key(1001) + result = store.online_read(config, fv_v2, [entity_key], ["trips_today"]) + # In Redis, all versions share the same entity hash key. When v2 hash + # fields don't exist, hmget returns None values which become empty + # ValueProtos. The key assertion is that v1's actual data (10) does NOT + # leak through to v2. + ts_v2, feats_v2 = result[0] + assert feats_v2 is None or feats_v2["trips_today"].int64_val != 10 + + result = store.online_read(config, fv_v1, [entity_key], ["trips_today"]) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 10 + + def test_projection_version_tag_routes_to_correct_table(self): + """projection.version_tag routes reads to the correct versioned hash fields.""" + config = self._make_config(enable_versioning=True) + store = RedisOnlineStore() + + fv_v1 = _make_feature_view() + fv_v1.current_version_number = 1 + store.update(config, [], [fv_v1], [], [], False) + _write_and_read(store, config, fv_v1, driver_id=1001, trips=100) + + fv_v2 = _make_feature_view() + fv_v2.current_version_number = 2 + store.update(config, [], [fv_v2], [], [], False) + _write_and_read(store, config, fv_v2, driver_id=1001, trips=200) + + fv_read = _make_feature_view() + fv_read.projection.version_tag = 1 + entity_key = _make_entity_key(1001) + result = store.online_read(config, fv_read, [entity_key], ["trips_today"]) + assert result[0][1]["trips_today"].int64_val == 100 + + fv_read2 = _make_feature_view() + fv_read2.projection.version_tag = 2 + result = store.online_read(config, fv_read2, [entity_key], ["trips_today"]) + assert result[0][1]["trips_today"].int64_val == 200 + + def test_teardown_cleans_up(self): + """teardown() removes entity keys without error.""" + config = self._make_config(enable_versioning=True) + store = RedisOnlineStore() + + fv = _make_feature_view() + fv.current_version_number = 1 + store.update(config, [], [fv], [], [], False) + _write_and_read(store, config, fv) + + # Should not raise + store.teardown(config, [fv], []) + + def test_delete_table_versioned(self): + """delete_table() removes only the versioned hash fields.""" + config = self._make_config(enable_versioning=True) + store = RedisOnlineStore() + + fv_v1 = _make_feature_view() + fv_v1.current_version_number = 1 + store.update(config, [], [fv_v1], [], [], False) + _write_and_read(store, config, fv_v1, driver_id=1001, trips=10) + + fv_v2 = _make_feature_view() + fv_v2.current_version_number = 2 + store.update(config, [], [fv_v2], [], [], False) + _write_and_read(store, config, fv_v2, driver_id=1001, trips=20) + + # Delete v1 via update + store.update(config, [fv_v1], [fv_v2], [], [], False) + + entity_key = _make_entity_key(1001) + # v2 should still be readable + result = store.online_read(config, fv_v2, [entity_key], ["trips_today"]) + assert result[0][1] is not None + assert result[0][1]["trips_today"].int64_val == 20 diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_versioning.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_versioning.py new file mode 100644 index 00000000000..492d68494d8 --- /dev/null +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_versioning.py @@ -0,0 +1,128 @@ +"""Unit tests for DynamoDB online store feature view versioning.""" + +from datetime import timedelta +from unittest.mock import MagicMock + +from feast import Entity, FeatureView +from feast.field import Field +from feast.types import Float32 +from feast.value_type import ValueType + + +def _make_feature_view(name="driver_stats", version_number=None, version_tag=None): + entity = Entity( + name="driver_id", + join_keys=["driver_id"], + value_type=ValueType.INT64, + ) + fv = FeatureView( + name=name, + entities=[entity], + ttl=timedelta(days=1), + schema=[Field(name="trips_today", dtype=Float32)], + ) + if version_number is not None: + fv.current_version_number = version_number + if version_tag is not None: + fv.projection.version_tag = version_tag + return fv + + +def _make_config(project="test_project", versioning=False): + config = MagicMock() + config.project = project + config.entity_key_serialization_version = 2 + config.registry.enable_online_feature_view_versioning = versioning + return config + + +def _make_online_config(template="{project}.{table_name}"): + online_config = MagicMock() + online_config.table_name_template = template + return online_config + + +class TestGetTableName: + """Test _get_table_name with versioning enabled/disabled.""" + + def test_no_versioning(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view() + config = _make_config(versioning=False) + online_config = _make_online_config() + assert _get_table_name(online_config, config, fv) == "test_project.driver_stats" + + def test_versioning_enabled_with_version(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view(version_number=2) + config = _make_config(versioning=True) + online_config = _make_online_config() + assert ( + _get_table_name(online_config, config, fv) == "test_project.driver_stats_v2" + ) + + def test_projection_version_tag_takes_priority(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view(version_number=1, version_tag=3) + config = _make_config(versioning=True) + online_config = _make_online_config() + assert ( + _get_table_name(online_config, config, fv) == "test_project.driver_stats_v3" + ) + + def test_version_zero_no_suffix(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view(version_number=0) + config = _make_config(versioning=True) + online_config = _make_online_config() + assert _get_table_name(online_config, config, fv) == "test_project.driver_stats" + + def test_versioning_enabled_no_version_set(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view() + config = _make_config(versioning=True) + online_config = _make_online_config() + assert _get_table_name(online_config, config, fv) == "test_project.driver_stats" + + def test_custom_template_with_versioning(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view(version_number=2) + config = _make_config(project="prod", versioning=True) + online_config = _make_online_config(template="feast-{project}-{table_name}") + assert ( + _get_table_name(online_config, config, fv) == "feast-prod-driver_stats_v2" + ) + + def test_versioning_disabled_ignores_version(self): + from feast.infra.online_stores.dynamodb import _get_table_name + + fv = _make_feature_view(version_number=5) + config = _make_config(versioning=False) + online_config = _make_online_config() + assert _get_table_name(online_config, config, fv) == "test_project.driver_stats" + + +class TestDynamoDBVersionedReadSupport: + """Test that DynamoDBOnlineStore passes _check_versioned_read_support.""" + + def test_allowed_with_version_tag(self): + from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore + + store = DynamoDBOnlineStore() + fv = _make_feature_view() + fv.projection.version_tag = 2 + # Should not raise + store._check_versioned_read_support([(fv, ["trips_today"])]) + + def test_allowed_without_version_tag(self): + from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore + + store = DynamoDBOnlineStore() + fv = _make_feature_view() + store._check_versioned_read_support([(fv, ["trips_today"])]) diff --git a/sdk/python/tests/unit/infra/online_store/test_redis_versioning.py b/sdk/python/tests/unit/infra/online_store/test_redis_versioning.py new file mode 100644 index 00000000000..caf3216cb4d --- /dev/null +++ b/sdk/python/tests/unit/infra/online_store/test_redis_versioning.py @@ -0,0 +1,145 @@ +"""Unit tests for Redis online store feature view versioning.""" + +from datetime import timedelta +from unittest.mock import MagicMock + +from feast import Entity, FeatureView +from feast.field import Field +from feast.infra.online_stores.helpers import _mmh3 +from feast.types import Float32 +from feast.value_type import ValueType + + +def _make_feature_view(name="driver_stats", version_number=None, version_tag=None): + entity = Entity( + name="driver_id", + join_keys=["driver_id"], + value_type=ValueType.INT64, + ) + fv = FeatureView( + name=name, + entities=[entity], + ttl=timedelta(days=1), + schema=[Field(name="trips_today", dtype=Float32)], + ) + if version_number is not None: + fv.current_version_number = version_number + if version_tag is not None: + fv.projection.version_tag = version_tag + return fv + + +def _make_config(project="test_project", versioning=False): + config = MagicMock() + config.project = project + config.entity_key_serialization_version = 2 + config.registry.enable_online_feature_view_versioning = versioning + return config + + +class TestVersionedFvName: + """Test _versioned_fv_name produces correct versioned names.""" + + def test_no_versioning(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view() + config = _make_config(versioning=False) + assert _versioned_fv_name(fv, config) == "driver_stats" + + def test_versioning_disabled_ignores_version(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view(version_number=3) + config = _make_config(versioning=False) + assert _versioned_fv_name(fv, config) == "driver_stats" + + def test_versioning_enabled_no_version_set(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view() + config = _make_config(versioning=True) + assert _versioned_fv_name(fv, config) == "driver_stats" + + def test_versioning_enabled_with_current_version_number(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view(version_number=2) + config = _make_config(versioning=True) + assert _versioned_fv_name(fv, config) == "driver_stats_v2" + + def test_version_zero_no_suffix(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view(version_number=0) + config = _make_config(versioning=True) + assert _versioned_fv_name(fv, config) == "driver_stats" + + def test_projection_version_tag_takes_priority(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view(version_number=1, version_tag=3) + config = _make_config(versioning=True) + assert _versioned_fv_name(fv, config) == "driver_stats_v3" + + def test_projection_version_tag_zero_no_suffix(self): + from feast.infra.online_stores.redis import _versioned_fv_name + + fv = _make_feature_view(version_tag=0, version_number=3) + config = _make_config(versioning=True) + assert _versioned_fv_name(fv, config) == "driver_stats" + + +class TestHsetKeysVersioning: + """Test that _generate_hset_keys_for_features produces different keys for different versions.""" + + def test_different_versions_produce_different_hset_keys(self): + from feast.infra.online_stores.redis import RedisOnlineStore + + store = RedisOnlineStore() + fv = _make_feature_view() + + _, hset_keys_v1 = store._generate_hset_keys_for_features( + fv, ["trips_today"], fv_name_override="driver_stats_v1" + ) + _, hset_keys_v2 = store._generate_hset_keys_for_features( + fv, ["trips_today"], fv_name_override="driver_stats_v2" + ) + + # The mmh3 hash keys should differ + assert hset_keys_v1[0] != hset_keys_v2[0] + # The timestamp keys should also differ + assert hset_keys_v1[1] != hset_keys_v2[1] + + def test_no_override_uses_fv_name(self): + from feast.infra.online_stores.redis import RedisOnlineStore + + store = RedisOnlineStore() + fv = _make_feature_view() + + _, hset_keys = store._generate_hset_keys_for_features(fv, ["trips_today"]) + + expected_feature_key = _mmh3("driver_stats:trips_today") + expected_ts_key = "_ts:driver_stats" + assert hset_keys[0] == expected_feature_key + assert hset_keys[1] == expected_ts_key + + +class TestRedisVersionedReadSupport: + """Test that RedisOnlineStore passes _check_versioned_read_support.""" + + def test_allowed_with_version_tag(self): + from feast.infra.online_stores.redis import RedisOnlineStore + + store = RedisOnlineStore() + fv = _make_feature_view() + fv.projection.version_tag = 2 + # Should not raise + store._check_versioned_read_support([(fv, ["trips_today"])]) + + def test_allowed_without_version_tag(self): + from feast.infra.online_stores.redis import RedisOnlineStore + + store = RedisOnlineStore() + fv = _make_feature_view() + store._check_versioned_read_support([(fv, ["trips_today"])])