Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,6 @@ Desktop.ini

# AgentReady reports
.agentready/

# Claude Code project settings
.claude/
3 changes: 3 additions & 0 deletions protos/feast/serving/GrpcServer.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "feast/serving/ServingService.proto";
import "feast/types/Value.proto";

option java_package = "feast.proto.serving";
option java_outer_classname = "GrpcServerAPIProto";
Expand All @@ -11,6 +12,7 @@ message PushRequest {
string stream_feature_view = 2;
bool allow_registry_cache = 3;
string to = 4;
map<string, feast.types.Value> typed_features = 5;
}

message PushResponse {
Expand All @@ -21,6 +23,7 @@ message WriteToOnlineStoreRequest {
map<string, string> features = 1;
string feature_view_name = 2;
bool allow_registry_cache = 3;
map<string, feast.types.Value> typed_features = 4;
}

message WriteToOnlineStoreResponse {
Expand Down
46 changes: 41 additions & 5 deletions sdk/python/feast/infra/contrib/grpc_server.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
import threading
from collections.abc import Mapping
from concurrent import futures
from typing import Optional, Union

import grpc
import pandas as pd
from grpc_health.v1 import health, health_pb2_grpc

from feast.data_source import PushMode
from feast.errors import FeatureServiceNotFoundException, PushSourceNotFoundException
Expand Down Expand Up @@ -34,6 +34,20 @@ def parse(features):
return pd.DataFrame.from_dict(df)


def parse_typed(typed_features):
df = {}
for key, value in typed_features.items():
val_case = value.WhichOneof("val")
if val_case is None or val_case == "null_val":
df[key] = [None]
else:
raw = getattr(value, val_case)
if hasattr(raw, "val"):
raw = dict(raw.val) if isinstance(raw.val, Mapping) else list(raw.val)
df[key] = [raw]
return pd.DataFrame.from_dict(df)


class GrpcFeatureServer(GrpcFeatureServerServicer):
fs: FeatureStore

Expand All @@ -49,7 +63,17 @@ def __init__(self, fs: FeatureStore, registry_ttl_sec: int = 5):

def Push(self, request, context):
try:
df = parse(request.features)
if request.features and request.typed_features:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(
"Only one of features or typed_features may be set, not both"
)
return PushResponse(status=False)
df = (
parse_typed(request.typed_features)
if request.typed_features
else parse(request.features)
)
if request.to == "offline":
to = PushMode.OFFLINE
elif request.to == "online":
Expand All @@ -62,7 +86,7 @@ def Push(self, request, context):
f"'online_and_offline']."
)
self.fs.push(
push_source_name=request.push_source_name,
push_source_name=request.stream_feature_view,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
Expand All @@ -84,7 +108,17 @@ def WriteToOnlineStore(self, request, context):
"write_to_online_store is deprecated. Please consider using Push instead"
)
try:
df = parse(request.features)
if request.features and request.typed_features:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(
"Only one of features or typed_features may be set, not both"
)
return WriteToOnlineStoreResponse(status=False)
df = (
parse_typed(request.typed_features)
if request.typed_features
else parse(request.features)
)
self.fs.write_to_online_store(
feature_view_name=request.feature_view_name,
df=df,
Expand All @@ -94,7 +128,7 @@ def WriteToOnlineStore(self, request, context):
logger.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return WriteToOnlineStoreResponse(status=False)
return WriteToOnlineStoreResponse(status=True)

def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context):
Expand Down Expand Up @@ -136,6 +170,8 @@ def get_grpc_server(
max_workers: int,
registry_ttl_sec: int,
):
from grpc_health.v1 import health, health_pb2_grpc

logger.info(f"Initializing gRPC server on {address}")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
add_GrpcFeatureServerServicer_to_server(
Expand Down
39 changes: 24 additions & 15 deletions sdk/python/feast/protos/feast/serving/GrpcServer_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 45 additions & 2 deletions sdk/python/feast/protos/feast/serving/GrpcServer_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ isort:skip_file
"""
import builtins
import collections.abc
import feast.protos.feast.types.Value_pb2
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.message
Expand Down Expand Up @@ -34,24 +35,45 @@ class PushRequest(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

class TypedFeaturesEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
key: builtins.str
@property
def value(self) -> feast.protos.feast.types.Value_pb2.Value: ...
def __init__(
self,
*,
key: builtins.str = ...,
value: feast.protos.feast.types.Value_pb2.Value | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

FEATURES_FIELD_NUMBER: builtins.int
STREAM_FEATURE_VIEW_FIELD_NUMBER: builtins.int
ALLOW_REGISTRY_CACHE_FIELD_NUMBER: builtins.int
TO_FIELD_NUMBER: builtins.int
TYPED_FEATURES_FIELD_NUMBER: builtins.int
@property
def features(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ...
stream_feature_view: builtins.str
allow_registry_cache: builtins.bool
to: builtins.str
@property
def typed_features(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, feast.protos.feast.types.Value_pb2.Value]: ...
def __init__(
self,
*,
features: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
stream_feature_view: builtins.str = ...,
allow_registry_cache: builtins.bool = ...,
to: builtins.str = ...,
typed_features: collections.abc.Mapping[builtins.str, feast.protos.feast.types.Value_pb2.Value] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "features", b"features", "stream_feature_view", b"stream_feature_view", "to", b"to"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "features", b"features", "stream_feature_view", b"stream_feature_view", "to", b"to", "typed_features", b"typed_features"]) -> None: ...

global___PushRequest = PushRequest

Expand Down Expand Up @@ -87,21 +109,42 @@ class WriteToOnlineStoreRequest(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

class TypedFeaturesEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
key: builtins.str
@property
def value(self) -> feast.protos.feast.types.Value_pb2.Value: ...
def __init__(
self,
*,
key: builtins.str = ...,
value: feast.protos.feast.types.Value_pb2.Value | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

FEATURES_FIELD_NUMBER: builtins.int
FEATURE_VIEW_NAME_FIELD_NUMBER: builtins.int
ALLOW_REGISTRY_CACHE_FIELD_NUMBER: builtins.int
TYPED_FEATURES_FIELD_NUMBER: builtins.int
@property
def features(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ...
feature_view_name: builtins.str
allow_registry_cache: builtins.bool
@property
def typed_features(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, feast.protos.feast.types.Value_pb2.Value]: ...
def __init__(
self,
*,
features: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
feature_view_name: builtins.str = ...,
allow_registry_cache: builtins.bool = ...,
typed_features: collections.abc.Mapping[builtins.str, feast.protos.feast.types.Value_pb2.Value] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "feature_view_name", b"feature_view_name", "features", b"features"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["allow_registry_cache", b"allow_registry_cache", "feature_view_name", b"feature_view_name", "features", b"features", "typed_features", b"typed_features"]) -> None: ...

global___WriteToOnlineStoreRequest = WriteToOnlineStoreRequest

Expand Down
Loading
Loading