From 6e62fcf064bf2c68cfab5e735d7662f48defe783 Mon Sep 17 00:00:00 2001
From: hkuepers
Date: Mon, 12 Jan 2026 15:25:33 +0100
Subject: [PATCH] Update lambda materialization engine

Signed-off-by: hkuepers
---
 .../infra/compute_engines/aws_lambda/app.py   | 119 ++++++++++--------
 .../aws_lambda/lambda_engine.py               |   9 +-
 2 files changed, 71 insertions(+), 57 deletions(-)

diff --git a/sdk/python/feast/infra/compute_engines/aws_lambda/app.py b/sdk/python/feast/infra/compute_engines/aws_lambda/app.py
index 2bf65542e55..98760651dc0 100644
--- a/sdk/python/feast/infra/compute_engines/aws_lambda/app.py
+++ b/sdk/python/feast/infra/compute_engines/aws_lambda/app.py
@@ -1,26 +1,31 @@
 import base64
-import json
-import sys
+import logging
 import tempfile
-import traceback
 from pathlib import Path
 
 import pyarrow.parquet as pq
 
 from feast import FeatureStore
 from feast.constants import FEATURE_STORE_YAML_ENV_NAME
-from feast.infra.materialization.local_engine import DEFAULT_BATCH_SIZE
+from feast.infra.compute_engines.aws_lambda.lambda_engine import DEFAULT_BATCH_SIZE
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
 
+logger = logging.getLogger()
+logger.setLevel("INFO")
 
-def handler(event, context):
-    """Provide an event that contains the following keys:
-    - operation: one of the operations in the operations dict below
-    - tableName: required for operations that interact with DynamoDB
-    - payload: a parameter to pass to the operation being performed
+def handler(event, context):
+    """Load a parquet file and write the feature values to the online store.
+
+    Args:
+        event (dict): payload containing the following keys:
+            FEATURE_STORE_YAML_ENV_NAME: Base64 encoded feature store config
+            view_name: Name of FeatureView to be materialized
+            view_type: Type of FeatureView
+            path: Path to parquet batch file on S3 bucket
+        context (dict): Lambda runtime context, not used.
     """
-    print("Received event: " + json.dumps(event, indent=2), flush=True)
+    logger.info(f"Received event: {event}")
 
     try:
         config_base64 = event[FEATURE_STORE_YAML_ENV_NAME]
@@ -28,57 +33,61 @@ def handler(event, context):
         config_bytes = base64.b64decode(config_base64)
 
         # Create a new unique directory for writing feature_store.yaml
-        repo_path = Path(tempfile.mkdtemp())
-
-        with open(repo_path / "feature_store.yaml", "wb") as f:
-            f.write(config_bytes)
+        with tempfile.TemporaryDirectory() as repo_posix_path:
+            repo_path = Path(repo_posix_path)
 
-        # Initialize the feature store
-        store = FeatureStore(repo_path=str(repo_path.resolve()))
+            with open(repo_path / "feature_store.yaml", "wb") as f:
+                f.write(config_bytes)
 
-        view_name = event["view_name"]
-        view_type = event["view_type"]
-        path = event["path"]
+            # Initialize the feature store
+            store = FeatureStore(repo_path=str(repo_path.resolve()))
 
-        bucket = path[len("s3://") :].split("/", 1)[0]
-        key = path[len("s3://") :].split("/", 1)[1]
-        print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True)
+            view_name = event["view_name"]
+            view_type = event["view_type"]
+            path = event["path"]
 
-        if view_type == "batch":
-            # TODO: This probably needs to be become `store.get_batch_feature_view` at some point.
-            feature_view = store.get_feature_view(view_name)
-        else:
-            feature_view = store.get_stream_feature_view(view_name)
+            bucket, key = path[len("s3://") :].split("/", 1)
+            logger.info(f"Inferred Bucket: `{bucket}` Key: `{key}`")
 
-        print(f"Got Feature View: `{feature_view}`", flush=True)
+            if view_type == "batch":
+                # TODO: This probably needs to become `store.get_batch_feature_view` at some point.  # noqa: E501,W505
+                feature_view = store.get_feature_view(view_name)
+            else:
+                feature_view = store.get_stream_feature_view(view_name)
 
-        table = pq.read_table(path)
-        if feature_view.batch_source.field_mapping is not None:
-            table = _run_pyarrow_field_mapping(
-                table, feature_view.batch_source.field_mapping
+            logger.info(
+                f"Got Feature View: `{feature_view.name}`, \
+                last updated: {feature_view.last_updated_timestamp}"
             )
 
-        join_key_to_value_type = {
-            entity.name: entity.dtype.to_value_type()
-            for entity in feature_view.entity_columns
-        }
-
-        written_rows = 0
-
-        for batch in table.to_batches(DEFAULT_BATCH_SIZE):
-            rows_to_write = _convert_arrow_to_proto(
-                batch, feature_view, join_key_to_value_type
-            )
-            store._provider.online_write_batch(
-                store.config,
-                feature_view,
-                rows_to_write,
-                lambda x: None,
+            table = pq.read_table(path)
+            if feature_view.batch_source.field_mapping is not None:
+                table = _run_pyarrow_field_mapping(
+                    table, feature_view.batch_source.field_mapping
+                )
+
+            join_key_to_value_type = {
+                entity.name: entity.dtype.to_value_type()
+                for entity in feature_view.entity_columns
+            }
+
+            written_rows = 0
+
+            for batch in table.to_batches(DEFAULT_BATCH_SIZE):
+                rows_to_write = _convert_arrow_to_proto(
+                    batch, feature_view, join_key_to_value_type
+                )
+                store._provider.online_write_batch(
+                    store.config,
+                    feature_view,
+                    rows_to_write,
+                    lambda x: None,
+                )
+                written_rows += len(rows_to_write)
+            logger.info(
+                f"Successfully updated {written_rows} rows.",
+                extra={"num_updated_rows": written_rows, "feature_view": view_name},
             )
-            written_rows += len(rows_to_write)
-        return {"written_rows": written_rows}
-    except Exception as e:
-        print(f"Exception: {e}", flush=True)
-        print("Traceback:", flush=True)
-        print(traceback.format_exc(), flush=True)
-        sys.exit(1)
+    except Exception:
+        logger.exception("Error in processing materialization.")
+        raise

diff --git a/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py b/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py
index cc32e5b74b3..9e2d217875b 100644
--- a/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py
+++ b/sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py
@@ -108,9 +108,14 @@ def update(
         r = self.lambda_client.create_function(
             FunctionName=self.lambda_name,
             PackageType="Image",
-            Role=self.repo_config.batch_engine.lambda_role,
-            Code={"ImageUri": self.repo_config.batch_engine.materialization_image},
+            Role=self.repo_config.batch_engine_config.lambda_role,
+            Code={
+                "ImageUri": self.repo_config.batch_engine_config.materialization_image
+            },
             Timeout=DEFAULT_TIMEOUT,
+            LoggingConfig={
+                "LogFormat": "JSON",
+            },
             Tags={
                 "feast-owned": "True",
                 "project": project,
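
For local verification, the updated handler can be exercised outside AWS by invoking it directly with a synthetic event. The sketch below is illustrative only and not part of the patch: the FeatureView name and the S3 URI are placeholders, and it assumes a valid feature_store.yaml in the working directory plus read access to the referenced parquet file.

    # Hypothetical local smoke test for the patched handler (not part of this patch).
    import base64

    from feast.constants import FEATURE_STORE_YAML_ENV_NAME
    from feast.infra.compute_engines.aws_lambda.app import handler

    # Base64-encode the repo config, the inverse of the b64decode the handler performs.
    with open("feature_store.yaml", "rb") as f:
        config_base64 = base64.b64encode(f.read()).decode()

    event = {
        FEATURE_STORE_YAML_ENV_NAME: config_base64,
        "view_name": "driver_hourly_stats",  # placeholder FeatureView name
        "view_type": "batch",  # or "stream" for stream feature views
        "path": "s3://example-bucket/materialization/batch-0000.parquet",  # placeholder
    }

    # The handler now raises on failure instead of calling sys.exit(1), so an
    # error surfaces here as an ordinary exception with a logged traceback.
    handler(event, context=None)

Since the function is created with LogFormat set to JSON, the extra fields passed to logger.info (num_updated_rows, feature_view) should come through as structured keys in CloudWatch, which appears to be the motivation for pairing the LoggingConfig change with the structured log call.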