From 40cb583b2bdbedd81e5f7143782daf5f3c5eb3c5 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 23 Apr 2025 10:39:53 -0400 Subject: [PATCH 1/6] add feature view tags to dynamo tags Signed-off-by: Rob Howley --- .../feast/infra/online_stores/dynamodb.py | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index edf4141b5be..453e0bea03f 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -167,19 +167,27 @@ def update( online_config.endpoint_url, online_config.session_based_auth, ) - # Add Tags attribute to creation request only if configured to prevent - # TagResource permission issues, even with an empty Tags array. - kwargs = ( - { - "Tags": [ + + online_tags = online_config.tags or {} + common_tags = [ + {"Key": key, "Value": value} + for key, value in online_tags.items() + ] + + for table_instance in tables_to_keep: + table_tags = common_tags + ( + [ {"Key": key, "Value": value} - for key, value in online_config.tags.items() + for key, value in table_instance.tags.items() + if key not in online_tags ] - } - if online_config.tags - else {} - ) - for table_instance in tables_to_keep: + if table_instance.tags + else [] + ) + + # Add Tags attribute to creation request only if configured to prevent + # TagResource permission issues, even with an empty Tags array. + kwargs = {"Tags": table_tags} if table_tags else {} try: dynamodb_resource.create_table( TableName=_get_table_name(online_config, config, table_instance), From 009f82936d95a3bde5fda24bc37d54fe41c0bbb1 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 23 Apr 2025 13:56:38 -0400 Subject: [PATCH 2/6] add tag update helper method Signed-off-by: Rob Howley --- .../feast/infra/online_stores/dynamodb.py | 64 ++++++++++++------ .../test_dynamodb_online_store.py | 67 ++++++++++++++++++- 2 files changed, 111 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 453e0bea03f..893f030a3c1 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -138,6 +138,43 @@ async def close(self): def async_supported(self) -> SupportedAsyncMethods: return SupportedAsyncMethods(read=True, write=True) + @staticmethod + def _table_tags(online_config, table_instance) -> list[dict[str, str]]: + online_tags = online_config.tags or {} + common_tags = [ + {"Key": key, "Value": value} for key, value in online_tags.items() + ] + return common_tags + ( + [ + {"Key": key, "Value": value} + for key, value in table_instance.tags.items() + if key not in online_tags + ] + if table_instance.tags + else [] + ) + + @staticmethod + def _update_tags(dynamodb_client, table_name, new_tags): + table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][ + "TableArn" + ] + current_tags = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn).get( + "Tags" + ) + if current_tags: + remove_keys = [ + tag["Key"] + for tag in current_tags + if tag["Key"] not in new_tags or tag["Value"] != new_tags[tag["Key"]] + ] + if remove_keys: + dynamodb_client.untag_resource( + ResourceArn=table_arn, TagKeys=remove_keys + ) + if new_tags: + dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=new_tags) + def update( self, config: RepoConfig, @@ -168,26 +205,12 @@ def update( online_config.session_based_auth, ) - online_tags = online_config.tags or {} - common_tags = [ - {"Key": key, "Value": value} - for key, value in online_tags.items() - ] - for table_instance in tables_to_keep: - table_tags = common_tags + ( - [ - {"Key": key, "Value": value} - for key, value in table_instance.tags.items() - if key not in online_tags - ] - if table_instance.tags - else [] - ) - # Add Tags attribute to creation request only if configured to prevent # TagResource permission issues, even with an empty Tags array. + table_tags = self._table_tags(online_config, table_instance) kwargs = {"Tags": table_tags} if table_tags else {} + try: dynamodb_resource.create_table( TableName=_get_table_name(online_config, config, table_instance), @@ -206,9 +229,12 @@ def update( raise for table_instance in tables_to_keep: - dynamodb_client.get_waiter("table_exists").wait( - TableName=_get_table_name(online_config, config, table_instance) - ) + table_name = _get_table_name(online_config, config, table_instance) + dynamodb_client.get_waiter("table_exists").wait(TableName=table_name) + # once table is confirmed to exist, update the tags. + # tags won't be updated in the create_table call if the table already exists + tags = self._table_tags(online_config, table_instance) + self._update_tags(dynamodb_client, table_name, tags) for table_to_delete in tables_to_delete: _delete_table_idempotent( diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index cb1c15ee6e4..2f636fc93db 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -1,6 +1,7 @@ from copy import deepcopy from dataclasses import dataclass from datetime import datetime +from typing import Optional import boto3 import pytest @@ -32,6 +33,7 @@ @dataclass class MockFeatureView: name: str + tags: Optional[dict[str, str]] = None @pytest.fixture @@ -209,6 +211,13 @@ def test_dynamodb_online_store_online_write_batch( assert [item[1] for item in stored_items] == list(features) +def _get_tags(dynamodb_client, table_name): + table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][ + "TableArn" + ] + return dynamodb_client.list_tags_of_resource(ResourceArn=table_arn).get("Tags") + + @mock_dynamodb def test_dynamodb_online_store_update(repo_config, dynamodb_online_store): """Test DynamoDBOnlineStore update method.""" @@ -222,7 +231,7 @@ def test_dynamodb_online_store_update(repo_config, dynamodb_online_store): dynamodb_online_store.update( config=repo_config, tables_to_delete=[MockFeatureView(name=db_table_delete_name)], - tables_to_keep=[MockFeatureView(name=db_table_keep_name)], + tables_to_keep=[MockFeatureView(name=db_table_keep_name, tags={"some": "tag"})], entities_to_delete=None, entities_to_keep=None, partial=None, @@ -237,6 +246,62 @@ def test_dynamodb_online_store_update(repo_config, dynamodb_online_store): assert len(existing_tables) == 1 assert existing_tables[0] == f"test_aws.{db_table_keep_name}" + assert _get_tags(dynamodb_client, existing_tables[0]) == [ + {"Key": "some", "Value": "tag"} + ] + + +@mock_dynamodb +def test_dynamodb_online_store_update_tags(repo_config, dynamodb_online_store): + """Test DynamoDBOnlineStore update method.""" + # create dummy table to update with new tags and tag values + table_name = f"{TABLE_NAME}_keep_update_tags" + create_test_table(PROJECT, table_name, REGION) + + # add tags on update + dynamodb_online_store.update( + config=repo_config, + tables_to_delete=[], + tables_to_keep=[MockFeatureView(name=table_name, tags={"some": "tag"})], + entities_to_delete=[], + entities_to_keep=[], + partial=None, + ) + + # update tags + dynamodb_online_store.update( + config=repo_config, + tables_to_delete=[], + tables_to_keep=[ + MockFeatureView(name=table_name, tags={"some": "new-tag", "another": "tag"}) + ], + entities_to_delete=[], + entities_to_keep=[], + partial=None, + ) + + # check only db_table_keep_name exists + dynamodb_client = dynamodb_online_store._get_dynamodb_client(REGION) + existing_tables = dynamodb_client.list_tables().get("TableNames", None) + + expected_tags = [ + {"Key": "some", "Value": "new-tag"}, + {"Key": "another", "Value": "tag"}, + ] + assert _get_tags(dynamodb_client, existing_tables[0]) == expected_tags + + # and then remove all tags + dynamodb_online_store.update( + config=repo_config, + tables_to_delete=[], + tables_to_keep=[MockFeatureView(name=table_name, tags=None)], + entities_to_delete=[], + entities_to_keep=[], + partial=None, + ) + + assert _get_tags(dynamodb_client, existing_tables[0]) == [] + @mock_dynamodb def test_dynamodb_online_store_teardown(repo_config, dynamodb_online_store): From 61a21aa94470b41fd468228c61707c1839a73c9e Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 23 Apr 2025 14:01:52 -0400 Subject: [PATCH 3/6] fix tag change checks Signed-off-by: Rob Howley --- sdk/python/feast/infra/online_stores/dynamodb.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 893f030a3c1..32f965fb932 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -155,7 +155,7 @@ def _table_tags(online_config, table_instance) -> list[dict[str, str]]: ) @staticmethod - def _update_tags(dynamodb_client, table_name, new_tags): + def _update_tags(dynamodb_client, table_name: str, new_tags: list[dict[str, str]]): table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][ "TableArn" ] @@ -163,10 +163,12 @@ def _update_tags(dynamodb_client, table_name, new_tags): "Tags" ) if current_tags: + new_tags_dict = {nt["Key"]: nt["Value"] for nt in new_tags} remove_keys = [ tag["Key"] for tag in current_tags - if tag["Key"] not in new_tags or tag["Value"] != new_tags[tag["Key"]] + if tag["Key"] not in new_tags_dict + or tag["Value"] != new_tags_dict[tag["Key"]] ] if remove_keys: dynamodb_client.untag_resource( From ea25f259776251891798ffd7b30809c0aa161b1d Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 23 Apr 2025 14:34:21 -0400 Subject: [PATCH 4/6] simplify and just untag all to re add Signed-off-by: Rob Howley --- .../feast/infra/online_stores/dynamodb.py | 18 +++++------------- .../online_store/test_dynamodb_online_store.py | 16 ++++++++++++---- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 32f965fb932..4f1c9fc4ad7 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -159,21 +159,13 @@ def _update_tags(dynamodb_client, table_name: str, new_tags: list[dict[str, str] table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][ "TableArn" ] - current_tags = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn).get( + current_tags = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn)[ "Tags" - ) + ] if current_tags: - new_tags_dict = {nt["Key"]: nt["Value"] for nt in new_tags} - remove_keys = [ - tag["Key"] - for tag in current_tags - if tag["Key"] not in new_tags_dict - or tag["Value"] != new_tags_dict[tag["Key"]] - ] - if remove_keys: - dynamodb_client.untag_resource( - ResourceArn=table_arn, TagKeys=remove_keys - ) + remove_keys = [tag["Key"] for tag in current_tags] + dynamodb_client.untag_resource(ResourceArn=table_arn, TagKeys=remove_keys) + if new_tags: dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=new_tags) diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 2f636fc93db..772a9455289 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -262,7 +262,11 @@ def test_dynamodb_online_store_update_tags(repo_config, dynamodb_online_store): dynamodb_online_store.update( config=repo_config, tables_to_delete=[], - tables_to_keep=[MockFeatureView(name=table_name, tags={"some": "tag"})], + tables_to_keep=[ + MockFeatureView( + name=table_name, tags={"key1": "val1", "key2": "val2", "key3": "val3"} + ) + ], entities_to_delete=[], entities_to_keep=[], partial=None, @@ -273,7 +277,10 @@ def test_dynamodb_online_store_update_tags(repo_config, dynamodb_online_store): config=repo_config, tables_to_delete=[], tables_to_keep=[ - MockFeatureView(name=table_name, tags={"some": "new-tag", "another": "tag"}) + MockFeatureView( + name=table_name, + tags={"key1": "new-val1", "key2": "val2", "key4": "val4"}, + ) ], entities_to_delete=[], entities_to_keep=[], @@ -285,8 +292,9 @@ def test_dynamodb_online_store_update_tags(repo_config, dynamodb_online_store): existing_tables = dynamodb_client.list_tables().get("TableNames", None) expected_tags = [ - {"Key": "some", "Value": "new-tag"}, - {"Key": "another", "Value": "tag"}, + {"Key": "key1", "Value": "new-val1"}, + {"Key": "key2", "Value": "val2"}, + {"Key": "key4", "Value": "val4"}, ] assert _get_tags(dynamodb_client, existing_tables[0]) == expected_tags From c7548e1569fa6ca48395f73bec621363da2ba4ee Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 23 Apr 2025 14:48:50 -0400 Subject: [PATCH 5/6] dont do tag updates for new tables Signed-off-by: Rob Howley --- sdk/python/feast/infra/online_stores/dynamodb.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 4f1c9fc4ad7..a5889999082 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -15,7 +15,7 @@ import contextlib import itertools import logging -from collections import OrderedDict +from collections import OrderedDict, defaultdict from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union @@ -199,15 +199,17 @@ def update( online_config.session_based_auth, ) + do_tag_updates = defaultdict(bool) for table_instance in tables_to_keep: # Add Tags attribute to creation request only if configured to prevent # TagResource permission issues, even with an empty Tags array. table_tags = self._table_tags(online_config, table_instance) kwargs = {"Tags": table_tags} if table_tags else {} + table_name = _get_table_name(online_config, config, table_instance) try: dynamodb_resource.create_table( - TableName=_get_table_name(online_config, config, table_instance), + TableName=table_name, KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}], AttributeDefinitions=[ {"AttributeName": "entity_id", "AttributeType": "S"} @@ -215,7 +217,10 @@ def update( BillingMode="PAY_PER_REQUEST", **kwargs, ) + except ClientError as ce: + do_tag_updates[table_name] = True + # If the table creation fails with ResourceInUseException, # it means the table already exists or is being created. # Otherwise, re-raise the exception @@ -227,8 +232,9 @@ def update( dynamodb_client.get_waiter("table_exists").wait(TableName=table_name) # once table is confirmed to exist, update the tags. # tags won't be updated in the create_table call if the table already exists - tags = self._table_tags(online_config, table_instance) - self._update_tags(dynamodb_client, table_name, tags) + if do_tag_updates[table_name]: + tags = self._table_tags(online_config, table_instance) + self._update_tags(dynamodb_client, table_name, tags) for table_to_delete in tables_to_delete: _delete_table_idempotent( From 603d302b8d71adc16b188168f80c5caca7f8ea6c Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 23 Apr 2025 15:08:18 -0400 Subject: [PATCH 6/6] more granular tags take priority Signed-off-by: Rob Howley --- .../feast/infra/online_stores/dynamodb.py | 21 ++++++------ .../test_dynamodb_online_store.py | 33 +++++++++++++++++++ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index a5889999082..13dd949059c 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -140,19 +140,20 @@ def async_supported(self) -> SupportedAsyncMethods: @staticmethod def _table_tags(online_config, table_instance) -> list[dict[str, str]]: + table_instance_tags = table_instance.tags or {} online_tags = online_config.tags or {} + common_tags = [ - {"Key": key, "Value": value} for key, value in online_tags.items() + {"Key": key, "Value": table_instance_tags.get(key) or value} + for key, value in online_tags.items() ] - return common_tags + ( - [ - {"Key": key, "Value": value} - for key, value in table_instance.tags.items() - if key not in online_tags - ] - if table_instance.tags - else [] - ) + table_tags = [ + {"Key": key, "Value": value} + for key, value in table_instance_tags.items() + if key not in online_tags + ] + + return common_tags + table_tags @staticmethod def _update_tags(dynamodb_client, table_name: str, new_tags: list[dict[str, str]]): diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 772a9455289..91f0474ab93 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -36,6 +36,11 @@ class MockFeatureView: tags: Optional[dict[str, str]] = None +@dataclass +class MockOnlineConfig: + tags: Optional[dict[str, str]] = None + + @pytest.fixture def repo_config(): return RepoConfig( @@ -311,6 +316,34 @@ def test_dynamodb_online_store_update_tags(repo_config, dynamodb_online_store): assert _get_tags(dynamodb_client, existing_tables[0]) == [] +@mock_dynamodb +@pytest.mark.parametrize( + "global_tags, table_tags, expected", + [ + (None, {"key": "val"}, [{"Key": "key", "Value": "val"}]), + ({"key": "val"}, None, [{"Key": "key", "Value": "val"}]), + ( + {"key1": "val1"}, + {"key2": "val2"}, + [{"Key": "key1", "Value": "val1"}, {"Key": "key2", "Value": "val2"}], + ), + ( + {"key": "val", "key2": "val2"}, + {"key": "new-val"}, + [{"Key": "key", "Value": "new-val"}, {"Key": "key2", "Value": "val2"}], + ), + ], +) +def test_dynamodb_online_store_tag_priority( + global_tags, table_tags, expected, dynamodb_online_store +): + actual = dynamodb_online_store._table_tags( + MockOnlineConfig(tags=global_tags), + MockFeatureView(name="table", tags=table_tags), + ) + assert actual == expected + + @mock_dynamodb def test_dynamodb_online_store_teardown(repo_config, dynamodb_online_store): """Test DynamoDBOnlineStore teardown method."""