Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError):
def __init__(self, store_name: str, version: int):
super().__init__(
f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. "
f"Currently only SQLite, PostgreSQL, and MySQL support version-qualified feature references. "
f"Currently only SQLite, PostgreSQL, MySQL, Redis, and DynamoDB support version-qualified feature references. "
)


Expand Down
9 changes: 8 additions & 1 deletion sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,8 +1154,15 @@ def _initialize_dynamodb_resource(
def _get_table_name(
online_config: DynamoDBOnlineStoreConfig, config: RepoConfig, table: FeatureView
) -> str:
table_name = table.name
if config.registry.enable_online_feature_view_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
table_name = f"{table.name}_v{version}"
return online_config.table_name_template.format(
project=config.project, table_name=table.name
project=config.project, table_name=table_name
)


Expand Down
12 changes: 12 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,18 @@ def _check_versioned_read_support(self, grouped_refs):
supported_types.append(PostgreSQLOnlineStore)
except ImportError:
pass
try:
from feast.infra.online_stores.redis import RedisOnlineStore

supported_types.append(RedisOnlineStore)
except ImportError:
pass
try:
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore

supported_types.append(DynamoDBOnlineStore)
except ImportError:
pass

if isinstance(self, tuple(supported_types)):
return
Expand Down
38 changes: 27 additions & 11 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@
logger = logging.getLogger(__name__)


def _versioned_fv_name(table: FeatureView, config: RepoConfig) -> str:
"""Return the feature view name with version suffix when versioning is enabled."""
if config.registry.enable_online_feature_view_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
return f"{table.name}_v{version}"
return table.name


class RedisType(str, Enum):
redis = "redis"
redis_cluster = "redis_cluster"
Expand Down Expand Up @@ -123,8 +134,9 @@ def delete_table(self, config: RepoConfig, table: FeatureView):
deleted_count = 0
prefix = _redis_key_prefix(table.join_keys)

redis_hash_keys = [_mmh3(f"{table.name}:{f.name}") for f in table.features]
redis_hash_keys.append(bytes(f"_ts:{table.name}", "utf8"))
fv_name = _versioned_fv_name(table, config)
redis_hash_keys = [_mmh3(f"{fv_name}:{f.name}") for f in table.features]
redis_hash_keys.append(bytes(f"_ts:{fv_name}", "utf8"))

with client.pipeline(transaction=False) as pipe:
for _k in client.scan_iter(
Expand All @@ -133,7 +145,7 @@ def delete_table(self, config: RepoConfig, table: FeatureView):
_tables = {
_hk[4:] for _hk in client.hgetall(_k) if _hk.startswith(b"_ts:")
}
if bytes(table.name, "utf8") not in _tables:
if bytes(fv_name, "utf8") not in _tables:
continue
if len(_tables) == 1:
pipe.delete(_k)
Expand All @@ -142,7 +154,7 @@ def delete_table(self, config: RepoConfig, table: FeatureView):
deleted_count += 1
pipe.execute()

logger.debug(f"Deleted {deleted_count} rows for feature view {table.name}")
logger.debug(f"Deleted {deleted_count} rows for feature view {fv_name}")

def update(
self,
Expand Down Expand Up @@ -281,7 +293,7 @@ def online_write_batch(
client = self._get_client(online_store_config)
project = config.project

feature_view = table.name
feature_view = _versioned_fv_name(table, config)
ts_key = f"_ts:{feature_view}"
keys = []
# redis pipelining optimization: send multiple commands to redis server without waiting for every reply
Expand Down Expand Up @@ -355,13 +367,15 @@ def _generate_hset_keys_for_features(
self,
feature_view: FeatureView,
requested_features: Optional[List[str]] = None,
fv_name_override: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
if not requested_features:
requested_features = [f.name for f in feature_view.features]

hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features]
fv_name = fv_name_override or feature_view.name
hset_keys = [_mmh3(f"{fv_name}:{k}") for k in requested_features]

ts_key = f"_ts:{feature_view.name}"
ts_key = f"_ts:{fv_name}"
hset_keys.append(ts_key)
requested_features.append(ts_key)

Expand Down Expand Up @@ -390,9 +404,10 @@ def online_read(

client = self._get_client(online_store_config)
feature_view = table
fv_name = _versioned_fv_name(table, config)

requested_features, hset_keys = self._generate_hset_keys_for_features(
feature_view, requested_features
feature_view, requested_features, fv_name_override=fv_name
)
keys = self._generate_redis_keys_for_entities(config, entity_keys)

Expand All @@ -403,7 +418,7 @@ def online_read(
redis_values = pipe.execute()

return self._convert_redis_values_to_protobuf(
redis_values, feature_view.name, requested_features
redis_values, fv_name, requested_features
)

async def online_read_async(
Expand All @@ -418,9 +433,10 @@ async def online_read_async(

client = await self._get_client_async(online_store_config)
feature_view = table
fv_name = _versioned_fv_name(table, config)

requested_features, hset_keys = self._generate_hset_keys_for_features(
feature_view, requested_features
feature_view, requested_features, fv_name_override=fv_name
)
keys = self._generate_redis_keys_for_entities(config, entity_keys)

Expand All @@ -430,7 +446,7 @@ async def online_read_async(
redis_values = await pipe.execute()

return self._convert_redis_values_to_protobuf(
redis_values, feature_view.name, requested_features
redis_values, fv_name, requested_features
)

def _get_features_for_entity(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Integration tests for DynamoDB online store feature view versioning.

Run with: pytest --integration sdk/python/tests/integration/online_store/test_dynamodb_versioning.py

Uses moto to mock the DynamoDB service (no Docker required).
"""

import os
from datetime import datetime, timedelta, timezone

import pytest

from feast import Entity, FeatureView
from feast.field import Field
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RegistryConfig, RepoConfig
from feast.types import Float32, Int64
from feast.value_type import ValueType


def _make_feature_view(name="driver_stats", version="latest"):
entity = Entity(
name="driver_id",
join_keys=["driver_id"],
value_type=ValueType.INT64,
)
return FeatureView(
name=name,
entities=[entity],
ttl=timedelta(days=1),
schema=[
Field(name="driver_id", dtype=Int64),
Field(name="trips_today", dtype=Int64),
Field(name="avg_rating", dtype=Float32),
],
version=version,
)


def _make_entity_key(driver_id: int) -> EntityKeyProto:
entity_key = EntityKeyProto()
entity_key.join_keys.append("driver_id")
val = ValueProto()
val.int64_val = driver_id
entity_key.entity_values.append(val)
return entity_key


def _write_and_read(store, config, fv, driver_id=1001, trips=42):
entity_key = _make_entity_key(driver_id)
val = ValueProto()
val.int64_val = trips
now = datetime.now(tz=timezone.utc)
store.online_write_batch(
config, fv, [(entity_key, {"trips_today": val}, now, now)], None
)
return store.online_read(config, fv, [entity_key], ["trips_today"])


def _make_config(enable_versioning=False):
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig

return RepoConfig(
project="test_project",
provider="local",
online_store=DynamoDBOnlineStoreConfig(
type="dynamodb",
region="us-east-1",
),
registry=RegistryConfig(
path="/tmp/test_dynamodb_registry.pb",
enable_online_feature_view_versioning=enable_versioning,
),
entity_key_serialization_version=3,
)


@pytest.mark.integration
class TestDynamoDBVersioningIntegration:
"""Integration tests for DynamoDB versioning using moto mock."""

@pytest.fixture(autouse=True)
def setup_dynamodb(self):
try:
from moto import mock_dynamodb
except ImportError:
pytest.skip("moto not installed")

# Set dummy AWS credentials for moto
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" # noqa: S105 # pragma: allowlist secret
os.environ["AWS_SECURITY_TOKEN"] = "testing" # noqa: S105 # pragma: allowlist secret
os.environ["AWS_SESSION_TOKEN"] = "testing" # noqa: S105 # pragma: allowlist secret
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

with mock_dynamodb():
yield

def test_write_read_without_versioning(self):
config = _make_config(enable_versioning=False)
store = DynamoDBOnlineStore()
fv = _make_feature_view()
store.update(config, [], [fv], [], [], False)

result = _write_and_read(store, config, fv)
assert result[0][1] is not None
assert result[0][1]["trips_today"].int64_val == 42

def test_write_read_with_versioning_v1(self):
config = _make_config(enable_versioning=True)
store = DynamoDBOnlineStore()
fv = _make_feature_view()
fv.current_version_number = 1
store.update(config, [], [fv], [], [], False)

result = _write_and_read(store, config, fv)
assert result[0][1] is not None
assert result[0][1]["trips_today"].int64_val == 42

def test_version_isolation(self):
"""Data written to v1 is not visible from v2."""
config = _make_config(enable_versioning=True)
store = DynamoDBOnlineStore()

fv_v1 = _make_feature_view()
fv_v1.current_version_number = 1
store.update(config, [], [fv_v1], [], [], False)
_write_and_read(store, config, fv_v1, driver_id=1001, trips=10)

fv_v2 = _make_feature_view()
fv_v2.current_version_number = 2
store.update(config, [], [fv_v2], [], [], False)

entity_key = _make_entity_key(1001)
result = store.online_read(config, fv_v2, [entity_key], ["trips_today"])
assert result[0] == (None, None)

result = store.online_read(config, fv_v1, [entity_key], ["trips_today"])
assert result[0][1] is not None
assert result[0][1]["trips_today"].int64_val == 10

def test_projection_version_tag_routes_to_correct_table(self):
"""projection.version_tag routes reads to the correct versioned DynamoDB table."""
config = _make_config(enable_versioning=True)
store = DynamoDBOnlineStore()

fv_v1 = _make_feature_view()
fv_v1.current_version_number = 1
store.update(config, [], [fv_v1], [], [], False)
_write_and_read(store, config, fv_v1, driver_id=1001, trips=100)

fv_v2 = _make_feature_view()
fv_v2.current_version_number = 2
store.update(config, [], [fv_v2], [], [], False)
_write_and_read(store, config, fv_v2, driver_id=1001, trips=200)

fv_read = _make_feature_view()
fv_read.projection.version_tag = 1
entity_key = _make_entity_key(1001)
result = store.online_read(config, fv_read, [entity_key], ["trips_today"])
assert result[0][1]["trips_today"].int64_val == 100

fv_read2 = _make_feature_view()
fv_read2.projection.version_tag = 2
result = store.online_read(config, fv_read2, [entity_key], ["trips_today"])
assert result[0][1]["trips_today"].int64_val == 200

def test_teardown_versioned_table(self):
"""teardown() drops the versioned DynamoDB table without error."""
config = _make_config(enable_versioning=True)
store = DynamoDBOnlineStore()

fv = _make_feature_view()
fv.current_version_number = 1
store.update(config, [], [fv], [], [], False)
_write_and_read(store, config, fv)

# Should not raise
store.teardown(config, [fv], [])

def test_update_deletes_versioned_table(self):
"""update() with tables_to_delete correctly drops versioned DynamoDB tables."""
config = _make_config(enable_versioning=True)
store = DynamoDBOnlineStore()

fv = _make_feature_view()
fv.current_version_number = 1
store.update(config, [], [fv], [], [], False)
_write_and_read(store, config, fv, driver_id=1001, trips=50)

# Delete the versioned table
store.update(config, [fv], [], [], [], False)
Loading
Loading