feat: MongoDB offline stores #6138
caseyclements wants to merge 31 commits into feast-dev:master from caseyclements:FEAST-OfflineStore-INTPYTHON-297 (base: master)

Status: Open
Commits (31):

- ddc5314 feat: Add MongoDB offline store (ibis-based PIT join, v1 alpha) (caseyclements)
- 8b7f710 refactor: improve MongoDB offline store code quality (caseyclements)
- 62695aa Started work on full Mongo/MQL implementation. Kept MongoDBOfflineSto… (caseyclements)
- 812d03d refactor: rename alpha to preview, clarify MQL pipeline comments (caseyclements)
- c3401ea Added unit tests for offline store retrieval, requiring docker and py… (caseyclements)
- ec2e7ba Added test of multiple feature views and compound join keys (caseyclements)
- a4d2886 Initial implementation of native single-collection offline store (caseyclements)
- e9de6f3 Added DriverInfo to MongoDBClients (caseyclements)
- 81d194c Optimized MQL. Applied FV-level TTL (caseyclements)
- ad85385 filter TTL by relevant FVs only, cautiously reset df index; add creat… (caseyclements)
- 4d02feb Updated docstrings (caseyclements)
- 8d86cdd Lazy index creation via _get_client_and_ensure_indexes (caseyclements)
- a1e3c93 Add performance benchmarks comparing Ibis vs Native MongoDB offline s… (caseyclements)
- b8fcba5 Refactor Native get_historical_features: replace with fetch+pandas join (caseyclements)
- 5d516a8 Refactor get_historical_features with chunked processing for large en… (caseyclements)
- c7281fb Optimize Native get_historical_features: reuse client, increase batch… (caseyclements)
- 18bb999 Remove duplicate MongoDBOfflineStoreNative from mongodb.py (caseyclements)
- 38d40f5 Consolidate mongodb_source.py into mongodb.py (caseyclements)
- 9bd0c1a Rename mongodb_offline_store to mongodb, use One/Many naming convention (caseyclements)
- 2c25494 Add README.md documenting MongoDB offline store implementations (caseyclements)
- b50e22f Rename mongodb/ to mongodb_offline_store/, organize tests (caseyclements)
- bae2648 Update docstring in benchmark.py (caseyclements)
- e4c79bf Update README to show created_at tie-breaker in Many schema (caseyclements)
- 548698b Update README index recommendations for Many implementation (caseyclements)
- 1597264 Add auto-create index to MongoDBOfflineStoreMany (caseyclements)
- 39afa9a Update benchmark.py to use One/Many naming convention (caseyclements)
- 5146c4e Add comprehensive module docstring to mongodb_many.py (caseyclements)
- 612d05a Add Feature Freshness and Schema Evolution docs to mongodb_many.py (caseyclements)
- 970ec79 Add MongoDB DataSourceCreators for universal Feast tests (caseyclements)
- 9dc9162 Add .secrets.baseline (caseyclements)
- b8ddd02 Merge branch 'master' into FEAST-OfflineStore-INTPYTHON-297 (franciscojavierarceo)
# MongoDB Feast Integration — Session Notes
_Last updated: 2026-03-16. Resume here after OS upgrade._

---

## Status at a Glance

| Component | Branch | Status |
|---|---|---|
| **Online Store** | `INTPYTHON-297-MongoDB-Feast-Integration` | ✅ **Merged to upstream/master** |
| **Offline Store** | `FEAST-OfflineStore-INTPYTHON-297` | 🔧 In progress — next focus |

---
## Online Store — COMPLETE ✅

### What was done
- Implemented `MongoDBOnlineStore` with full sync + async API
- Refactored write path: extracted `_build_write_ops` static method to eliminate code duplication between `online_write_batch` and `online_write_batch_async`
- Added Feast driver metadata to MongoDB client instantiations
- Registered MongoDB in the feast-operator (kubebuilder enums, `ValidOnlineStoreDBStorePersistenceTypes`, operator YAMLs)
- Updated online store status from `alpha` → `preview` in docs
- All 5 unit tests pass (including the Docker-based testcontainers integration test)
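The `_build_write_ops` refactor above can be sketched as follows. This is an illustrative sketch only, not the merged code: the row shape and field names are assumptions, and plain dicts stand in for pymongo's `UpdateOne` so the example stays dependency-free. The point it shows is that both write paths call one static builder.

```python
from datetime import datetime
from typing import Any

class MongoDBOnlineStoreSketch:
    """Sketch: one static op builder shared by the sync and async write paths."""

    @staticmethod
    def _build_write_ops(table: str, rows: list) -> list:
        # Each row -> one upsert keyed by (table, entity_key).
        # In the real store these would be pymongo UpdateOne operations.
        ops = []
        for entity_key, features, event_ts in rows:
            ops.append({
                "filter": {"table": table, "entity_key": entity_key},
                "update": {"$set": {"features": features, "event_ts": event_ts}},
                "upsert": True,
            })
        return ops

    def online_write_batch(self, table, rows):
        # The sync path would execute these ops via collection.bulk_write(...)
        return self._build_write_ops(table, rows)

    async def online_write_batch_async(self, table, rows):
        # The async path executes exactly the same ops via the async client
        return self._build_write_ops(table, rows)
```

Because both paths consume the same builder, any change to the write semantics only has to be made once.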
### Key files
- `sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py` — main implementation
- `sdk/python/tests/unit/online_store/test_mongodb_online_retrieval.py` — test suite
- `sdk/python/tests/universal/feature_repos/universal/online_store/mongodb.py` — universal test repo config

### Git history cleanup (this session)
The PR had two merge commits (`632e103a6`, `26ce79b37`) that blocked squash-and-merge.
Resolution:
1. `git fetch --all`
2. Created a clean branch `FEAST-OnlineStore-INTPYTHON-297` from `upstream/master`
3. Cherry-picked all 47 commits (oldest → newest), skipping the two merge commits
4. Resolved conflicts: directory rename (`tests/integration/` → `tests/universal/`), `pixi.lock` auto-resolved, `detect-secrets` false positives got `# pragma: allowlist secret`
5. Force-pushed to `INTPYTHON-297-MongoDB-Feast-Integration` — maintainer squash-merged ✅

### Versioning
The version is derived dynamically via `setuptools_scm` from git tags (no hardcoded version).
Latest tag at time of merge: **`v0.60.0`**. The feature ships in the next release after that.
Update JIRA with the next release tag once the maintainers cut it.

---
## Offline Store — IN PROGRESS 🔧

### Branch
```
FEAST-OfflineStore-INTPYTHON-297
```

### Commits on branch (not yet in upstream/master)
```
cd3eef677 Started work on full Mongo/MQL implementation. Kept MongoDBOfflineStoreIbis and MongoDBOfflineStoreNative
71469f69a feat: restore test-python-universal-mongodb-online Makefile target
904505244 fix: pass onerror to pkgutil.walk_packages
946d84e4c fix: broaden import exception handling in doctest runner
55de0e9b5 fix: catch FeastExtrasDependencyImportError in doctest runner
157a71d77 refactor: improve MongoDB offline store code quality
67632af2f feat: Add MongoDB offline store (ibis-based PIT join, v1 alpha)
```

### Key files
- `sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb.py`
  - Contains **two prototype implementations**:
    - `MongoDBOfflineStoreIbis` — uses Ibis for point-in-time joins (delegates to `get_historical_features_ibis`)
    - `MongoDBOfflineStoreNative` — native MQL implementation (started in `cd3eef677`, in progress)
- `sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb_source.py` — `MongoDBSource` data source
### Architecture: Ibis vs Native
- **Ibis approach**: delegates the PIT join to `feast.infra.offline_stores.ibis` helpers.
  Pro: less code, consistency with other ibis-backed stores.
  Con: requires an ibis-mongodb connector; PIT correctness depends on the ibis translation.
- **Native approach**: implements the PIT join directly in MQL (MongoDB aggregation pipeline).
  Pro: no extra dependency, full control.
  Con: more complex; MQL aggregation pipelines can be verbose.
- Decision pending benchmarking and correctness validation between the two.
### Next steps for offline store
1. Finish the `MongoDBOfflineStoreNative` MQL implementation (started in the latest commit)
2. Validate PIT correctness of both implementations against the Feast universal test suite
3. Run `make test-python-universal-mongodb-offline` (the target may need creating — see `71469f69a`)
4. Choose Ibis vs Native based on the results; remove the other
5. Add to the operator (same pattern as the online store: kubebuilder enums, install.yaml)
6. Open the PR — follow the same DCO + linear-history discipline as the online store

---
## Environment Notes

- **Python env**: always use `uv run pytest ...` (uses the `.venv` in the repo root, Python 3.11)
- **Do NOT use**: the system Python (`/Library/Frameworks/Python.framework/...`) or conda envs
- **Docker**: must be running for the testcontainers integration test
- **Stale container**: `72d14b345b6a` (mongo:latest, port 57120) — leftover from testing, safe to stop
- **DCO**: all commits must be signed: `git commit -s`
- **No push/merge without explicit user approval**

---
## Git Workflow Reminder
To keep history clean (a lesson from the online store PR):
- Always branch from `upstream/master` (after `git fetch --all`)
- Never merge upstream into a feature branch — rebase or cherry-pick instead
- Before opening a PR, verify with `git log --merges <branch> ^upstream/master --oneline` (it must return empty)
Native MongoDB Offline Store (Hybrid Design)

Design Document

Overview

This document describes the design of the Native MongoDB Offline Store for Feast using a hybrid execution model. The system combines MongoDB’s strengths in indexed data retrieval with Python’s strengths in relational and temporal joins.

The implementation uses a single-collection schema in MongoDB to store feature data across all FeatureViews and performs point-in-time (PIT) joins using a “fetch + pandas join” strategy. This replaces an earlier fully in-database $lookup approach that proved unscalable for large workloads.

The result is a design that is performant, scalable, and aligned with Feast’s semantics.
⸻

Data Model

All FeatureViews share a single MongoDB collection (feature_history). Each document represents an observation of a FeatureView for a given entity at a specific timestamp.

Each document contains:
	•	A serialized entity identifier (entity_id)
	•	A FeatureView identifier (feature_view)
	•	A subdocument of feature values (features)
	•	An event timestamp (event_timestamp)
	•	An ingestion timestamp (created_at)

This schema supports:
	•	Sparse feature storage (not all features are present in every document)
	•	Flexible schema evolution over time
	•	Efficient indexing across FeatureViews

A compound index is maintained on:
	•	(entity_id, feature_view, event_timestamp DESC)

This index supports efficient filtering by entity, FeatureView, and time range.
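As a concrete illustration of this schema, here is what one feature_history document and the compound index specification might look like. The field names come from the design above; the entity serialization format and all values are made-up examples, and the raw 1/-1 sort directions stand in for pymongo's ASCENDING/DESCENDING constants.

```python
from datetime import datetime, timezone

# One document in the shared feature_history collection (illustrative values).
sample_doc = {
    "entity_id": "driver_id=1001",            # serialized entity key (format assumed)
    "feature_view": "driver_hourly_stats",    # FeatureView identifier
    "features": {"conv_rate": 0.87, "acc_rate": 0.91},  # sparse feature subdocument
    "event_timestamp": datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc),
    "created_at": datetime(2024, 6, 1, 12, 5, tzinfo=timezone.utc),  # ingestion time
}

# Compound index: (entity_id ASC, feature_view ASC, event_timestamp DESC).
# With pymongo this spec would be passed to collection.create_index(...).
index_spec = [("entity_id", 1), ("feature_view", 1), ("event_timestamp", -1)]
```

Because features is a subdocument, two documents for the same FeatureView need not carry the same feature fields, which is what enables the sparse storage and schema evolution noted above.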
⸻

Execution Model

High-Level Strategy

The system implements historical feature retrieval in three stages:
	1.	Preprocessing (Python)
	•	Normalize timestamps to UTC
	•	Serialize entity keys into entity_id
	•	Partition the input entity_df into manageable chunks
	2.	Data Fetching (MongoDB)
	•	Query MongoDB using $in on entity IDs
	•	Filter by FeatureView and time bounds
	•	Retrieve matching feature documents in batches
	3.	Point-in-Time Join (Python)
	•	Convert MongoDB results into pandas DataFrames
	•	Perform per-FeatureView joins using merge_asof
	•	Apply TTL constraints and feature selection

This design avoids per-row database joins and instead performs a small number of efficient indexed scans.
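The stage-2 fetch can be sketched as a single filter document combining all three conditions. This is a hypothetical helper, not code from the PR; with pymongo the resulting dict would be passed to collection.find(...):

```python
from datetime import datetime, timezone

def build_fetch_filter(entity_ids, feature_view, min_ts, max_ts):
    """Stage-2 MongoDB filter: $in over a batch of entity IDs,
    restricted to one FeatureView and a bounded time window."""
    return {
        "entity_id": {"$in": list(entity_ids)},
        "feature_view": feature_view,
        "event_timestamp": {"$gte": min_ts, "$lte": max_ts},
    }

flt = build_fetch_filter(
    ["driver_id=1001", "driver_id=1002"],
    "driver_hourly_stats",
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 6, 1, tzinfo=timezone.utc),
)
```

All three filter fields are covered by the compound index from the Data Model section, which is what makes each batch query an indexed scan rather than a collection scan.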
⸻

Chunking and Batching

To ensure scalability, the system separates two concerns:
	•	Chunk size (entity_df): controls memory usage in Python. Default: ~5,000 rows.
	•	Batch size (MongoDB queries): controls query size and index efficiency. Default: ~1,000 entity IDs per query.

Each chunk of entity_df is processed independently:
	•	Entity IDs are extracted and deduplicated
	•	Feature data is fetched in batches
	•	Results are joined and accumulated

This ensures:
	•	Bounded memory usage
	•	Predictable query performance
	•	Compatibility with large workloads
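A minimal sketch of the chunk/batch split described above. The helper names and the order-preserving dedup are assumptions; the defaults mirror the ~5,000-row and ~1,000-ID figures:

```python
ENTITY_DF_CHUNK_SIZE = 5_000  # rows of entity_df per processing chunk
ID_BATCH_SIZE = 1_000         # entity IDs per $in query

def chunked(seq, size):
    """Yield successive slices of `seq` with at most `size` items each."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def iter_id_batches(entity_ids, batch_size=ID_BATCH_SIZE):
    """Deduplicate entity IDs (preserving order), then batch them for $in queries."""
    unique_ids = list(dict.fromkeys(entity_ids))
    yield from chunked(unique_ids, batch_size)
```

Keeping the two knobs independent lets memory pressure (chunk size) be tuned without affecting query shape (batch size), and vice versa.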
⸻

Point-in-Time Join Semantics

For each FeatureView:
	•	Feature data is sorted by (entity_id, event_timestamp)
	•	The entity dataframe is similarly sorted
	•	A backward merge_asof is performed

This ensures:
	•	Only feature values with timestamps ≤ the entity timestamp are used
	•	The most recent valid feature value is selected

TTL constraints are applied after the join:
	•	If the matched feature timestamp is older than the allowed TTL window, the value is set to NULL
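The join stage can be sketched with pandas as below. One pandas detail worth noting: merge_asof with by= requires the on column itself to be globally sorted, so this sketch sorts on event_timestamp and lets by="entity_id" handle per-entity matching. Column names follow the schema above; the TTL handling is a simplified version of what the design describes:

```python
import pandas as pd

def pit_join(entity_df, feature_df, ttl_seconds=None):
    """Per-FeatureView PIT join: backward merge_asof, then TTL nulling."""
    # merge_asof requires the `on` key to be monotonically sorted in both frames
    left = entity_df.sort_values("event_timestamp").reset_index(drop=True)
    right = feature_df.sort_values("event_timestamp").reset_index(drop=True)
    # keep the matched feature timestamp so TTL can be applied after the join
    right = right.assign(feature_ts=right["event_timestamp"])
    joined = pd.merge_asof(
        left, right,
        on="event_timestamp", by="entity_id",
        direction="backward",  # only feature rows with ts <= entity ts match
    )
    if ttl_seconds is not None:
        age = (joined["event_timestamp"] - joined["feature_ts"]).dt.total_seconds()
        stale = age > ttl_seconds
        value_cols = [c for c in right.columns
                      if c not in ("entity_id", "event_timestamp", "feature_ts")]
        joined.loc[stale, value_cols] = None  # matched value outside TTL -> NULL
    return joined
```

A backward merge_asof picks, for each entity row, the latest feature row at or before that row's timestamp; the TTL pass then nulls out matches that are valid but too old.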
⸻

Key Improvements in Current Design

1. Projection (Reduced Data Transfer)

The system now explicitly limits the fields retrieved from MongoDB to only those required:
	•	entity_id
	•	feature_view
	•	event_timestamp
	•	The requested feature fields within features

This reduces:
	•	Network overhead
	•	BSON decoding cost
	•	Memory usage in pandas

This is especially important for wide FeatureViews or large documents.
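A sketch of the projection document (hypothetical helper; _id is excluded explicitly since MongoDB returns it by default, and dotted paths select individual subfields of features):

```python
def build_projection(requested_features):
    """Projection passed alongside the filter to collection.find():
    only join keys, the timestamp, and the requested feature subfields."""
    proj = {"_id": 0, "entity_id": 1, "feature_view": 1, "event_timestamp": 1}
    proj.update({f"features.{name}": 1 for name in requested_features})
    return proj
```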
⸻

2. Bounded Time Filtering

Queries now include both:
	•	An upper bound (<= max_ts)
	•	A lower bound (>= min_ts)

This significantly reduces the amount of historical data scanned when:
	•	The entity dataframe spans a narrow time window
	•	The feature store contains deep history

This optimization improves:
	•	Query latency
	•	Index selectivity
	•	The memory footprint of retrieved data

Future enhancements may incorporate TTL-aware lower bounds.
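One way to derive the bounds from the entity dataframe itself (a hypothetical helper; the optional lookback widens the lower bound so feature rows shortly before the earliest entity timestamp stay eligible for the backward join, anticipating the TTL-aware lower bounds mentioned as future work):

```python
from datetime import timedelta
import pandas as pd

def compute_time_bounds(entity_df, lookback=timedelta(0)):
    """Bounds for the event_timestamp range filter in the MongoDB query.
    `lookback` (assumption: e.g. the largest FeatureView TTL) widens the
    lower bound below the earliest entity timestamp."""
    min_ts = entity_df["event_timestamp"].min() - lookback
    max_ts = entity_df["event_timestamp"].max()
    return min_ts, max_ts
```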
⸻

3. Correct Sorting for Temporal Joins

The system ensures proper sorting before merge_asof:
	•	Both dataframes are sorted by (entity_id, timestamp)
	•	Note that when merge_asof is used with by=, pandas additionally requires the on column itself to be monotonically sorted in both frames

This is critical for correctness when:
	•	Multiple entities are processed in a single batch
	•	Data is interleaved across entities

Without this, joins may silently produce incorrect results.
⸻

Tradeoffs

Advantages
	•	Scalability: avoids the O(n × m) behavior of correlated joins
	•	Flexibility: supports sparse and evolving schemas
	•	Performance: leverages MongoDB indexes efficiently
	•	Simplicity: uses well-understood pandas join semantics

Limitations
	•	Memory-bound joins: requires chunking for large workloads
	•	Multiple passes: each FeatureView requires a separate join
	•	No server-side joins: MongoDB is used only for filtering, not relational logic
⸻

Comparison to Alternative Designs

Full MongoDB Join ($lookup)

Rejected due to:
	•	Poor scaling with large entity sets
	•	Repeated execution of correlated subqueries
	•	High latency (orders of magnitude slower)

⸻

Ibis-Based Design
	•	Uses one collection per FeatureView
	•	Loads data into memory and performs joins in Python

Comparison:
	•	Similar performance after the hybrid redesign
	•	Simpler query model
	•	Less flexible schema

The Native design trades that simplicity for:
	•	Unified storage
	•	Better alignment with document-based ingestion
	•	More flexible feature evolution
⸻

Operational Considerations

Index Management

Indexes are created lazily at runtime:
	•	Ensures correctness without manual setup
	•	Avoids placing responsibility on users

Future improvements may include:
	•	Optional strict index validation
	•	Configuration-driven index management
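The lazy-creation pattern (named _get_client_and_ensure_indexes in the PR's commit log) can be sketched as an idempotent ensure step. The set is only a per-process fast path, since create_index is itself idempotent on the MongoDB server; the names and structure here are assumptions, and the collection is duck-typed:

```python
_indexes_ensured = set()  # per-process cache of namespaces already ensured

def ensure_indexes(collection, namespace):
    """Create the compound index on first use of `namespace`; no-op afterwards."""
    if namespace in _indexes_ensured:
        return
    collection.create_index(
        [("entity_id", 1), ("feature_view", 1), ("event_timestamp", -1)]
    )
    _indexes_ensured.add(namespace)
```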
⸻

MongoDB Client Usage

Each chunk currently uses a separate MongoDB client instance.

This is acceptable for moderate workloads but may be optimized in the future by:
	•	Reusing a shared client per retrieval job
	•	Leveraging connection pooling more explicitly
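A shared-client approach could be as simple as memoizing on the connection string (sketch only; the object() placeholder stands in for pymongo.MongoClient, which additionally pools connections internally, so one client per process is usually enough):

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def get_shared_client(connection_string):
    """Return one client per connection string per process.
    Placeholder: a real implementation would return MongoClient(connection_string)."""
    return object()
```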
⸻

Future Work

Several enhancements are possible:
	1.	Streaming Joins: avoid materializing all feature data in memory; process data incrementally
	2.	Adaptive Chunking: dynamically adjust chunk size based on memory pressure
	3.	TTL Pushdown: incorporate TTL constraints into MongoDB queries
	4.	Parallel Execution: process chunks concurrently for large workloads
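Item 4 could start as a thread pool over the existing per-chunk routine, since chunk processing is dominated by MongoDB I/O (a sketch under that assumption; process_chunk stands for the current fetch+join function for one entity_df chunk):

```python
from concurrent.futures import ThreadPoolExecutor

def process_chunks_parallel(chunks, process_chunk, max_workers=4):
    """Run the per-chunk fetch+join routine concurrently.
    Results come back in chunk order, so accumulation logic is unchanged."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_chunk, chunks))
```

Threads are a reasonable first step for I/O-bound work; if the pandas join itself becomes the bottleneck, processes would be the next thing to evaluate.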
⸻

Conclusion

The hybrid MongoDB + pandas design represents a significant improvement over the initial fully in-database approach. It aligns system responsibilities with the strengths of each component:
	•	MongoDB handles indexed filtering and retrieval
	•	Python handles temporal join logic

With the addition of projection, bounded time filtering, and correct sorting, the system is now both performant and correct for large-scale historical feature retrieval.

This design provides a strong foundation for further optimization and production use.
Review comment: this needs to be removed