From 79a91f175318d690e69ced247a5ac9cbd7973105 Mon Sep 17 00:00:00 2001 From: Chaitany patel Date: Tue, 31 Mar 2026 15:52:32 +0530 Subject: [PATCH 1/2] Add DocEmbedder docs and examples Signed-off-by: Chaitany patel --- docs/getting-started/genai.md | 48 +++++ docs/tutorials/rag-with-docling.md | 228 +++++++++++++++++++++ examples/rag-retriever/README.md | 53 +++++ sdk/python/feast/__init__.py | 4 +- sdk/python/feast/doc_embedder.py | 52 ++--- sdk/python/tests/unit/test_doc_embedder.py | 30 +-- 6 files changed, 374 insertions(+), 41 deletions(-) diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index b4bdf1d1dc8..7b682108627 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -56,6 +56,53 @@ The transformation workflow typically involves: 3. **Chunking**: Split documents into smaller, semantically meaningful chunks 4. **Embedding Generation**: Convert text chunks into vector embeddings 5. **Storage**: Store embeddings and metadata in Feast's feature store + +### DocEmbedder: End-to-End Document Ingestion Pipeline + +The `DocEmbedder` class provides an end-to-end pipeline for ingesting documents into Feast's online vector store. It handles chunking, embedding generation, and writing results -- all in a single step. + +#### Key Components + +* **`DocEmbedder`**: High-level orchestrator that runs the full pipeline: chunk → embed → schema transform → write to online store +* **`BaseChunker` / `TextChunker`**: Pluggable chunking layer. `TextChunker` splits text by word count with configurable `chunk_size`, `chunk_overlap`, `min_chunk_size`, and `max_chunk_chars` +* **`BaseEmbedder` / `MultiModalEmbedder`**: Pluggable embedding layer with modality routing. 
`MultiModalEmbedder` supports text (via sentence-transformers) and image (via CLIP) with lazy model loading +* **`SchemaTransformFn`**: A user-defined function that transforms the chunked + embedded DataFrame into the format expected by the FeatureView schema + +#### Quick Example + +```python +from feast import DocEmbedder +import pandas as pd + +# Prepare your documents +df = pd.DataFrame({ + "id": ["doc1", "doc2"], + "text": ["First document content...", "Second document content..."], +}) + +# Create DocEmbedder -- automatically generates a FeatureView and applies the repo +embedder = DocEmbedder( + repo_path="feature_repo/", + feature_view_name="text_feature_view", +) + +# Embed and ingest documents in one step +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +#### Features + +* **Auto-generates FeatureView**: Creates a Python file with Entity and FeatureView definitions compatible with `feast apply` +* **Auto-applies repo**: Registers the generated FeatureView in the registry automatically +* **Custom schema transform**: Provide your own `SchemaTransformFn` to control how chunked + embedded data maps to your FeatureView schema +* **Extensible**: Subclass `BaseChunker` or `BaseEmbedder` to plug in your own chunking or embedding strategies + +For a complete walkthrough, see the [DocEmbedder tutorial notebook](../../examples/rag-retriever/rag_feast_docembedder.ipynb). 
### Feature Transformation for LLMs Feast supports transformations that can be used to: @@ -174,6 +221,7 @@ For more detailed information and examples: * [Vector Database Reference](../reference/alpha-vector-database.md) * [RAG Tutorial with Docling](../tutorials/rag-with-docling.md) +* [DocEmbedder Tutorial Notebook](../../examples/rag-retriever/rag_feast_docembedder.ipynb) * [RAG Fine Tuning with Feast and Milvus](../../examples/rag-retriever/README.md) * [Milvus Quickstart Example](https://github.com/feast-dev/feast/tree/master/examples/rag/milvus-quickstart.ipynb) * [Feast + Ray: Distributed Processing for RAG Applications](https://feast.dev/blog/feast-ray-distributed-processing/) diff --git a/docs/tutorials/rag-with-docling.md b/docs/tutorials/rag-with-docling.md index 88f2bd2aad7..6b85db4177c 100644 --- a/docs/tutorials/rag-with-docling.md +++ b/docs/tutorials/rag-with-docling.md @@ -409,6 +409,234 @@ response = client.chat.completions.create( print('\n'.join([c.message.content for c in response.choices])) ``` +## Alternative: Using DocEmbedder for Simplified Ingestion + +Instead of manually chunking, embedding, and writing documents as shown above, you can use Feast's `DocEmbedder` class to handle the entire pipeline in a single step. `DocEmbedder` automates chunking, embedding generation, FeatureView creation, and writing to the online store. + +### Install Dependencies + +```bash +pip install feast[milvus,rag] +``` + +### Set Up and Ingest with DocEmbedder + +```python +from feast import DocEmbedder +import pandas as pd + +# Prepare your documents as a DataFrame +df = pd.DataFrame({ + "id": ["doc1", "doc2", "doc3"], + "text": [ + "Aaron is a prophet, high priest, and the brother of Moses...", + "God at Sinai granted Aaron the priesthood for himself...", + "His rod turned into a snake. 
Then he stretched out...", + ], +}) + +# DocEmbedder handles everything: generates FeatureView, applies repo, +# chunks text, generates embeddings, and writes to the online store +embedder = DocEmbedder( + repo_path="feature_repo/", + feature_view_name="text_feature_view", +) + +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +### Retrieve and Query + +Once documents are ingested, you can retrieve them the same way as shown in Step 5 above: + +```python +from feast import FeatureStore + +store = FeatureStore("feature_repo/") + +query_embedding = embed_text("Who are the authors of the paper?") +context_data = store.retrieve_online_documents_v2( + features=[ + "text_feature_view:embedding", + "text_feature_view:text", + "text_feature_view:source_id", + ], + query=query_embedding, + top_k=3, + distance_metric="COSINE", +).to_df() +``` + +### Customizing the Pipeline + +`DocEmbedder` is extensible at every stage. Below are examples of how to create custom components and wire them together. + +#### Custom Chunker + +Subclass `BaseChunker` to implement your own chunking strategy. The `load_parse_and_chunk` method receives each document and must return a list of chunk dictionaries. 
+ +```python +from feast.chunker import BaseChunker, ChunkingConfig +from typing import Any, Optional + +class SentenceChunker(BaseChunker): + """Chunks text by sentences instead of word count.""" + + def load_parse_and_chunk( + self, + source: Any, + source_id: str, + source_column: str, + source_type: Optional[str] = None, + ) -> list[dict]: + import re + + text = str(source) + # Split on sentence boundaries + sentences = re.split(r'(?<=[.!?])\s+', text) + + chunks = [] + current_chunk = [] + chunk_index = 0 + + for sentence in sentences: + current_chunk.append(sentence) + combined = " ".join(current_chunk) + + if len(combined.split()) >= self.config.chunk_size: + chunks.append({ + "chunk_id": f"{source_id}_{chunk_index}", + "original_id": source_id, + source_column: combined, + "chunk_index": chunk_index, + }) + # Keep overlap by retaining the last sentence + current_chunk = [sentence] + chunk_index += 1 + + # Don't forget the last chunk + if current_chunk and len(" ".join(current_chunk).split()) >= self.config.min_chunk_size: + chunks.append({ + "chunk_id": f"{source_id}_{chunk_index}", + "original_id": source_id, + source_column: " ".join(current_chunk), + "chunk_index": chunk_index, + }) + + return chunks +``` + +Or simply configure the built-in `TextChunker`: + +```python +from feast import TextChunker, ChunkingConfig + +chunker = TextChunker(config=ChunkingConfig( + chunk_size=200, + chunk_overlap=50, + min_chunk_size=30, + max_chunk_chars=1000, +)) +``` + +#### Custom Embedder + +Subclass `BaseEmbedder` to use a different embedding model. Register modality handlers in `_register_default_modalities` and implement the `embed` method. 
+ +```python +from feast.embedder import BaseEmbedder, EmbeddingConfig +from typing import Any, List, Optional +import numpy as np + +class OpenAIEmbedder(BaseEmbedder): + """Embedder that uses the OpenAI API for text embeddings.""" + + def __init__(self, model: str = "text-embedding-3-small", config: Optional[EmbeddingConfig] = None): + self.model = model + self._client = None + super().__init__(config) + + def _register_default_modalities(self) -> None: + self.register_modality("text", self._embed_text) + + @property + def client(self): + if self._client is None: + from openai import OpenAI + self._client = OpenAI() + return self._client + + def get_embedding_dim(self, modality: str) -> Optional[int]: + # text-embedding-3-small produces 1536-dim vectors + if modality == "text": + return 1536 + return None + + def embed(self, inputs: List[Any], modality: str) -> np.ndarray: + if modality not in self._modality_handlers: + raise ValueError(f"Unsupported modality: '{modality}'") + return self._modality_handlers[modality](inputs) + + def _embed_text(self, inputs: List[str]) -> np.ndarray: + response = self.client.embeddings.create(input=inputs, model=self.model) + return np.array([item.embedding for item in response.data]) +``` + +#### Custom Logical Layer Function + +The schema transform function transforms the chunked + embedded DataFrame into the exact schema your FeatureView expects. It must accept a `pd.DataFrame` and return a `pd.DataFrame`. 
+ +```python +import pandas as pd +from datetime import datetime, timezone + +def my_schema_transform_fn(df: pd.DataFrame) -> pd.DataFrame: + """Map chunked + embedded columns to the FeatureView schema.""" + return pd.DataFrame({ + "passage_id": df["chunk_id"], + "text": df["text"], + "embedding": df["text_embedding"], + "event_timestamp": [datetime.now(timezone.utc)] * len(df), + "source_id": df["original_id"], + # Add any extra columns your FeatureView expects + "chunk_index": df["chunk_index"], + }) +``` + +#### Putting It All Together + +Pass your custom components to `DocEmbedder`: + +```python +from feast import DocEmbedder + +embedder = DocEmbedder( + repo_path="feature_repo/", + feature_view_name="text_feature_view", + chunker=SentenceChunker(config=ChunkingConfig(chunk_size=150, min_chunk_size=20)), + embedder=OpenAIEmbedder(model="text-embedding-3-small"), + schema_transform_fn=my_schema_transform_fn, + vector_length=1536, # Match the OpenAI embedding dimension +) + +# Embed and ingest +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +> **Note:** When using a custom `schema_transform_fn`, ensure the returned DataFrame columns match your FeatureView schema. When using a custom embedder with a different output dimension, set `vector_length` accordingly (or let it auto-detect via `get_embedding_dim`). + +For a complete end-to-end example, see the [DocEmbedder notebook](https://github.com/feast-dev/feast/tree/master/examples/rag-retriever/rag_feast_docembedder.ipynb). + ## Why Feast for RAG? Feast makes it remarkably easy to set up and manage a RAG system by: diff --git a/examples/rag-retriever/README.md b/examples/rag-retriever/README.md index 4c9eb9bf8c2..7df89957cfa 100644 --- a/examples/rag-retriever/README.md +++ b/examples/rag-retriever/README.md @@ -62,6 +62,59 @@ Navigate to the examples/rag-retriever directory. 
Here you will find the followi Open `rag_feast.ipynb` and follow the steps in the notebook to run the example. +## Using DocEmbedder for Simplified Ingestion + +As an alternative to the manual data preparation steps in the notebook above, Feast provides the `DocEmbedder` class that automates the entire document-to-embeddings pipeline: chunking, embedding generation, FeatureView creation, and writing to the online store. + +### Install Dependencies + +```bash +pip install feast[milvus,rag] +``` + +### Quick Start + +```python +from feast import DocEmbedder +from datasets import load_dataset + +# Load your dataset +dataset = load_dataset("facebook/wiki_dpr", "psgs_w100.nq.exact", split="train[:1%]", + with_index=False, trust_remote_code=True) +df = dataset.select(range(100)).to_pandas() + +# DocEmbedder handles everything in one step +embedder = DocEmbedder( + repo_path="feature_repo_docembedder/", + feature_view_name="text_feature_view", +) + +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +### What DocEmbedder Does + +1. **Generates a FeatureView**: Automatically creates a Python file with Entity and FeatureView definitions compatible with `feast apply` +2. **Applies the repo**: Registers the FeatureView in the Feast registry and deploys infrastructure (e.g., Milvus collection) +3. **Chunks documents**: Splits text into smaller passages using `TextChunker` (configurable chunk size, overlap, etc.) +4. **Generates embeddings**: Produces vector embeddings using `MultiModalEmbedder` (defaults to `all-MiniLM-L6-v2`) +5. 
**Writes to online store**: Stores the processed data in your configured online store (e.g., Milvus) + +### Customization + +* **Custom Chunker**: Subclass `BaseChunker` for your own chunking strategy +* **Custom Embedder**: Subclass `BaseEmbedder` to use a different embedding model +* **Logical Layer Function**: Provide a `SchemaTransformFn` to control how the output maps to your FeatureView schema + +### Example Notebook + +See **`rag_feast_docembedder.ipynb`** for a complete end-to-end example that uses DocEmbedder with the Wiki DPR dataset and then queries the results using `FeastRAGRetriever`. + ## FeastRagRetriver Low Level Design Low level design for feast rag retriever diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index c91e6b4c3ec..b1881c50150 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -17,7 +17,7 @@ from .chunker import BaseChunker, ChunkingConfig, TextChunker from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource from .dataframe import DataFrameEngine, FeastDataFrame -from .doc_embedder import DocEmbedder, LogicalLayerFn +from .doc_embedder import DocEmbedder, SchemaTransformFn from .embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder from .entity import Entity from .feature import Feature @@ -66,7 +66,7 @@ "Project", "FeastVectorStore", "DocEmbedder", - "LogicalLayerFn", + "SchemaTransformFn", "BaseChunker", "TextChunker", "ChunkingConfig", diff --git a/sdk/python/feast/doc_embedder.py b/sdk/python/feast/doc_embedder.py index dca2fdef2d0..9ccd8ec461b 100644 --- a/sdk/python/feast/doc_embedder.py +++ b/sdk/python/feast/doc_embedder.py @@ -14,11 +14,11 @@ @runtime_checkable -class LogicalLayerFn(Protocol): +class SchemaTransformFn(Protocol): """ - Protocol defining the structure for logical layer functions. + Protocol defining the structure for schema transform functions. 
- The logical layer transforms the output of Chunker + Embedder + The schema transform converts the output of Chunker + Embedder into the format expected by the FeatureView schema. """ @@ -35,9 +35,9 @@ def __call__(self, df: pd.DataFrame) -> pd.DataFrame: ... -def default_logical_layer_fn(df: pd.DataFrame) -> pd.DataFrame: +def default_schema_transform_fn(df: pd.DataFrame) -> pd.DataFrame: """ - Default logical layer function that transforms the output of Chunker + Embedder + Default schema transform function that transforms the output of Chunker + Embedder into the format expected by the FeatureView schema. """ from datetime import datetime, timezone @@ -156,7 +156,7 @@ class DocEmbedder: feature_view_name: Name of the feature view to create. chunker: Chunker to use for chunking the documents. embedder: Embedder to use for embedding the documents. - logical_layer_fn: Logical layer function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. + schema_transform_fn: Schema transform function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. create_feature_view: Whether to create a feature view in the feature repo. By default it will generate a Python file with the FeatureView definition. vector_length: Explicit embedding dimension for the generated FeatureView schema. 
If None (default), the dimension is auto-detected from the embedder @@ -172,7 +172,7 @@ def __init__( feature_view_name: str = "text_feature_view", chunker: Optional[BaseChunker] = None, embedder: Optional[BaseEmbedder] = None, - logical_layer_fn: LogicalLayerFn = default_logical_layer_fn, + schema_transform_fn: SchemaTransformFn = default_schema_transform_fn, create_feature_view: bool = True, vector_length: Optional[int] = None, auto_apply_repo: bool = True, @@ -184,7 +184,7 @@ def __init__( self.embedder = embedder or MultiModalEmbedder() self.store: Optional[FeatureStore] = None - sig = inspect.signature(logical_layer_fn) + sig = inspect.signature(schema_transform_fn) params = list(sig.parameters.values()) if ( len(params) != 1 @@ -192,9 +192,9 @@ def __init__( or sig.return_annotation != pd.DataFrame ): raise ValueError( - "logical_layer_fn must be a function that takes a DataFrame and returns a DataFrame" + "schema_transform_fn must be a function that takes a DataFrame and returns a DataFrame" ) - self.logical_layer_fn = logical_layer_fn + self.schema_transform_fn = schema_transform_fn if create_feature_view: resolved_vector_length = self._resolve_vector_length(vector_length, "text") generate_repo_file( @@ -287,7 +287,7 @@ def embed_documents( source_column: str, type_column: Optional[str] = None, column_mapping: Optional[tuple[str, str]] = None, - custom_logical_layer_fn: Optional[ + custom_schema_transform_fn: Optional[ Callable[[pd.DataFrame], pd.DataFrame] ] = None, ) -> pd.DataFrame: @@ -300,7 +300,7 @@ def embed_documents( source_column: Column name containing the document sources. type_column: Column name containing the document types. column_mapping: Tuple mapping source columns to (modality, output column). - custom_logical_layer_fn: Custom logical layer function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. 
+ custom_schema_transform_fn: Custom schema transform function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. Returns: DataFrame with the embedded documents. @@ -315,8 +315,8 @@ def embed_documents( df = embed_documents(documents=documents, id_column="id", source_column="source", type_column="type", column_mapping=column_mapping) """ - if custom_logical_layer_fn is not None: - sig = inspect.signature(custom_logical_layer_fn) + if custom_schema_transform_fn is not None: + sig = inspect.signature(custom_schema_transform_fn) params = list(sig.parameters.values()) if ( len(params) != 1 @@ -324,27 +324,27 @@ def embed_documents( or sig.return_annotation != pd.DataFrame ): raise ValueError( - "custom_logical_layer_fn must be a function that takes a DataFrame and returns a DataFrame" + "custom_schema_transform_fn must be a function that takes a DataFrame and returns a DataFrame" ) - current_logical_layer_fn = ( - custom_logical_layer_fn - if custom_logical_layer_fn is not None - else self.logical_layer_fn + current_schema_transform_fn = ( + custom_schema_transform_fn + if custom_schema_transform_fn is not None + else self.schema_transform_fn ) if column_mapping is None: column_mapping = ("text", "text_embedding") if ( - current_logical_layer_fn is default_logical_layer_fn + current_schema_transform_fn is default_schema_transform_fn and column_mapping[0] == "text" and (source_column != "text" or column_mapping[1] != "text_embedding") ): raise ValueError( f"source_column='{source_column}' with output column='{column_mapping[1]}' " - f"is not compatible with default_logical_layer_fn, which expects " + f"is not compatible with default_schema_transform_fn, which expects " f"source_column='text' and column_mapping=('text', 'text_embedding'). " - f"Provide a custom logical_layer_fn." + f"Provide a custom schema_transform_fn." 
) if column_mapping[0] == "text": @@ -363,15 +363,15 @@ def embed_documents( if ( column_mapping[0] == "text" - or current_logical_layer_fn is not default_logical_layer_fn + or current_schema_transform_fn is not default_schema_transform_fn ): - df = current_logical_layer_fn(df) + df = current_schema_transform_fn(df) else: warnings.warn( - f"Modality '{column_mapping[0]}' is not supported by the default logical layer function. " + f"Modality '{column_mapping[0]}' is not supported by the default schema transform function. " f"The output DataFrame will be passed directly to the online store. " f"Ensure your FeatureView schema matches the output columns. " - f"You can provide a custom logical layer function to handle this.", + f"You can provide a custom schema transform function to handle this.", UserWarning, stacklevel=2, ) diff --git a/sdk/python/tests/unit/test_doc_embedder.py b/sdk/python/tests/unit/test_doc_embedder.py index 2a9e324e0c4..10f72c82d02 100644 --- a/sdk/python/tests/unit/test_doc_embedder.py +++ b/sdk/python/tests/unit/test_doc_embedder.py @@ -6,7 +6,11 @@ import pytest from feast.chunker import BaseChunker, ChunkingConfig, TextChunker -from feast.doc_embedder import DocEmbedder, LogicalLayerFn, default_logical_layer_fn +from feast.doc_embedder import ( + DocEmbedder, + SchemaTransformFn, + default_schema_transform_fn, +) from feast.embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder @@ -230,17 +234,17 @@ def test_embed_dataframe(self): embedder.embed.assert_called_once_with(["hello", "world"], "text") -def test_logical_layer_fn_protocol_check(): - """A matching function is recognized as LogicalLayerFn.""" +def test_schema_transform_fn_protocol_check(): + """A matching function is recognized as SchemaTransformFn.""" def my_fn(df: pd.DataFrame) -> pd.DataFrame: return df - assert isinstance(my_fn, LogicalLayerFn) + assert isinstance(my_fn, SchemaTransformFn) -def test_default_logical_layer_fn_output(): - """default_logical_layer_fn 
transforms columns correctly.""" +def test_default_schema_transform_fn_output(): + """default_schema_transform_fn transforms columns correctly.""" input_df = pd.DataFrame( { "chunk_id": ["c1", "c2"], @@ -250,7 +254,7 @@ def test_default_logical_layer_fn_output(): } ) - result = default_logical_layer_fn(input_df) + result = default_schema_transform_fn(input_df) assert list(result.columns) == [ "passage_id", @@ -288,7 +292,7 @@ def test_init_no_feature_view(self, mock_load_config, mock_apply_total, tmp_path assert doc_embedder.feature_view_name == "test_view" assert doc_embedder.chunker is mock_chunker assert doc_embedder.embedder is mock_embedder - assert doc_embedder.logical_layer_fn is default_logical_layer_fn + assert doc_embedder.schema_transform_fn is default_schema_transform_fn @patch("feast.repo_operations.apply_total") @patch("feast.repo_config.load_repo_config") @@ -374,7 +378,7 @@ def test_init_creates_feature_view_auto_detects_vector_length( def test_embed_documents_full_chain( self, mock_fs_cls, mock_load_config, mock_apply_total, tmp_path ): - """embed_documents wires chunk -> embed -> logical_layer -> save correctly.""" + """embed_documents wires chunk -> embed -> schema_transform -> save correctly.""" mock_chunker = MagicMock(spec=BaseChunker) chunked_df = pd.DataFrame( { @@ -409,7 +413,7 @@ def mock_logical_fn(df: pd.DataFrame) -> pd.DataFrame: repo_path=str(tmp_path), chunker=mock_chunker, embedder=mock_embedder, - logical_layer_fn=mock_logical_fn, + schema_transform_fn=mock_logical_fn, create_feature_view=False, ) @@ -528,7 +532,7 @@ def mock_logical_fn(df: pd.DataFrame) -> pd.DataFrame: repo_path=str(tmp_path), chunker=mock_chunker, embedder=mock_embedder, - logical_layer_fn=mock_logical_fn, + schema_transform_fn=mock_logical_fn, create_feature_view=False, ) @@ -599,7 +603,7 @@ def test_save_to_online_store( @patch("feast.repo_config.load_repo_config") @patch("feast.feature_store.FeatureStore") def test_end_to_end_pipeline(mock_fs_cls, 
mock_load_config, mock_apply_total, tmp_path): - """Full pipeline: real TextChunker + mocked embedder + default logical layer.""" + """Full pipeline: real TextChunker + mocked embedder + default schema transform.""" chunker = TextChunker( config=ChunkingConfig( chunk_size=10, @@ -623,7 +627,7 @@ def fake_embed_dataframe(df, column_mapping): repo_path=str(tmp_path), chunker=chunker, embedder=mock_embedder, - logical_layer_fn=default_logical_layer_fn, + schema_transform_fn=default_schema_transform_fn, create_feature_view=False, ) From 75fe26e7809e1dc9c4749f123eef0bf983e5fc87 Mon Sep 17 00:00:00 2001 From: Chaitany patel Date: Tue, 31 Mar 2026 15:52:32 +0530 Subject: [PATCH 2/2] Add DocEmbedder docs and examples Signed-off-by: Chaitany patel --- docs/getting-started/genai.md | 48 +++++ docs/tutorials/rag-with-docling.md | 228 +++++++++++++++++++++ examples/rag-retriever/README.md | 53 +++++ sdk/python/feast/__init__.py | 4 +- sdk/python/feast/doc_embedder.py | 52 ++--- sdk/python/tests/unit/test_doc_embedder.py | 30 +-- 6 files changed, 374 insertions(+), 41 deletions(-) diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index b4bdf1d1dc8..7b682108627 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -56,6 +56,53 @@ The transformation workflow typically involves: 3. **Chunking**: Split documents into smaller, semantically meaningful chunks 4. **Embedding Generation**: Convert text chunks into vector embeddings 5. **Storage**: Store embeddings and metadata in Feast's feature store + +### DocEmbedder: End-to-End Document Ingestion Pipeline + +The `DocEmbedder` class provides an end-to-end pipeline for ingesting documents into Feast's online vector store. It handles chunking, embedding generation, and writing results -- all in a single step. 
+ +#### Key Components + +* **`DocEmbedder`**: High-level orchestrator that runs the full pipeline: chunk → embed → schema transform → write to online store +* **`BaseChunker` / `TextChunker`**: Pluggable chunking layer. `TextChunker` splits text by word count with configurable `chunk_size`, `chunk_overlap`, `min_chunk_size`, and `max_chunk_chars` +* **`BaseEmbedder` / `MultiModalEmbedder`**: Pluggable embedding layer with modality routing. `MultiModalEmbedder` supports text (via sentence-transformers) and image (via CLIP) with lazy model loading +* **`SchemaTransformFn`**: A user-defined function that transforms the chunked + embedded DataFrame into the format expected by the FeatureView schema + +#### Quick Example + +```python +from feast import DocEmbedder +import pandas as pd + +# Prepare your documents +df = pd.DataFrame({ + "id": ["doc1", "doc2"], + "text": ["First document content...", "Second document content..."], +}) + +# Create DocEmbedder -- automatically generates a FeatureView and applies the repo +embedder = DocEmbedder( + repo_path="feature_repo/", + feature_view_name="text_feature_view", +) + +# Embed and ingest documents in one step +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +#### Features + +* **Auto-generates FeatureView**: Creates a Python file with Entity and FeatureView definitions compatible with `feast apply` +* **Auto-applies repo**: Registers the generated FeatureView in the registry automatically +* **Custom schema transform**: Provide your own `SchemaTransformFn` to control how chunked + embedded data maps to your FeatureView schema +* **Extensible**: Subclass `BaseChunker` or `BaseEmbedder` to plug in your own chunking or embedding strategies + +For a complete walkthrough, see the [DocEmbedder tutorial notebook](../../examples/rag-retriever/rag_feast_docembedder.ipynb). 
### Feature Transformation for LLMs Feast supports transformations that can be used to: @@ -174,6 +221,7 @@ For more detailed information and examples: * [Vector Database Reference](../reference/alpha-vector-database.md) * [RAG Tutorial with Docling](../tutorials/rag-with-docling.md) +* [DocEmbedder Tutorial Notebook](../../examples/rag-retriever/rag_feast_docembedder.ipynb) * [RAG Fine Tuning with Feast and Milvus](../../examples/rag-retriever/README.md) * [Milvus Quickstart Example](https://github.com/feast-dev/feast/tree/master/examples/rag/milvus-quickstart.ipynb) * [Feast + Ray: Distributed Processing for RAG Applications](https://feast.dev/blog/feast-ray-distributed-processing/) diff --git a/docs/tutorials/rag-with-docling.md b/docs/tutorials/rag-with-docling.md index 88f2bd2aad7..6b85db4177c 100644 --- a/docs/tutorials/rag-with-docling.md +++ b/docs/tutorials/rag-with-docling.md @@ -409,6 +409,234 @@ response = client.chat.completions.create( print('\n'.join([c.message.content for c in response.choices])) ``` +## Alternative: Using DocEmbedder for Simplified Ingestion + +Instead of manually chunking, embedding, and writing documents as shown above, you can use Feast's `DocEmbedder` class to handle the entire pipeline in a single step. `DocEmbedder` automates chunking, embedding generation, FeatureView creation, and writing to the online store. + +### Install Dependencies + +```bash +pip install feast[milvus,rag] +``` + +### Set Up and Ingest with DocEmbedder + +```python +from feast import DocEmbedder +import pandas as pd + +# Prepare your documents as a DataFrame +df = pd.DataFrame({ + "id": ["doc1", "doc2", "doc3"], + "text": [ + "Aaron is a prophet, high priest, and the brother of Moses...", + "God at Sinai granted Aaron the priesthood for himself...", + "His rod turned into a snake. 
Then he stretched out...", + ], +}) + +# DocEmbedder handles everything: generates FeatureView, applies repo, +# chunks text, generates embeddings, and writes to the online store +embedder = DocEmbedder( + repo_path="feature_repo/", + feature_view_name="text_feature_view", +) + +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +### Retrieve and Query + +Once documents are ingested, you can retrieve them the same way as shown in Step 5 above: + +```python +from feast import FeatureStore + +store = FeatureStore("feature_repo/") + +query_embedding = embed_text("Who are the authors of the paper?") +context_data = store.retrieve_online_documents_v2( + features=[ + "text_feature_view:embedding", + "text_feature_view:text", + "text_feature_view:source_id", + ], + query=query_embedding, + top_k=3, + distance_metric="COSINE", +).to_df() +``` + +### Customizing the Pipeline + +`DocEmbedder` is extensible at every stage. Below are examples of how to create custom components and wire them together. + +#### Custom Chunker + +Subclass `BaseChunker` to implement your own chunking strategy. The `load_parse_and_chunk` method receives each document and must return a list of chunk dictionaries. 
+ +```python +from feast.chunker import BaseChunker, ChunkingConfig +from typing import Any, Optional + +class SentenceChunker(BaseChunker): + """Chunks text by sentences instead of word count.""" + + def load_parse_and_chunk( + self, + source: Any, + source_id: str, + source_column: str, + source_type: Optional[str] = None, + ) -> list[dict]: + import re + + text = str(source) + # Split on sentence boundaries + sentences = re.split(r'(?<=[.!?])\s+', text) + + chunks = [] + current_chunk = [] + chunk_index = 0 + + for sentence in sentences: + current_chunk.append(sentence) + combined = " ".join(current_chunk) + + if len(combined.split()) >= self.config.chunk_size: + chunks.append({ + "chunk_id": f"{source_id}_{chunk_index}", + "original_id": source_id, + source_column: combined, + "chunk_index": chunk_index, + }) + # Keep overlap by retaining the last sentence + current_chunk = [sentence] + chunk_index += 1 + + # Don't forget the last chunk + if current_chunk and len(" ".join(current_chunk).split()) >= self.config.min_chunk_size: + chunks.append({ + "chunk_id": f"{source_id}_{chunk_index}", + "original_id": source_id, + source_column: " ".join(current_chunk), + "chunk_index": chunk_index, + }) + + return chunks +``` + +Or simply configure the built-in `TextChunker`: + +```python +from feast import TextChunker, ChunkingConfig + +chunker = TextChunker(config=ChunkingConfig( + chunk_size=200, + chunk_overlap=50, + min_chunk_size=30, + max_chunk_chars=1000, +)) +``` + +#### Custom Embedder + +Subclass `BaseEmbedder` to use a different embedding model. Register modality handlers in `_register_default_modalities` and implement the `embed` method. 
+
+```python
+from feast.embedder import BaseEmbedder, EmbeddingConfig
+from typing import Any, List, Optional
+import numpy as np
+
+class OpenAIEmbedder(BaseEmbedder):
+    """Embedder that uses the OpenAI API for text embeddings."""
+
+    def __init__(self, model: str = "text-embedding-3-small", config: Optional[EmbeddingConfig] = None):
+        self.model = model
+        self._client = None
+        super().__init__(config)
+
+    def _register_default_modalities(self) -> None:
+        self.register_modality("text", self._embed_text)
+
+    @property
+    def client(self):
+        if self._client is None:
+            from openai import OpenAI
+            self._client = OpenAI()
+        return self._client
+
+    def get_embedding_dim(self, modality: str) -> Optional[int]:
+        # text-embedding-3-small produces 1536-dim vectors
+        if modality == "text":
+            return 1536
+        return None
+
+    def embed(self, inputs: List[Any], modality: str) -> np.ndarray:
+        if modality not in self._modality_handlers:
+            raise ValueError(f"Unsupported modality: '{modality}'")
+        return self._modality_handlers[modality](inputs)
+
+    def _embed_text(self, inputs: List[str]) -> np.ndarray:
+        response = self.client.embeddings.create(input=inputs, model=self.model)
+        return np.array([item.embedding for item in response.data])
+```
+
+#### Custom Schema Transform Function
+
+The schema transform function transforms the chunked + embedded DataFrame into the exact schema your FeatureView expects. It must accept a `pd.DataFrame` and return a `pd.DataFrame`.
+ +```python +import pandas as pd +from datetime import datetime, timezone + +def my_schema_transform_fn(df: pd.DataFrame) -> pd.DataFrame: + """Map chunked + embedded columns to the FeatureView schema.""" + return pd.DataFrame({ + "passage_id": df["chunk_id"], + "text": df["text"], + "embedding": df["text_embedding"], + "event_timestamp": [datetime.now(timezone.utc)] * len(df), + "source_id": df["original_id"], + # Add any extra columns your FeatureView expects + "chunk_index": df["chunk_index"], + }) +``` + +#### Putting It All Together + +Pass your custom components to `DocEmbedder`: + +```python +from feast import DocEmbedder + +embedder = DocEmbedder( + repo_path="feature_repo/", + feature_view_name="text_feature_view", + chunker=SentenceChunker(config=ChunkingConfig(chunk_size=150, min_chunk_size=20)), + embedder=OpenAIEmbedder(model="text-embedding-3-small"), + schema_transform_fn=my_schema_transform_fn, + vector_length=1536, # Match the OpenAI embedding dimension +) + +# Embed and ingest +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +> **Note:** When using a custom `schema_transform_fn`, ensure the returned DataFrame columns match your FeatureView schema. When using a custom embedder with a different output dimension, set `vector_length` accordingly (or let it auto-detect via `get_embedding_dim`). + +For a complete end-to-end example, see the [DocEmbedder notebook](https://github.com/feast-dev/feast/tree/master/examples/rag-retriever/rag_feast_docembedder.ipynb). + ## Why Feast for RAG? Feast makes it remarkably easy to set up and manage a RAG system by: diff --git a/examples/rag-retriever/README.md b/examples/rag-retriever/README.md index 4c9eb9bf8c2..7df89957cfa 100644 --- a/examples/rag-retriever/README.md +++ b/examples/rag-retriever/README.md @@ -62,6 +62,59 @@ Navigate to the examples/rag-retriever directory. 
Here you will find the followi Open `rag_feast.ipynb` and follow the steps in the notebook to run the example. +## Using DocEmbedder for Simplified Ingestion + +As an alternative to the manual data preparation steps in the notebook above, Feast provides the `DocEmbedder` class that automates the entire document-to-embeddings pipeline: chunking, embedding generation, FeatureView creation, and writing to the online store. + +### Install Dependencies + +```bash +pip install feast[milvus,rag] +``` + +### Quick Start + +```python +from feast import DocEmbedder +from datasets import load_dataset + +# Load your dataset +dataset = load_dataset("facebook/wiki_dpr", "psgs_w100.nq.exact", split="train[:1%]", + with_index=False, trust_remote_code=True) +df = dataset.select(range(100)).to_pandas() + +# DocEmbedder handles everything in one step +embedder = DocEmbedder( + repo_path="feature_repo_docembedder/", + feature_view_name="text_feature_view", +) + +result = embedder.embed_documents( + documents=df, + id_column="id", + source_column="text", + column_mapping=("text", "text_embedding"), +) +``` + +### What DocEmbedder Does + +1. **Generates a FeatureView**: Automatically creates a Python file with Entity and FeatureView definitions compatible with `feast apply` +2. **Applies the repo**: Registers the FeatureView in the Feast registry and deploys infrastructure (e.g., Milvus collection) +3. **Chunks documents**: Splits text into smaller passages using `TextChunker` (configurable chunk size, overlap, etc.) +4. **Generates embeddings**: Produces vector embeddings using `MultiModalEmbedder` (defaults to `all-MiniLM-L6-v2`) +5. 
**Writes to online store**: Stores the processed data in your configured online store (e.g., Milvus) + +### Customization + +* **Custom Chunker**: Subclass `BaseChunker` for your own chunking strategy +* **Custom Embedder**: Subclass `BaseEmbedder` to use a different embedding model +* **Logical Layer Function**: Provide a `SchemaTransformFn` to control how the output maps to your FeatureView schema + +### Example Notebook + +See **`rag_feast_docembedder.ipynb`** for a complete end-to-end example that uses DocEmbedder with the Wiki DPR dataset and then queries the results using `FeastRAGRetriever`. + ## FeastRagRetriver Low Level Design Low level design for feast rag retriever diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index c91e6b4c3ec..b1881c50150 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -17,7 +17,7 @@ from .chunker import BaseChunker, ChunkingConfig, TextChunker from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource from .dataframe import DataFrameEngine, FeastDataFrame -from .doc_embedder import DocEmbedder, LogicalLayerFn +from .doc_embedder import DocEmbedder, SchemaTransformFn from .embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder from .entity import Entity from .feature import Feature @@ -66,7 +66,7 @@ "Project", "FeastVectorStore", "DocEmbedder", - "LogicalLayerFn", + "SchemaTransformFn", "BaseChunker", "TextChunker", "ChunkingConfig", diff --git a/sdk/python/feast/doc_embedder.py b/sdk/python/feast/doc_embedder.py index dca2fdef2d0..9ccd8ec461b 100644 --- a/sdk/python/feast/doc_embedder.py +++ b/sdk/python/feast/doc_embedder.py @@ -14,11 +14,11 @@ @runtime_checkable -class LogicalLayerFn(Protocol): +class SchemaTransformFn(Protocol): """ - Protocol defining the structure for logical layer functions. + Protocol defining the structure for schema transform functions. 
- The logical layer transforms the output of Chunker + Embedder + The schema transform converts the output of Chunker + Embedder into the format expected by the FeatureView schema. """ @@ -35,9 +35,9 @@ def __call__(self, df: pd.DataFrame) -> pd.DataFrame: ... -def default_logical_layer_fn(df: pd.DataFrame) -> pd.DataFrame: +def default_schema_transform_fn(df: pd.DataFrame) -> pd.DataFrame: """ - Default logical layer function that transforms the output of Chunker + Embedder + Default schema transform function that transforms the output of Chunker + Embedder into the format expected by the FeatureView schema. """ from datetime import datetime, timezone @@ -156,7 +156,7 @@ class DocEmbedder: feature_view_name: Name of the feature view to create. chunker: Chunker to use for chunking the documents. embedder: Embedder to use for embedding the documents. - logical_layer_fn: Logical layer function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. + schema_transform_fn: Schema transform function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. create_feature_view: Whether to create a feature view in the feature repo. By default it will generate a Python file with the FeatureView definition. vector_length: Explicit embedding dimension for the generated FeatureView schema. 
If None (default), the dimension is auto-detected from the embedder @@ -172,7 +172,7 @@ def __init__( feature_view_name: str = "text_feature_view", chunker: Optional[BaseChunker] = None, embedder: Optional[BaseEmbedder] = None, - logical_layer_fn: LogicalLayerFn = default_logical_layer_fn, + schema_transform_fn: SchemaTransformFn = default_schema_transform_fn, create_feature_view: bool = True, vector_length: Optional[int] = None, auto_apply_repo: bool = True, @@ -184,7 +184,7 @@ def __init__( self.embedder = embedder or MultiModalEmbedder() self.store: Optional[FeatureStore] = None - sig = inspect.signature(logical_layer_fn) + sig = inspect.signature(schema_transform_fn) params = list(sig.parameters.values()) if ( len(params) != 1 @@ -192,9 +192,9 @@ def __init__( or sig.return_annotation != pd.DataFrame ): raise ValueError( - "logical_layer_fn must be a function that takes a DataFrame and returns a DataFrame" + "schema_transform_fn must be a function that takes a DataFrame and returns a DataFrame" ) - self.logical_layer_fn = logical_layer_fn + self.schema_transform_fn = schema_transform_fn if create_feature_view: resolved_vector_length = self._resolve_vector_length(vector_length, "text") generate_repo_file( @@ -287,7 +287,7 @@ def embed_documents( source_column: str, type_column: Optional[str] = None, column_mapping: Optional[tuple[str, str]] = None, - custom_logical_layer_fn: Optional[ + custom_schema_transform_fn: Optional[ Callable[[pd.DataFrame], pd.DataFrame] ] = None, ) -> pd.DataFrame: @@ -300,7 +300,7 @@ def embed_documents( source_column: Column name containing the document sources. type_column: Column name containing the document types. column_mapping: Tuple mapping source columns to (modality, output column). - custom_logical_layer_fn: Custom logical layer function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. 
+ custom_schema_transform_fn: Custom schema transform function to use for transforming the output of the chunker and embedder into the format expected by the FeatureView schema. Returns: DataFrame with the embedded documents. @@ -315,8 +315,8 @@ def embed_documents( df = embed_documents(documents=documents, id_column="id", source_column="source", type_column="type", column_mapping=column_mapping) """ - if custom_logical_layer_fn is not None: - sig = inspect.signature(custom_logical_layer_fn) + if custom_schema_transform_fn is not None: + sig = inspect.signature(custom_schema_transform_fn) params = list(sig.parameters.values()) if ( len(params) != 1 @@ -324,27 +324,27 @@ def embed_documents( or sig.return_annotation != pd.DataFrame ): raise ValueError( - "custom_logical_layer_fn must be a function that takes a DataFrame and returns a DataFrame" + "custom_schema_transform_fn must be a function that takes a DataFrame and returns a DataFrame" ) - current_logical_layer_fn = ( - custom_logical_layer_fn - if custom_logical_layer_fn is not None - else self.logical_layer_fn + current_schema_transform_fn = ( + custom_schema_transform_fn + if custom_schema_transform_fn is not None + else self.schema_transform_fn ) if column_mapping is None: column_mapping = ("text", "text_embedding") if ( - current_logical_layer_fn is default_logical_layer_fn + current_schema_transform_fn is default_schema_transform_fn and column_mapping[0] == "text" and (source_column != "text" or column_mapping[1] != "text_embedding") ): raise ValueError( f"source_column='{source_column}' with output column='{column_mapping[1]}' " - f"is not compatible with default_logical_layer_fn, which expects " + f"is not compatible with default_schema_transform_fn, which expects " f"source_column='text' and column_mapping=('text', 'text_embedding'). " - f"Provide a custom logical_layer_fn." + f"Provide a custom schema_transform_fn." 
) if column_mapping[0] == "text": @@ -363,15 +363,15 @@ def embed_documents( if ( column_mapping[0] == "text" - or current_logical_layer_fn is not default_logical_layer_fn + or current_schema_transform_fn is not default_schema_transform_fn ): - df = current_logical_layer_fn(df) + df = current_schema_transform_fn(df) else: warnings.warn( - f"Modality '{column_mapping[0]}' is not supported by the default logical layer function. " + f"Modality '{column_mapping[0]}' is not supported by the default schema transform function. " f"The output DataFrame will be passed directly to the online store. " f"Ensure your FeatureView schema matches the output columns. " - f"You can provide a custom logical layer function to handle this.", + f"You can provide a custom schema transform function to handle this.", UserWarning, stacklevel=2, ) diff --git a/sdk/python/tests/unit/test_doc_embedder.py b/sdk/python/tests/unit/test_doc_embedder.py index 2a9e324e0c4..10f72c82d02 100644 --- a/sdk/python/tests/unit/test_doc_embedder.py +++ b/sdk/python/tests/unit/test_doc_embedder.py @@ -6,7 +6,11 @@ import pytest from feast.chunker import BaseChunker, ChunkingConfig, TextChunker -from feast.doc_embedder import DocEmbedder, LogicalLayerFn, default_logical_layer_fn +from feast.doc_embedder import ( + DocEmbedder, + SchemaTransformFn, + default_schema_transform_fn, +) from feast.embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder @@ -230,17 +234,17 @@ def test_embed_dataframe(self): embedder.embed.assert_called_once_with(["hello", "world"], "text") -def test_logical_layer_fn_protocol_check(): - """A matching function is recognized as LogicalLayerFn.""" +def test_schema_transform_fn_protocol_check(): + """A matching function is recognized as SchemaTransformFn.""" def my_fn(df: pd.DataFrame) -> pd.DataFrame: return df - assert isinstance(my_fn, LogicalLayerFn) + assert isinstance(my_fn, SchemaTransformFn) -def test_default_logical_layer_fn_output(): - """default_logical_layer_fn 
transforms columns correctly.""" +def test_default_schema_transform_fn_output(): + """default_schema_transform_fn transforms columns correctly.""" input_df = pd.DataFrame( { "chunk_id": ["c1", "c2"], @@ -250,7 +254,7 @@ def test_default_logical_layer_fn_output(): } ) - result = default_logical_layer_fn(input_df) + result = default_schema_transform_fn(input_df) assert list(result.columns) == [ "passage_id", @@ -288,7 +292,7 @@ def test_init_no_feature_view(self, mock_load_config, mock_apply_total, tmp_path assert doc_embedder.feature_view_name == "test_view" assert doc_embedder.chunker is mock_chunker assert doc_embedder.embedder is mock_embedder - assert doc_embedder.logical_layer_fn is default_logical_layer_fn + assert doc_embedder.schema_transform_fn is default_schema_transform_fn @patch("feast.repo_operations.apply_total") @patch("feast.repo_config.load_repo_config") @@ -374,7 +378,7 @@ def test_init_creates_feature_view_auto_detects_vector_length( def test_embed_documents_full_chain( self, mock_fs_cls, mock_load_config, mock_apply_total, tmp_path ): - """embed_documents wires chunk -> embed -> logical_layer -> save correctly.""" + """embed_documents wires chunk -> embed -> schema_transform -> save correctly.""" mock_chunker = MagicMock(spec=BaseChunker) chunked_df = pd.DataFrame( { @@ -409,7 +413,7 @@ def mock_logical_fn(df: pd.DataFrame) -> pd.DataFrame: repo_path=str(tmp_path), chunker=mock_chunker, embedder=mock_embedder, - logical_layer_fn=mock_logical_fn, + schema_transform_fn=mock_logical_fn, create_feature_view=False, ) @@ -528,7 +532,7 @@ def mock_logical_fn(df: pd.DataFrame) -> pd.DataFrame: repo_path=str(tmp_path), chunker=mock_chunker, embedder=mock_embedder, - logical_layer_fn=mock_logical_fn, + schema_transform_fn=mock_logical_fn, create_feature_view=False, ) @@ -599,7 +603,7 @@ def test_save_to_online_store( @patch("feast.repo_config.load_repo_config") @patch("feast.feature_store.FeatureStore") def test_end_to_end_pipeline(mock_fs_cls, 
mock_load_config, mock_apply_total, tmp_path): - """Full pipeline: real TextChunker + mocked embedder + default logical layer.""" + """Full pipeline: real TextChunker + mocked embedder + default schema transform.""" chunker = TextChunker( config=ChunkingConfig( chunk_size=10, @@ -623,7 +627,7 @@ def fake_embed_dataframe(df, column_mapping): repo_path=str(tmp_path), chunker=chunker, embedder=mock_embedder, - logical_layer_fn=default_logical_layer_fn, + schema_transform_fn=default_schema_transform_fn, create_feature_view=False, )