diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index b930b2099d0..4820c401a6c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -3064,18 +3064,13 @@ def _retrieve_from_online_store_v2( entity_key_dict, ) - feature_data = utils._convert_rows_to_protobuf( - requested_features=features_to_request, - read_rows=list(zip(datevals, list_of_feature_dicts)), - ) - online_features_response = GetOnlineFeaturesResponse(results=[]) utils._populate_response_from_feature_data( - feature_data=feature_data, + requested_features=features_to_request, + read_rows=list(zip(datevals, list_of_feature_dicts)), indexes=idxs, online_features_response=online_features_response, full_feature_names=False, - requested_features=features_to_request, table=table, output_len=output_len, include_feature_view_version_metadata=include_feature_view_version_metadata, diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 4913046470c..5bdc2fa9e46 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -220,17 +220,12 @@ def get_online_features( requested_features=requested_features, ) - feature_data = utils._convert_rows_to_protobuf( - requested_features, read_rows - ) - - # Populate the result_rows with the Features from the OnlineStore inplace. utils._populate_response_from_feature_data( - feature_data, + requested_features, + read_rows, idxs, online_features_response, full_feature_names, - requested_features, table, output_len, include_feature_view_version_metadata, @@ -356,17 +351,12 @@ async def query_table(table, requested_features): for (idxs, read_rows, output_len), (table, requested_features) in zip( all_responses, grouped_refs ): - feature_data = utils._convert_rows_to_protobuf( - requested_features, read_rows - ) - - # Populate the result_rows with the Features from the OnlineStore inplace. utils._populate_response_from_feature_data( - feature_data, + requested_features, + read_rows, idxs, online_features_response, full_feature_names, - requested_features, table, output_len, include_feature_view_version_metadata, diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index ce45bb0862e..809b69814ac 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -666,30 +666,6 @@ def _group_feature_refs( return fvs_result, odfvs_result -def construct_response_feature_vector( - values_vector: Iterable[Any], - statuses_vector: Iterable[Any], - timestamp_vector: Iterable[Any], - mapping_indexes: Iterable[List[int]], - output_len: int, -) -> GetOnlineFeaturesResponse.FeatureVector: - values_output: Iterable[Any] = [None] * output_len - statuses_output: Iterable[Any] = [None] * output_len - timestamp_output: Iterable[Any] = [None] * output_len - - for i, destinations in enumerate(mapping_indexes): - for idx in destinations: - values_output[idx] = values_vector[i] # type: ignore[index] - statuses_output[idx] = statuses_vector[i] # type: ignore[index] - timestamp_output[idx] = timestamp_vector[i] # type: ignore[index] - - return GetOnlineFeaturesResponse.FeatureVector( - values=values_output, - statuses=statuses_output, - event_timestamps=timestamp_output, - ) - - def _apply_aggregations_to_response( response_data: Union[pyarrow.Table, Dict[str, List[Any]]], aggregations, @@ -1130,115 +1106,6 @@ def ensure_request_data_values_exist( raise RequestDataNotFoundInEntityRowsException(feature_names=missing_features) -def _populate_response_from_feature_data( - feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[ValueProto] - ] - ], - indexes: Iterable[List[int]], - online_features_response: GetOnlineFeaturesResponse, - full_feature_names: bool, - requested_features: Iterable[str], - table: "FeatureView", - output_len: int, - include_feature_view_version_metadata: bool = False, -): - """Populate the GetOnlineFeaturesResponse with feature data. - - This method assumes that `_read_from_online_store` returns data for each - combination of Entities in `entity_rows` in the same order as they - are provided. - - Args: - feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore. - indexes: A list of indexes which should be the same length as `feature_data`. Each list - of indexes corresponds to a set of result rows in `online_features_response`. - online_features_response: The object to populate. - full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, - changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to - "customer_fv__daily_transactions"). - requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the - data in `feature_data`. - table: The FeatureView that `feature_data` was retrieved from. - output_len: The number of result rows in `online_features_response`. - """ - # Add the feature names to the response. - # Use name_to_use() which includes version tag (e.g. "fv@v2") when a - # version-qualified ref was used, so multi-version queries produce - # distinct column names like "fv@v1__feat" and "fv@v2__feat". - table_name = table.projection.name_to_use() - clean_table_name = table.projection.name_alias or table.projection.name - requested_feature_refs = [ - f"{table_name}__{feature_name}" if full_feature_names else feature_name - for feature_name in requested_features - ] - online_features_response.metadata.feature_names.val.extend(requested_feature_refs) - - # Add version metadata if requested - if include_feature_view_version_metadata: - # Check if this feature view already exists in metadata to avoid duplicates - existing_names = [ - fvm.name for fvm in online_features_response.metadata.feature_view_metadata - ] - if clean_table_name not in existing_names: - fv_metadata = online_features_response.metadata.feature_view_metadata.add() - fv_metadata.name = clean_table_name - # Extract version from the table's current_version_number attribute - fv_metadata.version = getattr(table, "current_version_number", 0) or 0 - - # Process each feature vector in a single pass - for timestamp_vector, statuses_vector, values_vector in feature_data: - response_vector = construct_response_feature_vector( - values_vector, statuses_vector, timestamp_vector, indexes, output_len - ) - online_features_response.results.append(response_vector) - - -def _populate_response_from_feature_data_v2( - feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[ValueProto] - ] - ], - indexes: Iterable[List[int]], - online_features_response: GetOnlineFeaturesResponse, - requested_features: Iterable[str], - output_len: int, -): - """Populate the GetOnlineFeaturesResponse with feature data. - - This method assumes that `_read_from_online_store` returns data for each - combination of Entities in `entity_rows` in the same order as they - are provided. - - Args: - feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore. - indexes: A list of indexes which should be the same length as `feature_data`. Each list - of indexes corresponds to a set of result rows in `online_features_response`. - online_features_response: The object to populate. - full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, - changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to - "customer_fv__daily_transactions"). - requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the - data in `feature_data`. - output_len: The number of result rows in `online_features_response`. - """ - # Add the feature names to the response. - requested_feature_refs = [(feature_name) for feature_name in requested_features] - online_features_response.metadata.feature_names.val.extend(requested_feature_refs) - - timestamps, statuses, values = zip(*feature_data) - - # Populate the result with data fetched from the OnlineStore - # which is guaranteed to be aligned with `requested_features`. - for timestamp_vector, statuses_vector, values_vector in feature_data: - response_vector = construct_response_feature_vector( - values_vector, statuses_vector, timestamp_vector, indexes, output_len - ) - online_features_response.results.append(response_vector) - - def _convert_entity_key_to_proto_to_dict( entity_key_vals: List[EntityKeyProto], ) -> Dict[str, List[ValueProto]]: @@ -1612,36 +1479,99 @@ def _get_entity_key_protos( return entity_key_protos -def _convert_rows_to_protobuf( +def _populate_response_from_feature_data( requested_features: List[str], read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]], -) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[ValueProto]]]: - n_rows = len(read_rows) + indexes: Iterable[List[int]], + online_features_response: GetOnlineFeaturesResponse, + full_feature_names: bool, + table: "FeatureView", + output_len: int, + include_feature_view_version_metadata: bool = False, +): + """Populate the GetOnlineFeaturesResponse from raw online_read rows. + + Converts raw rows from the OnlineStore into protobuf FeatureVectors and + appends them to the response. This method assumes that ``online_read`` + returns data for each unique entity in the same order as ``indexes``. + + Args: + requested_features: The names of the features to extract from + each row. Determines the order of FeatureVectors in the response. + read_rows: Raw output from ``OnlineStore.online_read`` — a list of + ``(event_timestamp, feature_dict)`` tuples, one per unique entity. + ``feature_dict`` may be ``None`` when the entity is not found. + indexes: A tuple of lists that maps each unique entity (by position + in ``read_rows``) to one or more output positions in the response. + Used to fan-out deduplicated reads back to the original request rows. + online_features_response: The protobuf response object to populate. + full_feature_names: If True, feature names are prefixed with the + feature view name (e.g. ``"driver_fv__trips_today"``). + table: The FeatureView that ``read_rows`` was retrieved from. + output_len: Total number of result rows in the response. + include_feature_view_version_metadata: If True, version metadata + for the feature view is added to the response. + """ + n_features = len(requested_features) + + table_name = table.projection.name_to_use() + clean_table_name = table.projection.name_alias or table.projection.name + feature_refs = [ + f"{table_name}__{fn}" if full_feature_names else fn for fn in requested_features + ] + online_features_response.metadata.feature_names.val.extend(feature_refs) + + if include_feature_view_version_metadata: + existing_names = [ + fvm.name for fvm in online_features_response.metadata.feature_view_metadata + ] + if clean_table_name not in existing_names: + fv_metadata = online_features_response.metadata.feature_view_metadata.add() + fv_metadata.name = clean_table_name + fv_metadata.version = getattr(table, "current_version_number", 0) or 0 null_value = ValueProto() - null_status = FieldStatus.NOT_FOUND - present_status = FieldStatus.PRESENT + null_ts = Timestamp() + PRESENT = FieldStatus.PRESENT + NOT_FOUND = FieldStatus.NOT_FOUND - # Pre-compute timestamps once per entity (not per feature) - # This reduces O(features * entities) to O(entities) for timestamp conversion - row_timestamps = [] + row_ts_protos = [] for row_ts, _ in read_rows: - ts_proto = Timestamp() + ts = Timestamp() if row_ts is not None: - ts_proto.FromDatetime(row_ts) - row_timestamps.append(ts_proto) - - requested_features_vectors = [] - for feature_name in requested_features: - ts_vector = list(row_timestamps) # Shallow copy of pre-computed timestamps - status_vector = [null_status] * n_rows - value_vector = [null_value] * n_rows - for idx, (_, feature_data) in enumerate(read_rows): - if (feature_data is not None) and (feature_name in feature_data): - status_vector[idx] = present_status - value_vector[idx] = feature_data[feature_name] - requested_features_vectors.append((ts_vector, status_vector, value_vector)) - return requested_features_vectors + ts.FromDatetime(row_ts) + row_ts_protos.append(ts) + + ts_template = [null_ts] * output_len + indexes_tuple = tuple(indexes) + for row_idx, destinations in enumerate(indexes_tuple): + ts = row_ts_protos[row_idx] + for out_idx in destinations: + ts_template[out_idx] = ts + + feat_values = [[null_value] * output_len for _ in range(n_features)] + feat_statuses = [[NOT_FOUND] * output_len for _ in range(n_features)] + + feat_idx_map = {name: i for i, name in enumerate(requested_features)} + for row_idx, destinations in enumerate(indexes_tuple): + _, feature_data = read_rows[row_idx] + if feature_data is None: + continue + for feat_name, feat_val in feature_data.items(): + f_idx = feat_idx_map.get(feat_name) + if f_idx is not None: + for out_idx in destinations: + feat_values[f_idx][out_idx] = feat_val + feat_statuses[f_idx][out_idx] = PRESENT + + for f_idx in range(n_features): + online_features_response.results.append( + GetOnlineFeaturesResponse.FeatureVector( + values=feat_values[f_idx], + statuses=feat_statuses[f_idx], + event_timestamps=list(ts_template), + ) + ) def has_all_tags( diff --git a/sdk/python/tests/integration/registration/test_versioning.py b/sdk/python/tests/integration/registration/test_versioning.py index 32143f9dccc..72289e72a4f 100644 --- a/sdk/python/tests/integration/registration/test_versioning.py +++ b/sdk/python/tests/integration/registration/test_versioning.py @@ -603,11 +603,11 @@ def test_version_metadata_disabled_by_default(self, registry, make_fv): # Mock response generation without version metadata response = GetOnlineFeaturesResponse() _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["trips_today"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=True, - requested_features=["trips_today"], table=active_fv, output_len=0, include_feature_view_version_metadata=False, # Default behavior @@ -651,11 +651,11 @@ def test_version_metadata_included_when_requested(self, registry, make_fv, entit # Mock response generation with version metadata response = GetOnlineFeaturesResponse() _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["trips_today", "total_earnings"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=False, # Test without prefixes - requested_features=["trips_today", "total_earnings"], table=active_fv, output_len=0, include_feature_view_version_metadata=True, # Enable metadata @@ -687,11 +687,11 @@ def test_version_metadata_clean_names_with_prefixes(self, registry, make_fv): # Test with full feature names (prefixed) response = GetOnlineFeaturesResponse() _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["trips_today"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=True, # Enable prefixes - requested_features=["trips_today"], table=active_fv, output_len=0, include_feature_view_version_metadata=True, @@ -764,11 +764,11 @@ def test_version_metadata_multiple_feature_views(self, registry, entity): # Process first feature view _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["trips_today"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=False, - requested_features=["trips_today"], table=driver_fv, output_len=0, include_feature_view_version_metadata=True, @@ -776,11 +776,11 @@ def test_version_metadata_multiple_feature_views(self, registry, entity): # Process second feature view _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["total_bookings", "cancellation_rate"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=False, - requested_features=["total_bookings", "cancellation_rate"], table=user_fv, output_len=0, include_feature_view_version_metadata=True, @@ -819,22 +819,22 @@ def test_version_metadata_prevents_duplicates(self, registry, make_fv): # Process same feature view twice (simulating multiple features from same view) _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["trips_today"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=False, - requested_features=["trips_today"], table=active_fv, output_len=0, include_feature_view_version_metadata=True, ) _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["avg_rating"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=False, - requested_features=["avg_rating"], table=active_fv, output_len=0, include_feature_view_version_metadata=True, @@ -864,11 +864,11 @@ def test_version_metadata_backward_compatibility(self, registry, make_fv): # Test calling without the new parameter (should default to False) response = GetOnlineFeaturesResponse() _populate_response_from_feature_data( - feature_data=[], - indexes=[], + requested_features=["trips_today"], + read_rows=[], + indexes=(), online_features_response=response, full_feature_names=True, - requested_features=["trips_today"], table=active_fv, output_len=0, # Note: include_feature_view_version_metadata parameter omitted diff --git a/sdk/python/tests/unit/test_utils.py b/sdk/python/tests/unit/test_utils.py index 4ffec9750b2..7eebf46461a 100644 --- a/sdk/python/tests/unit/test_utils.py +++ b/sdk/python/tests/unit/test_utils.py @@ -1,76 +1,111 @@ """ Tests for feast.utils module. -These unit tests cover the _convert_rows_to_protobuf function which is critical -for online feature retrieval performance. The function converts raw database -rows to protobuf format for the serving response. +These unit tests cover the _populate_response_from_feature_data function +which converts raw online_read rows into protobuf FeatureVectors and +populates the GetOnlineFeaturesResponse. """ from datetime import datetime, timezone +from unittest.mock import MagicMock -from feast.protos.feast.serving.ServingService_pb2 import FieldStatus +from feast.protos.feast.serving.ServingService_pb2 import ( + FieldStatus, + GetOnlineFeaturesResponse, +) from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.utils import _convert_rows_to_protobuf +from feast.utils import _populate_response_from_feature_data -class TestConvertRowsToProtobuf: - """Tests for _convert_rows_to_protobuf function.""" +def _make_table(name="test_fv"): + """Create a minimal mock FeatureView for testing.""" + table = MagicMock() + table.projection.name_to_use.return_value = name + table.projection.name_alias = None + table.projection.name = name + return table - def test_basic_conversion(self): - """Test basic conversion with single feature and entity.""" + +class TestPopulateResponseFromFeatureData: + """Tests for _populate_response_from_feature_data function.""" + + def test_basic_single_feature(self): + """Test basic conversion with single feature and single entity.""" timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) value = ValueProto(float_val=1.5) read_rows = [(timestamp, {"feature_1": value})] - requested_features = ["feature_1"] - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - assert len(result) == 1 - ts_vector, status_vector, value_vector = result[0] - assert len(ts_vector) == 1 - assert ts_vector[0].seconds == int(timestamp.timestamp()) - assert value_vector[0] == value - - def test_multiple_features_same_timestamp(self): - """Test that multiple features share the same pre-computed timestamp. - - This verifies the optimization: timestamps are computed once per entity, - not once per feature per entity. - """ + indexes = ([0],) + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1"], + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=1, + ) + + assert len(response.results) == 1 + assert response.results[0].values[0] == value + assert response.results[0].statuses[0] == FieldStatus.PRESENT + assert response.results[0].event_timestamps[0].seconds == int( + timestamp.timestamp() + ) + assert list(response.metadata.feature_names.val) == ["feature_1"] + + def test_multiple_features_same_entity(self): + """Test multiple features from the same row.""" timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - value1 = ValueProto(float_val=1.0) - value2 = ValueProto(float_val=2.0) - - read_rows = [(timestamp, {"feature_1": value1, "feature_2": value2})] - requested_features = ["feature_1", "feature_2"] - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - assert len(result) == 2 - ts1 = result[0][0][0] - ts2 = result[1][0][0] - assert ts1.seconds == ts2.seconds - assert ts1.seconds == int(timestamp.timestamp()) - - def test_multiple_entities(self): - """Test conversion with multiple entities having different timestamps.""" - ts1 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - ts2 = datetime(2024, 1, 2, 12, 0, 0, tzinfo=timezone.utc) - - read_rows = [ - (ts1, {"feature_1": ValueProto(float_val=1.0)}), - (ts2, {"feature_1": ValueProto(float_val=2.0)}), - ] - requested_features = ["feature_1"] - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - assert len(result) == 1 - ts_vector, status_vector, value_vector = result[0] - assert len(ts_vector) == 2 - assert ts_vector[0].seconds == int(ts1.timestamp()) - assert ts_vector[1].seconds == int(ts2.timestamp()) + v1 = ValueProto(float_val=1.0) + v2 = ValueProto(float_val=2.0) + + read_rows = [(timestamp, {"feature_1": v1, "feature_2": v2})] + indexes = ([0],) + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1", "feature_2"], + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=1, + ) + + assert len(response.results) == 2 + assert response.results[0].values[0] == v1 + assert response.results[1].values[0] == v2 + ts1 = response.results[0].event_timestamps[0].seconds + ts2 = response.results[1].event_timestamps[0].seconds + assert ts1 == ts2 == int(timestamp.timestamp()) + + def test_multiple_entities_deduplication(self): + """Test that duplicate entity rows are correctly mapped via indexes.""" + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + val = ValueProto(float_val=42.0) + + read_rows = [(ts, {"feature_1": val})] + indexes = ([0, 1, 2],) # One unique row maps to 3 output positions + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1"], + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=3, + ) + + assert len(response.results[0].values) == 3 + for i in range(3): + assert response.results[0].values[i] == val + assert response.results[0].statuses[i] == FieldStatus.PRESENT def test_null_timestamp_handling(self): """Test that null timestamps produce empty Timestamp proto.""" @@ -81,64 +116,120 @@ def test_null_timestamp_handling(self): {"feature_1": ValueProto(float_val=2.0)}, ), ] - requested_features = ["feature_1"] - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - ts_vector = result[0][0] - assert ts_vector[0].seconds == 0 # Null timestamp -> empty proto - assert ts_vector[1].seconds != 0 # Valid timestamp + indexes = ([0],), ([1],) + indexes = ([0], [1]) + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1"], + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=2, + ) + + ts_list = response.results[0].event_timestamps + assert ts_list[0].seconds == 0 # Null timestamp -> empty proto + assert ts_list[1].seconds != 0 # Valid timestamp def test_missing_feature_data(self): """Test handling of missing feature data (None row).""" - timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) read_rows = [ - (timestamp, {"feature_1": ValueProto(float_val=1.0)}), - (timestamp, None), # No feature data for this entity + (ts, {"feature_1": ValueProto(float_val=1.0)}), + (ts, None), ] - requested_features = ["feature_1"] - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - ts_vector, status_vector, value_vector = result[0] - assert len(ts_vector) == 2 - assert status_vector[0] == FieldStatus.PRESENT - assert status_vector[1] == FieldStatus.NOT_FOUND + indexes = ([0], [1]) + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1"], + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=2, + ) + + assert response.results[0].statuses[0] == FieldStatus.PRESENT + assert response.results[0].statuses[1] == FieldStatus.NOT_FOUND def test_feature_not_in_row(self): """Test handling when requested feature is not in the row's data.""" - timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - - read_rows = [ - (timestamp, {"feature_1": ValueProto(float_val=1.0)}), - ] - requested_features = ["feature_1", "feature_2"] # feature_2 not in data - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - assert len(result) == 2 - # feature_1 is present - assert result[0][1][0] == FieldStatus.PRESENT - # feature_2 is not found - assert result[1][1][0] == FieldStatus.NOT_FOUND + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + read_rows = [(ts, {"feature_1": ValueProto(float_val=1.0)})] + indexes = ([0],) + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1", "feature_2"], + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=1, + ) + + assert len(response.results) == 2 + assert response.results[0].statuses[0] == FieldStatus.PRESENT + assert response.results[1].statuses[0] == FieldStatus.NOT_FOUND def test_empty_inputs(self): """Test handling of empty inputs.""" - # Empty rows - result = _convert_rows_to_protobuf(["feature_1"], []) - assert len(result) == 1 - assert len(result[0][0]) == 0 # Empty ts_vector - - # Empty features - timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - result = _convert_rows_to_protobuf([], [(timestamp, {"f": ValueProto()})]) - assert len(result) == 0 + response = GetOnlineFeaturesResponse(results=[]) + _populate_response_from_feature_data( + requested_features=["feature_1"], + read_rows=[], + indexes=(), + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=0, + ) + assert len(response.results) == 1 + assert len(response.results[0].values) == 0 + + response2 = GetOnlineFeaturesResponse(results=[]) + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + _populate_response_from_feature_data( + requested_features=[], + read_rows=[(ts, {"f": ValueProto()})], + indexes=([0],), + online_features_response=response2, + full_feature_names=False, + table=_make_table(), + output_len=1, + ) + assert len(response2.results) == 0 + + def test_full_feature_names(self): + """Test that full_feature_names prefixes feature names with table name.""" + ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + read_rows = [(ts, {"feature_1": ValueProto(float_val=1.0)})] + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=["feature_1"], + read_rows=read_rows, + indexes=([0],), + online_features_response=response, + full_feature_names=True, + table=_make_table("my_fv"), + output_len=1, + ) + + assert list(response.metadata.feature_names.val) == ["my_fv__feature_1"] def test_large_scale_correctness(self): """Test correctness with large number of features and entities. - This test verifies that the optimized implementation produces correct + This test verifies that the fused implementation produces correct results at scale (50 features x 500 entities = 25,000 data points). """ timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) @@ -150,21 +241,28 @@ def test_large_scale_correctness(self): } read_rows = [(timestamp, feature_data.copy()) for _ in range(num_entities)] requested_features = [f"feature_{i}" for i in range(num_features)] - - result = _convert_rows_to_protobuf(requested_features, read_rows) - - # Verify structure - assert len(result) == num_features - for feature_idx, (ts_vector, status_vector, value_vector) in enumerate(result): - assert len(ts_vector) == num_entities - assert len(status_vector) == num_entities - assert len(value_vector) == num_entities - - # Verify all timestamps are the same (pre-computed once) - expected_ts = int(timestamp.timestamp()) - for ts in ts_vector: + indexes = tuple([i] for i in range(num_entities)) + response = GetOnlineFeaturesResponse(results=[]) + + _populate_response_from_feature_data( + requested_features=requested_features, + read_rows=read_rows, + indexes=indexes, + online_features_response=response, + full_feature_names=False, + table=_make_table(), + output_len=num_entities, + ) + + assert len(response.results) == num_features + expected_ts = int(timestamp.timestamp()) + for feature_idx in range(num_features): + fv = response.results[feature_idx] + assert len(fv.values) == num_entities + assert len(fv.statuses) == num_entities + assert len(fv.event_timestamps) == num_entities + + for ts in fv.event_timestamps: assert ts.seconds == expected_ts - - # Verify all statuses are PRESENT - for status in status_vector: + for status in fv.statuses: assert status == FieldStatus.PRESENT