Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remote retrive online doc v2
Signed-off-by: jyejare <jyejare@redhat.com>
  • Loading branch information
jyejare committed Jul 24, 2025
commit 2043b09d4216499837d879ae31eb170974987d67
20 changes: 14 additions & 6 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ class GetOnlineDocumentsRequest(BaseModel):
features: Optional[List[str]] = None
full_feature_names: bool = False
top_k: Optional[int] = None
query_embedding: Optional[List[float]] = None
query: Optional[List[float]] = None
query_string: Optional[str] = None
api_version: Optional[int] = 1


class ChatMessage(BaseModel):
Expand Down Expand Up @@ -266,13 +267,20 @@ async def retrieve_online_documents(

read_params = dict(
features=features,
query=request.query_embedding,
top_k=request.top_k,
query=request.query,
top_k=request.top_k
)
if request.api_version == 2 and request.query_string is not None:
read_params['query_string'] = request.query_string

response = await run_in_threadpool(
lambda: store.retrieve_online_documents(**read_params) # type: ignore
)
if request.api_version == 2:
response = await run_in_threadpool(
lambda: store.retrieve_online_documents_v2(**read_params) # type: ignore
)
else:
response = await run_in_threadpool(
lambda: store.retrieve_online_documents(**read_params) # type: ignore
)

# Convert the Protobuf object to JSON and return it
response_dict = await run_in_threadpool(
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2275,7 +2275,7 @@ def retrieve_online_documents_v2(
distance_metric,
query_string,
)

def _retrieve_from_online_store(
self,
provider: Provider,
Expand Down Expand Up @@ -2413,6 +2413,12 @@ def _retrieve_from_online_store_v2(
output_len=output_len,
)

utils._populate_result_rows_from_columnar(
online_features_response=online_features_response,
data=entity_key_dict,
)


return OnlineResponse(online_features_response)

def serve(
Expand Down
95 changes: 39 additions & 56 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,60 +233,55 @@ def retrieve_online_documents_v2(
self,
config: RepoConfig,
table: FeatureView,
requested_features: Optional[List[str]],
embedding: Optional[List[float]],
top_k: int,
requested_features: Optional[List[str]] = None,
distance_metric: Optional[str] = None,
query_string: Optional[str] = None,
) -> List[Tuple[Optional[datetime], Optional[EntityKeyProto], Optional[Dict[str, ValueProto]]]]:
assert isinstance(config.online_store, RemoteOnlineStoreConfig)
config.online_store.__class__ = RemoteOnlineStoreConfig

req_body = self._construct_online_documents_v2_api_json_request(
table, requested_features, embedding, top_k, distance_metric, query_string
table, requested_features, embedding, top_k, distance_metric, query_string, api_version=2
)
response = get_remote_online_documents_v2(config=config, req_body=req_body)
response = get_remote_online_documents(config=config, req_body=req_body)
if response.status_code == 200:
logger.debug("Able to retrieve the online documents from feature server.")
response_json = json.loads(response.text)
event_ts = self._get_event_ts(response_json)
result_tuples: List[
Tuple[Optional[datetime], Optional[EntityKeyProto], Optional[Dict[str, ValueProto]]]
] = []
for feature_value_index in range(
len(response_json["results"][0]["values"])
):
feature_values_dict: Dict[str, ValueProto] = dict()
for index, feature_name in enumerate(
response_json["metadata"]["feature_names"]
):
if (
requested_features is not None
and feature_name in requested_features
):
if (
response_json["results"][index]["statuses"][
feature_value_index
]
== "PRESENT"
):
message = python_values_to_proto_values(
[
response_json["results"][index]["values"][
feature_value_index
]
],
ValueType.UNKNOWN,
)
feature_values_dict[feature_name] = message[0]
else:
feature_values_dict[feature_name] = ValueProto()

# Create feature name to index mapping for efficient lookup
feature_name_to_index = {
name: idx for idx, name in enumerate(response_json["metadata"]["feature_names"])
}

# Create a dummy EntityKeyProto since remote store doesn't provide entity information
# This matches the behavior of the current implementation
entity_key_proto = None
# Process each result row
num_results = len(response_json["results"][0]["values"]) if response_json["results"] else 0
result_tuples = []

for row_idx in range(num_results):
# Build feature values dictionary for requested features
feature_values_dict: Dict[str, ValueProto] = {}

if requested_features:
for feature_name in requested_features:
if feature_name in feature_name_to_index:
feature_idx = feature_name_to_index[feature_name]
if self._is_feature_present(response_json, feature_idx, row_idx):
feature_values_dict[feature_name] = self._extract_feature_value(
response_json, feature_idx, row_idx
)
else:
feature_values_dict[feature_name] = ValueProto()

# Construct entity key proto using existing helper method
entity_key_proto = self._construct_entity_key_from_response(
response_json, row_idx, feature_name_to_index
)

result_tuples.append((event_ts, entity_key_proto, feature_values_dict))

return result_tuples
else:
error_msg = f"Unable to retrieve the online documents using feature server API. Error_code={response.status_code}, error_message={response.text}"
Expand Down Expand Up @@ -396,7 +391,7 @@ def _construct_online_documents_api_json_request(
req_body = json.dumps(
{
"features": api_requested_features,
"query_embedding": embedding,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
}
Expand All @@ -406,11 +401,12 @@ def _construct_online_documents_api_json_request(
def _construct_online_documents_v2_api_json_request(
self,
table: FeatureView,
requested_features: Optional[List[str]],
embedding: Optional[List[float]],
top_k: int,
requested_features: Optional[List[str]] = None,
distance_metric: Optional[str] = None,
query_string: Optional[str] = None,
api_version: Optional[int] = 1,
) -> str:
api_requested_features = []
if requested_features is not None:
Expand All @@ -420,10 +416,11 @@ def _construct_online_documents_v2_api_json_request(
req_body = json.dumps(
{
"features": api_requested_features,
"embedding": embedding,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
"query_string": query_string,
"api_version": api_version,
}
)
return req_body
Expand Down Expand Up @@ -460,6 +457,8 @@ def _construct_entity_key_from_response(
def _extract_feature_value(self, response_json: dict, feature_idx: int, row_idx: int) -> ValueProto:
"""Extract and convert a feature value to ValueProto."""
raw_value = response_json["results"][feature_idx]["values"][row_idx]
if raw_value is None:
return ValueProto()
proto_values = python_values_to_proto_values([raw_value])
return proto_values[0]

Expand Down Expand Up @@ -515,22 +514,6 @@ def get_remote_online_documents(
)


@rest_error_handling_decorator
def get_remote_online_documents_v2(
session: requests.Session, config: RepoConfig, req_body: str
) -> requests.Response:
if config.online_store.cert:
return session.post(
f"{config.online_store.path}/retrieve-online-documents",
data=req_body,
verify=config.online_store.cert,
)
else:
return session.post(
f"{config.online_store.path}/retrieve-online-documents", data=req_body
)


@rest_error_handling_decorator
def post_remote_online_write(
session: requests.Session, config: RepoConfig, req_body: dict
Expand Down