From ad98954833ddb488d3715344a48d0ac115ff864f Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 30 Oct 2024 17:18:38 -0400 Subject: [PATCH 1/3] use pydantic models to populate fastapi docs Signed-off-by: Rob Howley --- sdk/python/feast/feature_server.py | 64 +++++++++++++++--------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 0502c2a85d5..16b69a6bbe1 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,10 +1,9 @@ -import json import sys import threading import time import traceback from contextlib import asynccontextmanager -from typing import List, Optional +from typing import Any, Dict, List, Optional import pandas as pd import psutil @@ -69,6 +68,13 @@ class MaterializeIncrementalRequest(BaseModel): feature_views: Optional[List[str]] = None +class GetOnlineFeaturesRequest(BaseModel): + entities: Dict[str, List[Any]] + feature_service: Optional[str] = None + features: Optional[List[str]] = None + full_feature_names: bool = False + + def get_app( store: "feast.FeatureStore", registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL, @@ -108,33 +114,26 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) - async def get_body(request: Request): - return await request.body() - @app.post( "/get-online-features", dependencies=[Depends(inject_user_details)], ) - async def get_online_features(body=Depends(get_body)): - body = json.loads(body) - full_feature_names = body.get("full_feature_names", False) - entity_rows = body["entities"] + async def get_online_features(request: GetOnlineFeaturesRequest): # Initialize parameters for FeatureStore.get_online_features(...) call - if "feature_service" in body: + if request.feature_service: feature_service = store.get_feature_service( - body["feature_service"], allow_cache=True + request.feature_service, allow_cache=True ) assert_permissions( resource=feature_service, actions=[AuthzedAction.READ_ONLINE] ) - features = feature_service + features = request.feature_service # type: ignore else: - features = body["features"] all_feature_views, all_on_demand_feature_views = ( utils._get_feature_views_to_use( store.registry, store.project, - features, + request.features, allow_cache=True, hide_dummy_entity=False, ) @@ -147,18 +146,19 @@ async def get_online_features(body=Depends(get_body)): assert_permissions( resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE] ) + features = request.features # type: ignore read_params = dict( features=features, - entity_rows=entity_rows, - full_feature_names=full_feature_names, + entity_rows=request.entities, + full_feature_names=request.full_feature_names, ) if store._get_provider().async_supported.online.read: - response = await store.get_online_features_async(**read_params) + response = await store.get_online_features_async(**read_params) # type: ignore else: response = await run_in_threadpool( - lambda: store.get_online_features(**read_params) + lambda: store.get_online_features(**read_params) # type: ignore ) # Convert the Protobuf object to JSON and return it @@ -167,8 +167,7 @@ async def get_online_features(body=Depends(get_body)): ) @app.post("/push", dependencies=[Depends(inject_user_details)]) - async def push(body=Depends(get_body)): - request = PushFeaturesRequest(**json.loads(body)) + async def push(request: PushFeaturesRequest): df = pd.DataFrame(request.df) actions = [] if request.to == "offline": @@ -220,17 +219,16 @@ async def push(body=Depends(get_body)): store.push(**push_params) @app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)]) - def write_to_online_store(body=Depends(get_body)): - request = WriteToFeatureStoreRequest(**json.loads(body)) + def write_to_online_store(request: WriteToFeatureStoreRequest): df = pd.DataFrame(request.df) feature_view_name = request.feature_view_name allow_registry_cache = request.allow_registry_cache try: - feature_view = store.get_stream_feature_view( + feature_view = store.get_stream_feature_view( # type: ignore feature_view_name, allow_registry_cache=allow_registry_cache ) except FeatureViewNotFoundException: - feature_view = store.get_feature_view( + feature_view = store.get_feature_view( # type: ignore feature_view_name, allow_registry_cache=allow_registry_cache ) @@ -250,11 +248,12 @@ async def health(): ) @app.post("/materialize", dependencies=[Depends(inject_user_details)]) - def materialize(body=Depends(get_body)): - request = MaterializeRequest(**json.loads(body)) - for feature_view in request.feature_views: + def materialize(request: MaterializeRequest): + for feature_view in request.feature_views or []: + # TODO: receives a str for resource but isn't in the Union. is str actually allowed? assert_permissions( - resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE] + resource=feature_view, # type: ignore + actions=[AuthzedAction.WRITE_ONLINE], ) store.materialize( utils.make_tzaware(parser.parse(request.start_ts)), @@ -263,11 +262,12 @@ def materialize(body=Depends(get_body)): ) @app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)]) - def materialize_incremental(body=Depends(get_body)): - request = MaterializeIncrementalRequest(**json.loads(body)) - for feature_view in request.feature_views: + def materialize_incremental(request: MaterializeIncrementalRequest): + for feature_view in request.feature_views or []: + # TODO: receives a str for resource but isn't in the Union. is str actually allowed? assert_permissions( - resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE] + resource=feature_view, # type: ignore + actions=[AuthzedAction.WRITE_ONLINE], ) store.materialize_incremental( utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views From 1981a870666d828c8703af2cf51f45b0541fedfb Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 30 Oct 2024 17:46:26 -0400 Subject: [PATCH 2/3] fix feature service feature assignment Signed-off-by: Rob Howley --- sdk/python/feast/feature_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 16b69a6bbe1..0a33827e2fa 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -127,7 +127,7 @@ async def get_online_features(request: GetOnlineFeaturesRequest): assert_permissions( resource=feature_service, actions=[AuthzedAction.READ_ONLINE] ) - features = request.feature_service # type: ignore + features = feature_service # type: ignore else: all_feature_views, all_on_demand_feature_views = ( utils._get_feature_views_to_use( From 720b6bf2fdd135a4fc5d8ed46c57c236c8270b49 Mon Sep 17 00:00:00 2001 From: Rob Howley Date: Wed, 30 Oct 2024 17:53:00 -0400 Subject: [PATCH 3/3] add response types, mostly none Signed-off-by: Rob Howley --- sdk/python/feast/feature_server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 0a33827e2fa..5fc8129ad86 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -118,7 +118,7 @@ async def lifespan(app: FastAPI): "/get-online-features", dependencies=[Depends(inject_user_details)], ) - async def get_online_features(request: GetOnlineFeaturesRequest): + async def get_online_features(request: GetOnlineFeaturesRequest) -> Dict[str, Any]: # Initialize parameters for FeatureStore.get_online_features(...) call if request.feature_service: feature_service = store.get_feature_service( @@ -167,7 +167,7 @@ async def get_online_features(request: GetOnlineFeaturesRequest): ) @app.post("/push", dependencies=[Depends(inject_user_details)]) - async def push(request: PushFeaturesRequest): + async def push(request: PushFeaturesRequest) -> None: df = pd.DataFrame(request.df) actions = [] if request.to == "offline": @@ -219,7 +219,7 @@ async def push(request: PushFeaturesRequest): store.push(**push_params) @app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)]) - def write_to_online_store(request: WriteToFeatureStoreRequest): + def write_to_online_store(request: WriteToFeatureStoreRequest) -> None: df = pd.DataFrame(request.df) feature_view_name = request.feature_view_name allow_registry_cache = request.allow_registry_cache @@ -248,7 +248,7 @@ async def health(): ) @app.post("/materialize", dependencies=[Depends(inject_user_details)]) - def materialize(request: MaterializeRequest): + def materialize(request: MaterializeRequest) -> None: for feature_view in request.feature_views or []: # TODO: receives a str for resource but isn't in the Union. is str actually allowed? assert_permissions( @@ -262,7 +262,7 @@ def materialize(request: MaterializeRequest): ) @app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)]) - def materialize_incremental(request: MaterializeIncrementalRequest): + def materialize_incremental(request: MaterializeIncrementalRequest) -> None: for feature_view in request.feature_views or []: # TODO: receives a str for resource but isn't in the Union. is str actually allowed? assert_permissions(