diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 7073a20d1e0..82e30c5df80 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -1566,7 +1566,19 @@ def _get_feature_view_and_df_for_online_write(
             if feature_view.singleton
             else df.to_dict(orient="list")
         )
-        transformed_data = feature_view.feature_transformation.udf(input_dict)
+        if feature_view.singleton:
+            transformed_data = df.apply(
+                feature_view.feature_transformation.udf, axis=1
+            )
+            transformed_data = pd.DataFrame(
+                transformed_data.to_list()
+            ).applymap(
+                lambda x: x[0] if isinstance(x, list) and len(x) == 1 else x
+            )
+        else:
+            transformed_data = feature_view.feature_transformation.udf(
+                input_dict
+            )
         if feature_view.write_to_online_store:
             entities = [
                 self.get_entity(entity)
@@ -1574,8 +1586,14 @@ def _get_feature_view_and_df_for_online_write(
             ]
             join_keys = [entity.join_key for entity in entities if entity]
             join_keys = [k for k in join_keys if k in input_dict.keys()]
-            transformed_df = pd.DataFrame(transformed_data)
-            input_df = pd.DataFrame(input_dict)
+            transformed_df = (
+                pd.DataFrame(transformed_data)
+                if not isinstance(transformed_data, pd.DataFrame)
+                else transformed_data
+            )
+            input_df = pd.DataFrame(
+                [input_dict] if feature_view.singleton else input_dict
+            )
             if input_df.shape[0] == transformed_df.shape[0]:
                 for k in input_dict:
                     if k not in transformed_data:
diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
index 9c0345db128..99405dc2ca0 100644
--- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
+++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
@@ -354,6 +354,10 @@ def online_read(
         feature_name_feast_primitive_type_map = {
             f.name: f.dtype for f in table.features
         }
+        if getattr(table, "write_to_online_store", False):
+            feature_name_feast_primitive_type_map.update(
+                {f.name: f.dtype for f in table.schema}
+            )
         # Build a dictionary mapping composite key -> (res_ts, res)
         results_dict: Dict[
             str, Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
         ]
@@ -394,6 +398,7 @@ def online_read(
                             "int64_val",
                             "float_val",
                             "double_val",
+                            "string_val",
                         ]:
                             setattr(
                                 val,
@@ -420,7 +425,7 @@ def online_read(
                             setattr(val, proto_attr, field_value)
                         else:
                             raise ValueError(
-                                f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value {field_value}"
+                                f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value type {proto_attr}"
                             )
                     # res[field] = val
                     key_to_use = field.split(":", 1)[-1] if ":" in field else field
diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py
index f4ec0149184..04b03c96823 100644
--- a/sdk/python/feast/on_demand_feature_view.py
+++ b/sdk/python/feast/on_demand_feature_view.py
@@ -678,6 +678,9 @@ def _construct_random_input(
 ) -> dict[str, Union[list[Any], Any]]:
     rand_dict_value: dict[ValueType, Union[list[Any], Any]] = {
         ValueType.BYTES: [str.encode("hello world")],
+        ValueType.PDF_BYTES: [
+            b"%PDF-1.3\n3 0 obj\n<>\nendobj\n4 0 obj\n<>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<>\nendobj\n5 0 obj\n<>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
+        ],
         ValueType.STRING: ["hello world"],
         ValueType.INT32: [1],
         ValueType.INT64: [1],
diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py
index 20a9dd9ff6f..3ea62963ae5 100644
--- a/sdk/python/feast/transformation/python_transformation.py
+++ b/sdk/python/feast/transformation/python_transformation.py
@@ -58,11 +58,19 @@ def infer_features(
                         f"Failed to infer type for feature '{feature_name}' with value "
                         + f"'{feature_value}' since no items were returned by the UDF."
                     )
-                inferred_type = type(feature_value[0])
                 inferred_value = feature_value[0]
-                if singleton:
-                    inferred_value = feature_value
-                    inferred_type = None  # type: ignore
+                if singleton and isinstance(inferred_value, list):
+                    # If we have a nested list like [[0.5, 0.5, ...]]
+                    if len(inferred_value) > 0:
+                        # Get the actual element type from the inner list
+                        inferred_type = type(inferred_value[0])
+                    else:
+                        raise TypeError(
+                            f"Failed to infer type for nested feature '{feature_name}' - inner list is empty"
+                        )
+                else:
+                    # For non-nested lists or when singleton is False
+                    inferred_type = type(inferred_value)
             else:
                 inferred_type = type(feature_value)

diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py
index 4f13fbf2652..7a31489ac5f 100644
--- a/sdk/python/feast/types.py
+++ b/sdk/python/feast/types.py
@@ -23,6 +23,7 @@
 PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
     "INVALID": "UNKNOWN",
     "BYTES": "BYTES",
+    "PDF_BYTES": "PDF_BYTES",
     "STRING": "STRING",
     "INT32": "INT32",
     "INT64": "INT64",
@@ -79,6 +80,7 @@ class PrimitiveFeastType(Enum):
     FLOAT32 = 6
     BOOL = 7
     UNIX_TIMESTAMP = 8
+    PDF_BYTES = 9

     def to_value_type(self) -> ValueType:
         """
@@ -102,6 +104,7 @@ def __hash__(self):

 Invalid = PrimitiveFeastType.INVALID
 Bytes = PrimitiveFeastType.BYTES
+PdfBytes = PrimitiveFeastType.PDF_BYTES
 String = PrimitiveFeastType.STRING
 Bool = PrimitiveFeastType.BOOL
 Int32 = PrimitiveFeastType.INT32
@@ -114,6 +117,7 @@ def __hash__(self):
     Invalid,
     String,
     Bytes,
+    PdfBytes,
     Bool,
     Int32,
     Int64,
@@ -126,6 +130,7 @@ def __hash__(self):
     "INVALID": "Invalid",
     "STRING": "String",
     "BYTES": "Bytes",
+    "PDF_BYTES": "PdfBytes",
     "BOOL": "Bool",
     "INT32": "Int32",
     "INT64": "Int64",
@@ -168,6 +173,7 @@ def __str__(self):
 VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
     ValueType.UNKNOWN: Invalid,
     ValueType.BYTES: Bytes,
+    ValueType.PDF_BYTES: PdfBytes,
     ValueType.STRING: String,
     ValueType.INT32: Int32,
     ValueType.INT64: Int64,
diff --git a/sdk/python/feast/value_type.py b/sdk/python/feast/value_type.py
index 1904baf7bbb..b63a90d1373 100644
--- a/sdk/python/feast/value_type.py
+++ b/sdk/python/feast/value_type.py
@@ -48,6 +48,7 @@ class ValueType(enum.Enum):
     BOOL_LIST = 17
     UNIX_TIMESTAMP_LIST = 18
     NULL = 19
+    PDF_BYTES = 20


 ListType = Union[
diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py
index ea76ed6f544..6aa88d3f258 100644
--- a/sdk/python/tests/unit/online_store/test_online_retrieval.py
+++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py
@@ -4,6 +4,7 @@
 import sqlite3
 import sys
 import time
+from typing import Any

 import numpy as np
 import pandas as pd
@@ -1056,7 +1057,7 @@ def test_local_milvus() -> None:
     client.drop_collection(collection_name=COLLECTION_NAME)


-def test_milvus_lite_get_online_documents_v2() -> None:
+def test_milvus_lite_retrieve_online_documents_v2() -> None:
     """
     Test retrieving documents from the online store in local mode.
     """
@@ -1226,6 +1227,199 @@ def test_milvus_lite_get_online_documents_v2() -> None:
     assert len(result["distance"]) == len(results[0])


+def test_milvus_stored_writes_with_explode() -> None:
+    """
+    Test storing and retrieving exploded document embeddings with Milvus online store.
+    """
+    from feast import (
+        Entity,
+        RequestSource,
+    )
+    from feast.field import Field
+    from feast.on_demand_feature_view import on_demand_feature_view
+    from feast.types import (
+        Array,
+        Bytes,
+        Float32,
+        String,
+        ValueType,
+    )
+
+    random.seed(42)
+    vector_length = 10
+    runner = CliRunner()
+    with runner.local_repo(
+        example_repo_py=get_example_repo("example_rag_feature_repo.py"),
+        offline_store="file",
+        online_store="milvus",
+        apply=False,
+        teardown=False,
+    ) as store:
+        # Define entities and sources
+        chunk = Entity(
+            name="chunk", join_keys=["chunk_id"], value_type=ValueType.STRING
+        )
+        document = Entity(
+            name="document", join_keys=["document_id"], value_type=ValueType.STRING
+        )
+
+        input_explode_request_source = RequestSource(
+            name="document_source",
+            schema=[
+                Field(name="document_id", dtype=String),
+                Field(name="document_text", dtype=String),
+                Field(name="document_bytes", dtype=Bytes),
+            ],
+        )
+
+        @on_demand_feature_view(
+            entities=[chunk, document],
+            sources=[input_explode_request_source],
+            schema=[
+                Field(name="document_id", dtype=String),
+                Field(name="chunk_id", dtype=String),
+                Field(name="chunk_text", dtype=String),
+                Field(
+                    name="vector",
+                    dtype=Array(Float32),
+                    vector_index=True,
+                    vector_search_metric="COSINE",  # Use COSINE like in Milvus test
+                ),
+            ],
+            mode="python",
+            write_to_online_store=True,
+        )
+        def milvus_explode_feature_view(inputs: dict[str, Any]):
+            output: dict[str, Any] = {
+                "document_id": ["doc_1", "doc_1", "doc_2", "doc_2"],
+                "chunk_id": ["chunk-1", "chunk-2", "chunk-1", "chunk-2"],
+                "chunk_text": [
+                    "hello friends",
+                    "how are you?",
+                    "This is a test.",
+                    "Document chunking example.",
+                ],
+                "vector": [
+                    [0.1] * vector_length,
+                    [0.2] * vector_length,
+                    [0.3] * vector_length,
+                    [0.4] * vector_length,
+                ],
+            }
+            return output
+
+        # Apply the feature store configuration
+        store.apply(
+            [
+                chunk,
+                document,
+                input_explode_request_source,
+                milvus_explode_feature_view,
+            ]
+        )
+
+        # Verify feature view registration
+        odfv_applied = store.get_on_demand_feature_view("milvus_explode_feature_view")
+        assert odfv_applied.features[1].vector_index
+        assert odfv_applied.entities == [chunk.name, document.name]
+        assert odfv_applied.entity_columns[0].name == document.join_key
+        assert odfv_applied.entity_columns[1].name == chunk.join_key
+
+        # Write to online store
+        odfv_entity_rows_to_write = [
+            {
+                "document_id": "document_1",
+                "document_text": "Hello world. How are you?",
+            },
+            {
+                "document_id": "document_2",
+                "document_text": "This is a test. Document chunking example.",
+            },
+        ]
+        store.write_to_online_store(
+            feature_view_name="milvus_explode_feature_view",
+            df=odfv_entity_rows_to_write,
+        )
+
+        # Verify feature retrieval
+        fv_entity_rows_to_read = [
+            {
+                "document_id": "doc_1",
+                "chunk_id": "chunk-2",
+            },
+            {
+                "document_id": "doc_2",
+                "chunk_id": "chunk-1",
+            },
+        ]
+
+        online_response = store.get_online_features(
+            entity_rows=fv_entity_rows_to_read,
+            features=[
+                "milvus_explode_feature_view:document_id",
+                "milvus_explode_feature_view:chunk_id",
+                "milvus_explode_feature_view:chunk_text",
+            ],
+        ).to_dict()
+
+        assert sorted(list(online_response.keys())) == sorted(
+            [
+                "chunk_id",
+                "chunk_text",
+                "document_id",
+            ]
+        )
+
+        # Test vector search using Milvus
+        query_embedding = [0.1] * vector_length
+
+        # First get Milvus client and search directly
+        client = store._provider._online_store.client
+        collection_name = client.list_collections()[0]
+        search_params = {
+            "metric_type": "COSINE",
+            "params": {"nprobe": 10},
+        }
+
+        direct_results = client.search(
+            collection_name=collection_name,
+            data=[query_embedding],
+            anns_field="vector",
+            search_params=search_params,
+            limit=2,
+            output_fields=["document_id", "chunk_id", "chunk_text"],
+        )
+
+        # Then use the Feast API
+        feast_results = store.retrieve_online_documents_v2(
+            features=[
+                "milvus_explode_feature_view:document_id",
+                "milvus_explode_feature_view:chunk_id",
+                "milvus_explode_feature_view:chunk_text",
+            ],
+            query=query_embedding,
+            top_k=2,
+        ).to_dict()
+
+        # Validate vector search results
+        assert "document_id" in feast_results
+        assert "chunk_id" in feast_results
+        assert "chunk_text" in feast_results
+        assert "distance" in feast_results
+        assert len(feast_results["distance"]) == 2
+        assert len(feast_results["document_id"]) == 2
+        assert (
+            len(direct_results[0]) == 2
+        )  # Verify both approaches return same number of results
+        del feast_results["distance"]
+
+        assert feast_results == {
+            "document_id": ["doc_2", "doc_1"],
+            "chunk_id": ["chunk-1", "chunk-2"],
+            "chunk_text": ["This is a test.", "how are you?"],
+        }
+
+
 def test_milvus_native_from_feast_data() -> None:
     import random
     from datetime import datetime
diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py
index 7ae9f1c70e6..279b8530ae2 100644
--- a/sdk/python/tests/unit/test_on_demand_python_transformation.py
+++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py
@@ -1,4 +1,5 @@
 import os
+import platform
 import re
 import sqlite3
 import sys
@@ -31,6 +32,7 @@
     Float32,
     Float64,
     Int64,
+    PdfBytes,
     String,
     UnixTimestamp,
     ValueType,
@@ -38,6 +40,8 @@
     from_value_type,
 )

+MAC_VER = platform.mac_ver()[0].split(".")[0] if platform.mac_ver() else ""
+

 class TestOnDemandPythonTransformation(unittest.TestCase):
     def setUp(self):
@@ -802,6 +806,14 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]:
             store.apply([request_source, python_view])


+@pytest.mark.skipif(
+    not (
+        sys.version_info[0:2] in [(3, 10), (3, 11)]
+        and platform.system() == "Darwin"
+        and MAC_VER != "14"
+    ),
+    reason="Only runs on Python 3.10 and 3.11 on macOS, excluding macOS 14",
+)
 class TestOnDemandTransformationsWithWrites(unittest.TestCase):
     def test_stored_writes(self):
         with tempfile.TemporaryDirectory() as data_dir:
@@ -1291,3 +1303,172 @@ def python_stored_writes_feature_view_explode_singleton(
             "chunk_text": ["hello friends", "how are you?"],
             "distance": [0.11180340498685837, 0.3354102075099945],
         }
+
+    def test_docling_transform(self):
+        import io
+
+        from docling.chunking import HybridChunker
+        from docling.datamodel.base_models import DocumentStream
+        from docling.document_converter import DocumentConverter
+        from transformers import AutoTokenizer
+
+        EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
+        VECTOR_LEN = 10
+        MAX_TOKENS = 64  # Small token limit for demonstration
+        tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
+        chunker = HybridChunker(
+            tokenizer=tokenizer, max_tokens=MAX_TOKENS, merge_peers=True
+        )
+
+        with tempfile.TemporaryDirectory() as data_dir:
+            self.store = FeatureStore(
+                config=RepoConfig(
+                    project="test_on_demand_python_transformation_explode",
+                    registry=os.path.join(data_dir, "registry.db"),
+                    provider="local",
+                    entity_key_serialization_version=3,
+                    online_store=SqliteOnlineStoreConfig(
+                        path=os.path.join(data_dir, "online.db"),
+                        vector_enabled=True,
+                        vector_len=VECTOR_LEN,
+                    ),
+                )
+            )
+
+            chunk = Entity(
+                name="chunk_id",
+                description="Chunk ID",
+                value_type=ValueType.STRING,
+                join_keys=["chunk_id"],
+            )
+
+            document = Entity(
+                name="document_id",
+                description="Document ID",
+                value_type=ValueType.STRING,
+                join_keys=["document_id"],
+            )
+
+            def embed_chunk(input_string) -> dict[str, list[float]]:
+                output = {"query_embedding": [0.5] * VECTOR_LEN}
+                return output
+
+            input_request_pdf = RequestSource(
+                name="pdf_request_source",
+                schema=[
+                    Field(name="pdf_bytes", dtype=PdfBytes),
+                    Field(name="file_name", dtype=String),
+                    Field(name="document_id", dtype=String),
+                ],
+            )
+            self.store.apply([chunk, document, input_request_pdf])
+
+            @on_demand_feature_view(
+                entities=[chunk, document],
+                sources=[input_request_pdf],
+                schema=[
+                    Field(name="document_id", dtype=String),
+                    Field(name="chunk_id", dtype=String),
+                    Field(name="chunk_text", dtype=String),
+                    Field(
+                        name="vector",
+                        dtype=Array(Float32),
+                        vector_index=True,
+                        vector_search_metric="L2",
+                    ),
+                ],
+                mode="python",
+                write_to_online_store=True,
+                singleton=True,
+            )
+            def docling_transform_docs(inputs: dict[str, Any]):
+                document_ids, chunks, embeddings, chunk_ids = [], [], [], []
+                buf = io.BytesIO(
+                    inputs["pdf_bytes"],
+                )
+                source = DocumentStream(name=inputs["file_name"], stream=buf)
+                converter = DocumentConverter()
+                result = converter.convert(source)
+                for i, chunk in enumerate(chunker.chunk(dl_doc=result.document)):
+                    raw_chunk = chunker.serialize(chunk=chunk)
+                    embedding = embed_chunk(raw_chunk).get("query_embedding", [])
+                    chunk_id = f"chunk-{i}"
+                    document_ids.append(inputs["document_id"])
+                    chunks.append(raw_chunk)
+                    chunk_ids.append(chunk_id)
+                    embeddings.append(embedding)
+                return {
+                    "document_id": document_ids,
+                    "chunk_id": chunk_ids,
+                    "vector": embeddings,
+                    "chunk_text": chunks,
+                }
+
+            sample_pdf = b"%PDF-1.3\n3 0 obj\n<>\nendobj\n4 0 obj\n<>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<>\nendobj\n5 0 obj\n<>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
+            sample_pdf_2 = b"%PDF-1.3\n3 0 obj\n<>\nendobj\n4 0 obj\n<>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<>\nendobj\n5 0 obj\n<>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is another sample title.)\n/Author (Frank John Broceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
+
+            sample_input = {
+                "pdf_bytes": sample_pdf,
+                "file_name": "sample_pdf",
+                "document_id": "doc_1",
+            }
+            sample_input_2 = {
+                "pdf_bytes": sample_pdf_2,
+                "file_name": "sample_pdf_2",
+                "document_id": "doc_2",
+            }
+            docling_output = docling_transform_docs.feature_transformation.udf(
+                sample_input
+            )
+
+            self.store.apply([docling_transform_docs])
+
+            assert docling_output == {
+                "document_id": ["doc_1"],
+                "chunk_id": ["chunk-0"],
+                "vector": [[0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5]],
+                "chunk_text": [
+                    "Let's have fun with Natural Language Processing on PDFs."
+                ],
+            }
+
+            input_df = pd.DataFrame([sample_input])
+            self.store.write_to_online_store("docling_transform_docs", input_df)
+
+            conn = self.store._provider._online_store._conn
+            document_table = self.store._provider._online_store._conn.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' and name like '%docling%';"
+            ).fetchall()[0][0]
+            written_data = pd.read_sql_query(f"select * from {document_table}", conn)
+            assert (
+                written_data[written_data["feature_name"] == "document_id"][
+                    "vector_value"
+                ].values[0]
+                == "doc_1"
+            )
+            assert (
+                written_data[written_data["feature_name"] == "chunk_id"][
+                    "vector_value"
+                ].values[0]
+                == "chunk-0"
+            )
+
+            online_features = self.store.get_online_features(
+                features=[
+                    "docling_transform_docs:document_id",
+                    "docling_transform_docs:chunk_id",
+                    "docling_transform_docs:chunk_text",
+                ],
+                entity_rows=[{"document_id": "doc_1", "chunk_id": "chunk-0"}],
+            ).to_dict()
+            assert online_features == {
+                "document_id": ["doc_1"],
+                "chunk_id": ["chunk-0"],
+                "chunk_text": [
+                    "Let's have fun with Natural Language Processing on PDFs."
+                ],
+            }
+
+            multiple_inputs_df = pd.DataFrame([sample_input, sample_input_2])
+            # note: this test still needs a write_to_online_store step to verify multi-row behavior works
+            assert multiple_inputs_df is not None
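
Reviewer note: taken together, the pieces above — the new PdfBytes Feast type, the singleton branch in _get_feature_view_and_df_for_online_write, and write_to_online_store=True — let an on-demand feature view accept one PDF per request row and explode it into multiple chunk rows for the online store. The sketch below (not part of the patch) shows that intended usage under the patch's own API; my_pdf_view, embed_text, and the placeholder text extraction are hypothetical stand-ins for a real chunker/embedder such as the docling pipeline used in the tests.

    from typing import Any

    from feast import Entity, RequestSource
    from feast.field import Field
    from feast.on_demand_feature_view import on_demand_feature_view
    from feast.types import Array, Float32, PdfBytes, String, ValueType

    document = Entity(
        name="document_id", join_keys=["document_id"], value_type=ValueType.STRING
    )
    chunk = Entity(
        name="chunk_id", join_keys=["chunk_id"], value_type=ValueType.STRING
    )

    pdf_source = RequestSource(
        name="pdf_request_source",
        schema=[
            Field(name="pdf_bytes", dtype=PdfBytes),  # the type this patch introduces
            Field(name="document_id", dtype=String),
        ],
    )


    def embed_text(text: str) -> list[float]:
        # Hypothetical embedder; the tests stub this out with a constant vector.
        return [0.5] * 10


    @on_demand_feature_view(
        entities=[chunk, document],
        sources=[pdf_source],
        schema=[
            Field(name="document_id", dtype=String),
            Field(name="chunk_id", dtype=String),
            Field(name="chunk_text", dtype=String),
            Field(
                name="vector",
                dtype=Array(Float32),
                vector_index=True,
                vector_search_metric="L2",
            ),
        ],
        mode="python",
        write_to_online_store=True,
        singleton=True,  # the UDF receives one row as a dict, not columns of lists
    )
    def my_pdf_view(inputs: dict[str, Any]) -> dict[str, Any]:
        # Placeholder extraction; a real pipeline would parse inputs["pdf_bytes"]
        # with a PDF parser (e.g. docling) before chunking.
        text = inputs["pdf_bytes"].decode("latin-1", errors="ignore")
        chunks = [text[i : i + 512] for i in range(0, len(text), 512)]
        # One request row explodes into one output row per chunk.
        return {
            "document_id": [inputs["document_id"]] * len(chunks),
            "chunk_id": [f"chunk-{i}" for i in range(len(chunks))],
            "chunk_text": chunks,
            "vector": [embed_text(c) for c in chunks],
        }

Writing a one-row DataFrame through store.write_to_online_store("my_pdf_view", df) then persists each exploded chunk row keyed by (document_id, chunk_id), which is the behavior the new tests exercise against both the SQLite and Milvus online stores.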