From 61bea086fbbafe189c37929cef43818af76dec6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Fri, 21 Mar 2025 16:10:22 +0100 Subject: [PATCH 1/7] Add async DynamoDB timeout and retry configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- .../feast/infra/online_stores/dynamodb.py | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 2122eef2213..8c4c05296a0 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -52,6 +52,23 @@ logger = logging.getLogger(__name__) +class DynamoDBOnlineStoreRetryConfig(FeastConfigBaseModel): + """Async online store retry configuration for DynamoDB store. + + Cf. https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html + for details. + """ + + total_max_attempts: int | None = None + """Maximum number of total attempts that will be made on a single request.""" + + max_attempts: int | None = None + """Maximum number of retry attempts that will be made on a single request.""" + + mode: Literal["legacy", "standard", "adaptive"] | None = None + """The type of retry mode (aio)botocore should use.""" + + class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): """Online store config for DynamoDB store""" @@ -85,6 +102,17 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): keepalive_timeout: float = 12.0 """Keep-alive timeout in seconds for async Dynamodb connections.""" + connect_timeout: int | float = 60 + """The time in seconds until a timeout exception is thrown when attempting to make + an async connection.""" + + read_timeout: int | float = 60 + """The time in seconds until a timeout exception is thrown when attempting to read + from an async connection.""" + + retries: DynamoDBOnlineStoreRetryConfig = DynamoDBOnlineStoreRetryConfig() + """Configuration for retry behavior of async connections.""" + class DynamoDBOnlineStore(OnlineStore): """ @@ -99,10 +127,15 @@ class DynamoDBOnlineStore(OnlineStore): _dynamodb_resource = None async def initialize(self, config: RepoConfig): + online_config = config.online_store + await _get_aiodynamodb_client( - config.online_store.region, - config.online_store.max_pool_connections, - config.online_store.keepalive_timeout, + online_config.region, + online_config.max_pool_connections, + online_config.keepalive_timeout, + online_config.connect_timeout, + online_config.read_timeout, + online_config.retries, ) async def close(self): @@ -280,6 +313,9 @@ async def online_write_batch_async( online_config.region, online_config.max_pool_connections, online_config.keepalive_timeout, + online_config.connect_timeout, + online_config.read_timeout, + online_config.retries, ) await dynamo_write_items_async(client, table_name, items) @@ -387,6 +423,9 @@ def to_tbl_resp(raw_client_response): online_config.region, online_config.max_pool_connections, online_config.keepalive_timeout, + online_config.connect_timeout, + online_config.read_timeout, + online_config.retries, ) response_batches = await asyncio.gather( *[ @@ -546,7 +585,12 @@ def _get_aioboto_session(): async def _get_aiodynamodb_client( - region: str, max_pool_connections: int, keepalive_timeout: float + region: str, + max_pool_connections: int, + keepalive_timeout: float, + connect_timeout: int | float, + read_timeout: int | float, + retries: DynamoDBOnlineStoreRetryConfig, ): global _aioboto_client if _aioboto_client is None: @@ -556,6 +600,9 @@ async def _get_aiodynamodb_client( region_name=region, config=AioConfig( max_pool_connections=max_pool_connections, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + retries=retries.model_dump(exclude_none=True), connector_args={"keepalive_timeout": keepalive_timeout}, ), ) From fc42cc785b1a1dc241523494cf20d09de43e883e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Fri, 21 Mar 2025 16:59:51 +0100 Subject: [PATCH 2/7] Use union sytnax MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- sdk/python/feast/infra/online_stores/dynamodb.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 8c4c05296a0..a8c134d58f8 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -59,13 +59,13 @@ class DynamoDBOnlineStoreRetryConfig(FeastConfigBaseModel): for details. """ - total_max_attempts: int | None = None + total_max_attempts: Union[int, None] = None """Maximum number of total attempts that will be made on a single request.""" - max_attempts: int | None = None + max_attempts: Union[int, None] = None """Maximum number of retry attempts that will be made on a single request.""" - mode: Literal["legacy", "standard", "adaptive"] | None = None + mode: Union[Literal["legacy", "standard", "adaptive"], None] = None """The type of retry mode (aio)botocore should use.""" @@ -102,11 +102,11 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): keepalive_timeout: float = 12.0 """Keep-alive timeout in seconds for async Dynamodb connections.""" - connect_timeout: int | float = 60 + connect_timeout: Union[int, float] = 60 """The time in seconds until a timeout exception is thrown when attempting to make an async connection.""" - read_timeout: int | float = 60 + read_timeout: Union[int, float] = 60 """The time in seconds until a timeout exception is thrown when attempting to read from an async connection.""" @@ -588,8 +588,8 @@ async def _get_aiodynamodb_client( region: str, max_pool_connections: int, keepalive_timeout: float, - connect_timeout: int | float, - read_timeout: int | float, + connect_timeout: Union[int, float], + read_timeout: Union[int, float], retries: DynamoDBOnlineStoreRetryConfig, ): global _aioboto_client From 86be5fb1966fa7ac879f420a9ee59b750f4912e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Fri, 21 Mar 2025 22:54:21 +0100 Subject: [PATCH 3/7] Move retry configutation into top level, remove max_attempts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- .../feast/infra/online_stores/dynamodb.py | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index a8c134d58f8..a96006ebc0b 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -52,23 +52,6 @@ logger = logging.getLogger(__name__) -class DynamoDBOnlineStoreRetryConfig(FeastConfigBaseModel): - """Async online store retry configuration for DynamoDB store. - - Cf. https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html - for details. - """ - - total_max_attempts: Union[int, None] = None - """Maximum number of total attempts that will be made on a single request.""" - - max_attempts: Union[int, None] = None - """Maximum number of retry attempts that will be made on a single request.""" - - mode: Union[Literal["legacy", "standard", "adaptive"], None] = None - """The type of retry mode (aio)botocore should use.""" - - class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): """Online store config for DynamoDB store""" @@ -110,8 +93,16 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): """The time in seconds until a timeout exception is thrown when attempting to read from an async connection.""" - retries: DynamoDBOnlineStoreRetryConfig = DynamoDBOnlineStoreRetryConfig() - """Configuration for retry behavior of async connections.""" + total_max_retry_attempts: Union[int, None] = None + """Maximum number of total attempts that will be made on a single request. + + Cf. + https://github.com/boto/botocore/blob/dd8406d5fa1df18037d1dd2977aec47334f7e3ce/botocore/args.py#L558 + as for why `max_attempts` is not exposed here. + """ + + retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None + """The type of retry mode (aio)botocore should use.""" class DynamoDBOnlineStore(OnlineStore): @@ -135,7 +126,8 @@ async def initialize(self, config: RepoConfig): online_config.keepalive_timeout, online_config.connect_timeout, online_config.read_timeout, - online_config.retries, + online_config.total_max_retry_attempts, + online_config.retry_mode, ) async def close(self): @@ -315,7 +307,8 @@ async def online_write_batch_async( online_config.keepalive_timeout, online_config.connect_timeout, online_config.read_timeout, - online_config.retries, + online_config.total_max_retry_attempts, + online_config.retry_mode, ) await dynamo_write_items_async(client, table_name, items) @@ -425,7 +418,8 @@ def to_tbl_resp(raw_client_response): online_config.keepalive_timeout, online_config.connect_timeout, online_config.read_timeout, - online_config.retries, + online_config.total_max_retry_attempts, + online_config.retry_mode, ) response_batches = await asyncio.gather( *[ @@ -590,11 +584,19 @@ async def _get_aiodynamodb_client( keepalive_timeout: float, connect_timeout: Union[int, float], read_timeout: Union[int, float], - retries: DynamoDBOnlineStoreRetryConfig, + total_max_retry_attempts: Union[int, None], + retry_mode: Union[Literal["legacy", "standard", "adaptive"], None], ): global _aioboto_client if _aioboto_client is None: logger.debug("initializing the aiobotocore dynamodb client") + + retries = {} + if total_max_retry_attempts is not None: + retries["total_max_attempts"] = total_max_retry_attempts + if retry_mode is not None: + retries["mode"] = retry_mode + client_context = _get_aioboto_session().create_client( "dynamodb", region_name=region, @@ -602,7 +604,7 @@ async def _get_aiodynamodb_client( max_pool_connections=max_pool_connections, connect_timeout=connect_timeout, read_timeout=read_timeout, - retries=retries.model_dump(exclude_none=True), + retries=retries, connector_args={"keepalive_timeout": keepalive_timeout}, ), ) From 3787fef59841f81eca7d4be505de5ec9fcecf9d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Fri, 21 Mar 2025 23:00:49 +0100 Subject: [PATCH 4/7] Add explicity type hint to retries dict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- sdk/python/feast/infra/online_stores/dynamodb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index a96006ebc0b..d38bb88dc38 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -591,7 +591,7 @@ async def _get_aiodynamodb_client( if _aioboto_client is None: logger.debug("initializing the aiobotocore dynamodb client") - retries = {} + retries: Dict[str, Any] = {} if total_max_retry_attempts is not None: retries["total_max_attempts"] = total_max_retry_attempts if retry_mode is not None: From 88d96194c7ffa5c6289e47004917502bb2069d78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Fri, 21 Mar 2025 23:13:57 +0100 Subject: [PATCH 5/7] Update docstrings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- sdk/python/feast/infra/online_stores/dynamodb.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index d38bb88dc38..314bdcf9b21 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -96,13 +96,16 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): total_max_retry_attempts: Union[int, None] = None """Maximum number of total attempts that will be made on a single request. - Cf. + Maps to `retries.total_max_attempts in botocore.config.Config. Cf. https://github.com/boto/botocore/blob/dd8406d5fa1df18037d1dd2977aec47334f7e3ce/botocore/args.py#L558 - as for why `max_attempts` is not exposed here. + as for why `retries.max_attempts` is not exposed here. """ retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None - """The type of retry mode (aio)botocore should use.""" + """The type of retry mode (aio)botocore should use. + + Maps to `retries.mode in botocore.config.Config. + """ class DynamoDBOnlineStore(OnlineStore): From 05f9eb239adef1f5694c2b4843e4ed3a0a3cbab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Fri, 21 Mar 2025 23:15:17 +0100 Subject: [PATCH 6/7] Fix docstrings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- sdk/python/feast/infra/online_stores/dynamodb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 314bdcf9b21..0dbf0cfcac0 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -96,7 +96,7 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): total_max_retry_attempts: Union[int, None] = None """Maximum number of total attempts that will be made on a single request. - Maps to `retries.total_max_attempts in botocore.config.Config. Cf. + Maps to `retries.total_max_attempts` in botocore.config.Config. Cf. https://github.com/boto/botocore/blob/dd8406d5fa1df18037d1dd2977aec47334f7e3ce/botocore/args.py#L558 as for why `retries.max_attempts` is not exposed here. """ @@ -104,7 +104,7 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None """The type of retry mode (aio)botocore should use. - Maps to `retries.mode in botocore.config.Config. + Maps to `retries.mode` in botocore.config.Config. """ From 5c844b78cf88348d0e699a5035688d8fbcc0499b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20J=C3=A4ger?= Date: Mon, 24 Mar 2025 12:15:20 +0100 Subject: [PATCH 7/7] Update doc string, use None as retries default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sebastian Jäger --- sdk/python/feast/infra/online_stores/dynamodb.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 0dbf0cfcac0..edf4141b5be 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -96,9 +96,7 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): total_max_retry_attempts: Union[int, None] = None """Maximum number of total attempts that will be made on a single request. - Maps to `retries.total_max_attempts` in botocore.config.Config. Cf. - https://github.com/boto/botocore/blob/dd8406d5fa1df18037d1dd2977aec47334f7e3ce/botocore/args.py#L558 - as for why `retries.max_attempts` is not exposed here. + Maps to `retries.total_max_attempts` in botocore.config.Config. """ retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None @@ -607,7 +605,7 @@ async def _get_aiodynamodb_client( max_pool_connections=max_pool_connections, connect_timeout=connect_timeout, read_timeout=read_timeout, - retries=retries, + retries=retries if retries else None, connector_args={"keepalive_timeout": keepalive_timeout}, ), )