diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 82e30c5df80..66b8a4f563d 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -1567,14 +1567,22 @@ def _get_feature_view_and_df_for_online_write(
                     else df.to_dict(orient="list")
                 )
                 if feature_view.singleton:
-                    transformed_data = df.apply(
-                        feature_view.feature_transformation.udf, axis=1
-                    )
-                    transformed_data = pd.DataFrame(
-                        transformed_data.to_list()
-                    ).applymap(
-                        lambda x: x[0] if isinstance(x, list) and len(x) == 1 else x
-                    )
+                    # Accumulate the per-row UDF outputs column by column. Starting
+                    # from an empty dict keeps this independent of the DataFrame's
+                    # index labels, leaves the UDF's return value unmodified, and
+                    # lets scalar outputs be collected alongside list outputs.
+                    transformed_rows: dict[str, list] = {}
+
+                    for _, row in df.iterrows():
+                        output = feature_view.feature_transformation.udf(row.to_dict())
+                        for k, v in output.items():
+                            column = transformed_rows.setdefault(k, [])
+                            if isinstance(v, list):
+                                column.extend(v)
+                            else:
+                                column.append(v)
+
+                    transformed_data = pd.DataFrame(transformed_rows)
                 else:
                     transformed_data = feature_view.feature_transformation.udf(
                         input_dict
diff --git a/sdk/python/tests/unit/online_store/test_online_writes.py b/sdk/python/tests/unit/online_store/test_online_writes.py
index 803a93b48dd..5d573bf6b92 100644
--- a/sdk/python/tests/unit/online_store/test_online_writes.py
+++ b/sdk/python/tests/unit/online_store/test_online_writes.py
@@ -18,12 +18,21 @@
 from datetime import datetime, timedelta
 from typing import Any

-from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig
+import pandas as pd
+
+from feast import (
+    Entity,
+    FeatureStore,
+    FeatureView,
+    FileSource,
+    RepoConfig,
+    RequestSource,
+)
 from feast.driver_test_data import create_driver_hourly_stats_df
 from feast.field import Field
 from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
 from feast.on_demand_feature_view import on_demand_feature_view
-from feast.types import Float32, Float64, Int64
+from feast.types import Array, Float32, Float64, Int64, PdfBytes, String, ValueType


 class TestOnlineWrites(unittest.TestCase):
@@ -144,3 +153,89 @@ def test_online_retrieval(self):
                 "conv_rate_plus_acc",
             ]
         )
+
+
+class TestOnlineWritesWithTransform(unittest.TestCase):
+    def test_transform_on_write_pdf(self):
+        """Smoke-test writing a singleton on-demand view to the online store.
+
+        The transformation ignores its input and always returns the same two
+        chunks; the test passes when the write completes without raising.
+        """
+        with tempfile.TemporaryDirectory() as data_dir:
+            self.store = FeatureStore(
+                config=RepoConfig(
+                    project="test_write_to_online_store_with_transform",
+                    registry=os.path.join(data_dir, "registry.db"),
+                    provider="local",
+                    entity_key_serialization_version=2,
+                    online_store=SqliteOnlineStoreConfig(
+                        path=os.path.join(data_dir, "online.db")
+                    ),
+                )
+            )
+
+            chunk = Entity(
+                name="chunk_id",
+                description="Chunk ID",
+                value_type=ValueType.STRING,
+                join_keys=["chunk_id"],
+            )
+
+            document = Entity(
+                name="document_id",
+                description="Document ID",
+                value_type=ValueType.STRING,
+                join_keys=["document_id"],
+            )
+
+            input_request_pdf = RequestSource(
+                name="pdf_request_source",
+                schema=[
+                    Field(name="document_id", dtype=String),
+                    Field(name="pdf_bytes", dtype=PdfBytes),
+                    Field(name="file_name", dtype=String),
+                ],
+            )
+
+            @on_demand_feature_view(
+                entities=[chunk, document],
+                sources=[input_request_pdf],
+                schema=[
+                    Field(name="document_id", dtype=String),
+                    Field(name="chunk_id", dtype=String),
+                    Field(name="chunk_text", dtype=String),
+                    Field(
+                        name="vector",
+                        dtype=Array(Float32),
+                        vector_index=True,
+                        vector_search_metric="L2",
+                    ),
+                ],
+                mode="python",
+                write_to_online_store=True,
+                singleton=True,
+            )
+            def transform_pdf_on_write_view(inputs: dict[str, Any]) -> dict[str, Any]:
+                k = 10
+                return {
+                    "document_id": ["doc_1", "doc_2"],
+                    "chunk_id": ["chunk-1", "chunk-2"],
+                    "vector": [[0.5] * k, [0.4] * k],
+                    "chunk_text": ["chunk text 1", "chunk text 2"],
+                }
+
+            self.store.apply([chunk, document, transform_pdf_on_write_view])
+
+            sample_pdf = b"%PDF-1.3\n3 0 obj\n<>\nendobj\n4 0 obj\n<>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<>\nendobj\n5 0 obj\n<>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title. And this is another sentence. Finally, this is the third sentence.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
+            sample_input = {
+                "pdf_bytes": sample_pdf,
+                "file_name": "sample_pdf",
+                "document_id": "doc_1",
+            }
+            input_df = pd.DataFrame([sample_input])
+
+            self.store.write_to_online_store(
+                feature_view_name="transform_pdf_on_write_view",
+                df=input_df,
+            )
diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py
index 279b8530ae2..69942f8090c 100644
--- a/sdk/python/tests/unit/test_on_demand_python_transformation.py
+++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py
@@ -1314,7 +1314,7 @@ def test_docling_transform(self):

         EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
         VECTOR_LEN = 10
-        MAX_TOKENS = 64  # Small token limit for demonstration
+        MAX_TOKENS = 5  # Small token limit for demonstration
         tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
         chunker = HybridChunker(
             tokenizer=tokenizer, max_tokens=MAX_TOKENS, merge_peers=True
@@ -1421,37 +1421,49 @@ def docling_transform_docs(inputs: dict[str, Any]):
                 sample_input
             )

-            self.store.apply([docling_transform_docs])
+            sample_inputs = {}
+            for key in sample_input.keys():
+                sample_inputs[key] = [sample_input[key], sample_input_2[key]]

             assert docling_output == {
-                "document_id": ["doc_1"],
-                "chunk_id": ["chunk-0"],
-                "vector": [[0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5]],
+                "document_id": ["doc_1", "doc_1", "doc_1"],
+                "chunk_id": ["chunk-0", "chunk-1", "chunk-2"],
+                "vector": [
+                    [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
+                    [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
+                    [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
+                ],
                 "chunk_text": [
-                    "Let's have fun with Natural Language Processing on PDFs."
+                    "Let's have fun",
+                    "with Natural Language Processing on",
+                    "PDFs.",
                 ],
             }

-            input_df = pd.DataFrame([sample_input])
-            self.store.write_to_online_store("docling_transform_docs", input_df)
+            self.store.apply([docling_transform_docs])
+
+            multiple_inputs_df = pd.DataFrame(sample_inputs)
+            self.store.write_to_online_store(
+                "docling_transform_docs", multiple_inputs_df
+            )

             conn = self.store._provider._online_store._conn
             document_table = self.store._provider._online_store._conn.execute(
-                "SELECT name FROM sqlite_master WHERE type='table' and name like '%docling%';"
+                "SELECT name FROM sqlite_master WHERE type='table' and name like '%docling_transform%';"
             ).fetchall()[0][0]
             written_data = pd.read_sql_query(f"select * from {document_table}", conn)
-            assert (
+            assert sorted(
                 written_data[written_data["feature_name"] == "document_id"][
                     "vector_value"
-                ].values[0]
-                == "doc_1"
-            )
-            assert (
-                written_data[written_data["feature_name"] == "chunk_id"][
-                    "vector_value"
-                ].values[0]
-                == "chunk-0"
-            )
+                ]
+                .unique()
+                .tolist()
+            ) == ["doc_1", "doc_2"]
+            assert sorted(
+                written_data[written_data["feature_name"] == "chunk_id"]["vector_value"]
+                .unique()
+                .tolist()
+            ) == ["chunk-0", "chunk-1", "chunk-2"]

             online_features = self.store.get_online_features(
                 features=[
@@ -1461,6 +1473,8 @@ def docling_transform_docs(inputs: dict[str, Any]):
                 ],
                 entity_rows=[{"document_id": "doc_1", "chunk_id": "chunk-0"}],
             ).to_dict()
-            online_features == {
+
+            # MAX_TOKENS = 5 splits the sentence, so chunk-0 holds only its start.
+            assert online_features == {
                 "document_id": ["doc_1"],
                 "chunk_id": ["chunk-0"],
@@ -1468,7 +1482,3 @@ def docling_transform_docs(inputs: dict[str, Any]):
-                    "Let's have fun with Natural Language Processing on PDFs."
+                    "Let's have fun",
                 ],
             }
-
-            multiple_inputs_df = pd.DataFrame([sample_input, sample_input_2])
-            # note this test needs to be updated with writing to the online store to verify this behavior works
-            assert multiple_inputs_df is not None