From 5b33eecc23e3365f7d493b049cd094c4d4570bd7 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Mon, 9 Sep 2024 11:09:33 +0900 Subject: [PATCH 1/2] fix: Disable active_timer When registry_ttl_sec is 0 Signed-off-by: Jiwon Park --- sdk/python/feast/feature_server.py | 35 ++++++++++++++++-------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 4f8de1eef59..5072a323e5d 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -69,8 +69,8 @@ class MaterializeIncrementalRequest(BaseModel): def get_app( - store: "feast.FeatureStore", - registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL, + store: "feast.FeatureStore", + registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL, ): proto_json.patch() # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down @@ -90,9 +90,11 @@ def async_refresh(): registry_proto = store.registry.proto() if shutting_down: return - nonlocal active_timer - active_timer = threading.Timer(registry_ttl_sec, async_refresh) - active_timer.start() + + if registry_ttl_sec: + nonlocal active_timer + active_timer = threading.Timer(registry_ttl_sec, async_refresh) + active_timer.start() @asynccontextmanager async def lifespan(app: FastAPI): @@ -181,9 +183,9 @@ def push(body=Depends(get_body)): fv for fv in all_fvs if ( - fv.stream_source is not None - and isinstance(fv.stream_source, PushSource) - and fv.stream_source.name == request.push_source_name + fv.stream_source is not None + and isinstance(fv.stream_source, PushSource) + and fv.stream_source.name == request.push_source_name ) } @@ -269,6 +271,7 @@ async def rest_exception_handler(request: Request, exc: Exception): if sys.platform != "win32": import gunicorn.app.base + class FeastServeApplication(gunicorn.app.base.BaseApplication): def __init__(self, store: "feast.FeatureStore", **options): self._app = get_app( @@ -306,14 +309,14 @@ def monitor_resources(self, interval: int = 5): def start_server( - store: "feast.FeatureStore", - host: str, - port: int, - no_access_log: bool, - workers: int, - keep_alive_timeout: int, - registry_ttl_sec: int, - metrics: bool, + store: "feast.FeatureStore", + host: str, + port: int, + no_access_log: bool, + workers: int, + keep_alive_timeout: int, + registry_ttl_sec: int, + metrics: bool, ): if metrics: logger.info("Starting Prometheus Server") From 4f94051410d41f891276ce87014c6c68ea473bfc Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Mon, 9 Sep 2024 15:16:31 +0900 Subject: [PATCH 2/2] feat: Add delete mark Signed-off-by: Jiwon Park --- sdk/python/feast/feature_server.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 5072a323e5d..9757e95143e 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -69,8 +69,8 @@ class MaterializeIncrementalRequest(BaseModel): def get_app( - store: "feast.FeatureStore", - registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL, + store: "feast.FeatureStore", + registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL, ): proto_json.patch() # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down @@ -183,9 +183,9 @@ def push(body=Depends(get_body)): fv for fv in all_fvs if ( - fv.stream_source is not None - and isinstance(fv.stream_source, PushSource) - and fv.stream_source.name == request.push_source_name + fv.stream_source is not None + and isinstance(fv.stream_source, PushSource) + and fv.stream_source.name == request.push_source_name ) } @@ -271,7 +271,6 @@ async def rest_exception_handler(request: Request, exc: Exception): if sys.platform != "win32": import gunicorn.app.base - class FeastServeApplication(gunicorn.app.base.BaseApplication): def __init__(self, store: "feast.FeatureStore", **options): self._app = get_app( @@ -309,14 +308,14 @@ def monitor_resources(self, interval: int = 5): def start_server( - store: "feast.FeatureStore", - host: str, - port: int, - no_access_log: bool, - workers: int, - keep_alive_timeout: int, - registry_ttl_sec: int, - metrics: bool, + store: "feast.FeatureStore", + host: str, + port: int, + no_access_log: bool, + workers: int, + keep_alive_timeout: int, + registry_ttl_sec: int, + metrics: bool, ): if metrics: logger.info("Starting Prometheus Server")