From e93562cd34d454f2914e7adf0b24cbab26794ace Mon Sep 17 00:00:00 2001 From: snowron Date: Tue, 12 Sep 2023 14:44:16 +0300 Subject: [PATCH 1/2] resolve #3760 Signed-off-by: snowron --- sdk/python/feast/feature_server.py | 38 +++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 3abca1d6e8e..20fa70cd2ad 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,5 +1,6 @@ import json import traceback +from typing import List, Optional import warnings import gunicorn.app.base @@ -15,7 +16,8 @@ from feast.data_source import PushMode from feast.errors import PushSourceNotFoundException from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest - +from dateutil import parser +from feast import utils # TODO: deprecate this in favor of push features class WriteToFeatureStoreRequest(BaseModel): @@ -31,6 +33,16 @@ class PushFeaturesRequest(BaseModel): to: str = "online" +class MaterializeRequest(BaseModel): + start_ts: str + end_ts: str + feature_views: Optional[List[str]] = None + +class MaterializeIncrementalRequest(BaseModel): + end_ts: str + feature_views: Optional[List[str]] = None + + def get_app(store: "feast.FeatureStore"): proto_json.patch() @@ -134,6 +146,30 @@ def write_to_online_store(body=Depends(get_body)): def health(): return Response(status_code=status.HTTP_200_OK) + @app.post("/materialize") + def materialize(body=Depends(get_body)): + try: + request = MaterializeRequest(**json.loads(body)) + store.materialize(utils.make_tzaware(parser.parse(request.start_ts)), utils.make_tzaware( + parser.parse(request.end_ts)), request.feature_views) + except Exception as e: + # Print the original exception on the server side + logger.exception(traceback.format_exc()) + # Raise HTTPException to return the error message to the client + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/materialize-incremental") + def materialize_incremental(body=Depends(get_body)): + try: + request = MaterializeIncrementalRequest(**json.loads(body)) + store.materialize_incremental(utils.make_tzaware( + parser.parse(request.end_ts)), request.feature_views) + except Exception as e: + # Print the original exception on the server side + logger.exception(traceback.format_exc()) + # Raise HTTPException to return the error message to the client + raise HTTPException(status_code=500, detail=str(e)) + return app From 0dbf03a1e4a535def16f201b16e4c81d771ef5c5 Mon Sep 17 00:00:00 2001 From: snowron Date: Tue, 12 Sep 2023 21:21:29 +0300 Subject: [PATCH 2/2] format feature_server.py Signed-off-by: snowron --- sdk/python/feast/feature_server.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 20fa70cd2ad..7c638dd2481 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,10 +1,11 @@ import json import traceback -from typing import List, Optional import warnings +from typing import List, Optional import gunicorn.app.base import pandas as pd +from dateutil import parser from fastapi import FastAPI, HTTPException, Request, Response, status from fastapi.logger import logger from fastapi.params import Depends @@ -12,12 +13,11 @@ from pydantic import BaseModel import feast -from feast import proto_json +from feast import proto_json, utils from feast.data_source import PushMode from feast.errors import PushSourceNotFoundException from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest -from dateutil import parser -from feast import utils + # TODO: deprecate this in favor of push features class WriteToFeatureStoreRequest(BaseModel): @@ -38,6 +38,7 @@ class MaterializeRequest(BaseModel): end_ts: str feature_views: Optional[List[str]] = None + class MaterializeIncrementalRequest(BaseModel): end_ts: str feature_views: Optional[List[str]] = None @@ -150,8 +151,11 @@ def health(): def materialize(body=Depends(get_body)): try: request = MaterializeRequest(**json.loads(body)) - store.materialize(utils.make_tzaware(parser.parse(request.start_ts)), utils.make_tzaware( - parser.parse(request.end_ts)), request.feature_views) + store.materialize( + utils.make_tzaware(parser.parse(request.start_ts)), + utils.make_tzaware(parser.parse(request.end_ts)), + request.feature_views, + ) except Exception as e: # Print the original exception on the server side logger.exception(traceback.format_exc()) @@ -162,8 +166,9 @@ def materialize(body=Depends(get_body)): def materialize_incremental(body=Depends(get_body)): try: request = MaterializeIncrementalRequest(**json.loads(body)) - store.materialize_incremental(utils.make_tzaware( - parser.parse(request.end_ts)), request.feature_views) + store.materialize_incremental( + utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views + ) except Exception as e: # Print the original exception on the server side logger.exception(traceback.format_exc())