From 3b37b8a8fb07fde3efc77874827828e2400d8661 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Fri, 11 Oct 2024 18:09:45 -0400 Subject: [PATCH 1/4] put all dynamo calls in an asyncio.gather Signed-off-by: Rob Howley --- .../feast/infra/online_stores/dynamodb.py | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index c0494272b34..1422d8f385d 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import itertools import logging from datetime import datetime @@ -297,36 +298,49 @@ async def online_read_async( batch_size = online_config.batch_size entity_ids = self._to_entity_ids(config, entity_keys) entity_ids_iter = iter(entity_ids) - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] table_name = _get_table_name(online_config, config, table) deserialize = TypeDeserializer().deserialize - def to_tbl_resp(raw_client_response): - return { - "entity_id": deserialize(raw_client_response["entity_id"]), - "event_ts": deserialize(raw_client_response["event_ts"]), - "values": deserialize(raw_client_response["values"]), - } + entity_id_batches = [] + while True: + batch = list(itertools.islice(entity_ids_iter, batch_size)) + if not batch: + break + entity_id_batch = self._to_client_batch_get_payload( + online_config, table_name, batch + ) + entity_id_batches.append(entity_id_batch) async with self._get_aiodynamodb_client(online_config.region) as client: - while True: - batch = list(itertools.islice(entity_ids_iter, batch_size)) - - # No more items to insert - if len(batch) == 0: - break - batch_entity_ids = self._to_client_batch_get_payload( - online_config, table_name, batch - ) + + async def get_and_format(entity_id_batch): + def to_tbl_resp(raw_client_response): + return { + "entity_id": deserialize(raw_client_response["entity_id"]), + "event_ts": deserialize(raw_client_response["event_ts"]), + "values": deserialize(raw_client_response["values"]), + } + response = await client.batch_get_item( - RequestItems=batch_entity_ids, + RequestItems=entity_id_batch, ) - batch_result = self._process_batch_get_response( - table_name, response, entity_ids, batch, to_tbl_response=to_tbl_resp + return self._process_batch_get_response( + table_name, + response, + entity_ids, + batch, + to_tbl_response=to_tbl_resp, ) - result.extend(batch_result) - return result + + result_batches = await asyncio.gather( + *[ + get_and_format(entity_id_batch) + for entity_id_batch in entity_id_batches + ] + ) + + return list(itertools.chain(*result_batches)) def _get_aioboto_session(self): if self._aioboto_session is None: From b3b7ed1dd3ab7d252985c875ebcdcb4776bd9fc8 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Fri, 11 Oct 2024 19:52:50 -0400 Subject: [PATCH 2/4] fix: indexing Signed-off-by: Rob Howley --- .../feast/infra/online_stores/dynamodb.py | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 1422d8f385d..a6cbfb41d2b 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -302,6 +302,14 @@ async def online_read_async( deserialize = TypeDeserializer().deserialize + def to_tbl_resp(raw_client_response): + return { + "entity_id": deserialize(raw_client_response["entity_id"]), + "event_ts": deserialize(raw_client_response["event_ts"]), + "values": deserialize(raw_client_response["values"]), + } + + batches = [] entity_id_batches = [] while True: batch = list(itertools.islice(entity_ids_iter, batch_size)) @@ -310,36 +318,30 @@ async def online_read_async( entity_id_batch = self._to_client_batch_get_payload( online_config, table_name, batch ) + batches.append(batch) entity_id_batches.append(entity_id_batch) async with self._get_aiodynamodb_client(online_config.region) as client: - - async def get_and_format(entity_id_batch): - def to_tbl_resp(raw_client_response): - return { - "entity_id": deserialize(raw_client_response["entity_id"]), - "event_ts": deserialize(raw_client_response["event_ts"]), - "values": deserialize(raw_client_response["values"]), - } - - response = await client.batch_get_item( - RequestItems=entity_id_batch, - ) - return self._process_batch_get_response( - table_name, - response, - entity_ids, - batch, - to_tbl_response=to_tbl_resp, - ) - - result_batches = await asyncio.gather( + response_batches = await asyncio.gather( *[ - get_and_format(entity_id_batch) + client.batch_get_item( + RequestItems=entity_id_batch, + ) for entity_id_batch in entity_id_batches ] ) + result_batches = [] + for batch, response in zip(batches, response_batches): + result_batch = self._process_batch_get_response( + table_name, + response, + entity_ids, + batch, + to_tbl_response=to_tbl_resp, + ) + result_batches.append(result_batch) + return list(itertools.chain(*result_batches)) def _get_aioboto_session(self): From 3a302840f81ac36098cb67fd00f8f8ede475c105 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Fri, 11 Oct 2024 20:06:00 -0400 Subject: [PATCH 3/4] parallelize the per table lookups Signed-off-by: Rob Howley --- sdk/python/feast/infra/online_stores/online_store.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index ea86bd9175a..95b07df1b21 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import asyncio from abc import ABC, abstractmethod from datetime import datetime from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union @@ -240,7 +240,7 @@ async def get_online_features_async( native_entity_values=True, ) - for table, requested_features in grouped_refs: + async def query_table(table, requested_features): # Get the correct set of entity values with the correct join keys. table_entity_values, idxs = utils._get_unique_entities( table, @@ -258,6 +258,14 @@ async def get_online_features_async( requested_features=requested_features, ) + return idxs, read_rows + + all_responses = await asyncio.gather(*[ + query_table(table, requested_features) + for table, requested_features in grouped_refs + ]) + + for (idxs, read_rows), (table, requested_features) in zip(all_responses, grouped_refs): feature_data = utils._convert_rows_to_protobuf( requested_features, read_rows ) From e6fcd78864321b6f8f43684660c9988bfae91846 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Fri, 11 Oct 2024 20:07:00 -0400 Subject: [PATCH 4/4] ruff Signed-off-by: Rob Howley --- .../feast/infra/online_stores/online_store.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 95b07df1b21..cf2d68eb746 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -260,12 +260,16 @@ async def query_table(table, requested_features): return idxs, read_rows - all_responses = await asyncio.gather(*[ - query_table(table, requested_features) - for table, requested_features in grouped_refs - ]) + all_responses = await asyncio.gather( + *[ + query_table(table, requested_features) + for table, requested_features in grouped_refs + ] + ) - for (idxs, read_rows), (table, requested_features) in zip(all_responses, grouped_refs): + for (idxs, read_rows), (table, requested_features) in zip( + all_responses, grouped_refs + ): feature_data = utils._convert_rows_to_protobuf( requested_features, read_rows )