[dbsp] GC for asof join postfix. #5422
Pull request overview
This PR fixes a logical bug in the GC implementation for asof join postfix operations by improving the LastN value filter evaluation. The original implementation incorrectly assumed that keeping the last N values in each merged batch was sufficient, but this failed when values were retracted in other batches. The fix now evaluates the LastN filter against the complete spine trace rather than individual batches.
Changes:
- Removed the `val_factory` parameter from `GroupFilter::LastN` as it's no longer needed
- Extended the merge cursor API to accept spine snapshots for precise filter evaluation
- Added a new `FilteredMergeCursorWithSnapshot` cursor type that uses the complete trace to evaluate `LastN` filters
- Added test coverage for the deletion scenario that exposed the original bug
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| crates/dbsp/src/trace/filter.rs | Removed val_factory parameter from GroupFilter::LastN and added requires_snapshot() method |
| crates/dbsp/src/trace/cursor.rs | Split filtering logic: simple filters use FilteredMergeCursor, LastN filters use new FilteredMergeCursorWithSnapshot |
| crates/dbsp/src/trace.rs | Added merge_cursor_with_snapshot method to support snapshot-based filtering |
| crates/dbsp/src/trace/spine_async.rs | Updated async merger to pass spine snapshots to merge cursors when needed |
| crates/dbsp/src/trace/spine_async/list_merger.rs | Extended merger to accept and use spine snapshots |
| crates/dbsp/src/trace/test/test_batch.rs | Updated filter function signature to remove val_factory parameter |
| crates/dbsp/src/operator/trace.rs | Removed val_factory argument from API calls |
| crates/dbsp/src/operator/dynamic/trace.rs | Updated to remove val_factory parameter and added deterministic regression test |
| crates/dbsp/src/operator/accumulate_trace.rs | Removed val_factory argument from API calls |
| crates/dbsp/src/operator/dynamic/accumulate_trace.rs | Updated to remove val_factory parameter |
| crates/dbsp/src/operator/dynamic/asof_join.rs | Removed trailing whitespace |
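
The split described in the table (simple filters on one cursor type, `LastN` filters on a snapshot-aware one) can be pictured with a toy dispatch. This is only a sketch: `ToyGroupFilter`, `ToyCursorKind` and `choose_cursor` are hypothetical names made up for illustration, and the real `GroupFilter` and cursor types in `crates/dbsp` carry more state than shown here.

```rust
/// Hypothetical stand-in for the filter type; NOT the real `GroupFilter`.
enum ToyGroupFilter {
    /// A filter that can be evaluated value by value.
    Simple,
    /// Keep the last N values below the waterline.
    LastN(usize),
}

/// Hypothetical labels for the two cursor flavours named in the table.
#[derive(Debug, PartialEq)]
enum ToyCursorKind {
    Filtered,
    FilteredWithSnapshot,
}

impl ToyGroupFilter {
    /// Only `LastN` needs to look at the whole trace to decide what to keep.
    fn requires_snapshot(&self) -> bool {
        matches!(self, ToyGroupFilter::LastN(_))
    }
}

/// Pick the cursor flavour based on whether the filter needs a snapshot.
fn choose_cursor(filter: &ToyGroupFilter) -> ToyCursorKind {
    if filter.requires_snapshot() {
        ToyCursorKind::FilteredWithSnapshot
    } else {
        ToyCursorKind::Filtered
    }
}
```

The point of keeping the check in one method is that callers such as the merger only need to ask the filter whether a snapshot is required, rather than matching on filter variants themselves.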

    /// Deterministic input that inserts and retracts records in order to simulate a situation where
    /// records present in a batch are not present in the trace because they were deleted in a subsequent batch.
    /// This triggered a bug in the initial implementation of LastN filters.

Copilot
AI
Jan 14, 2026
Corrected spelling of 'retracts' to 'records'.
mihaibudiu
left a comment
Actually I think that @blp is far more qualified to review this, so I will only make this one small comment
crates/dbsp/src/trace/filter.rs
Outdated

    /// Also assumed that the values are ordered in some way, so that the last N
    /// values under the cursor ate the ones that need to be preserved.
    ///
    /// Note that the `LastN` filter can only be evaluated against an individual batch and

can or cannot?
I don't know whether it's useful, but I noticed that a GroupFilterCursor always computes the hash of the key that it's on. If there's some code that uses one and needs to check another spine for a matching key, it could save the hash that it computes to save redundant computation.
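
As a rough illustration of the hash-reuse idea in this comment, the sketch below hashes a key once and carries the result alongside the key when probing several containers. Everything here is an assumption made for the example: `HashedKey` and `ToyBatch` are hypothetical, and the real cursor and spine types do not expose this interface.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashSet};
use std::hash::{Hash, Hasher};

/// A key bundled with its hash, computed exactly once (hypothetical helper).
struct HashedKey<'a, K> {
    key: &'a K,
    hash: u64,
}

impl<'a, K: Hash> HashedKey<'a, K> {
    fn new(key: &'a K) -> Self {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        Self { key, hash: hasher.finish() }
    }
}

/// Toy stand-in for a batch that keeps a set of key hashes as a cheap prefilter.
struct ToyBatch<K> {
    hashes: HashSet<u64>,
    keys: BTreeSet<K>,
}

impl<K: Hash + Ord + Clone> ToyBatch<K> {
    fn from_keys(keys: &[K]) -> Self {
        let hashes = keys.iter().map(|k| HashedKey::new(k).hash).collect();
        let keys = keys.iter().cloned().collect();
        Self { hashes, keys }
    }

    /// Membership test that reuses the caller's precomputed hash
    /// instead of hashing the key again for every batch probed.
    fn contains(&self, hk: &HashedKey<'_, K>) -> bool {
        self.hashes.contains(&hk.hash) && self.keys.contains(hk.key)
    }
}
```

A caller would build one `HashedKey` per key visited and pass it to every `ToyBatch::contains` call, so the hashing cost is paid once regardless of how many batches are checked.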

    // Find n'th value below the waterline.
    let mut count = 1;
    while count < *n && trace_cursor.val_valid() {
        trace_cursor.step_val_reverse();
        count += 1;
    }

If n is 1, this loop will run zero times. Is that correct?
yep, this is intentional.
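
To see why zero iterations is the right behaviour for n = 1, here is a small model of the step-back loop that uses an index into a sorted slice in place of the trace cursor. It assumes the cursor already sits on the last value below the waterline before the loop starts, that "below" means strictly less than, and that n >= 1; `nth_below_waterline` is a hypothetical helper, not part of the PR.

```rust
/// Return the n-th value counting backwards from the last value strictly
/// below `waterline`, or `None` if fewer than `n` such values exist.
/// `vals` must be sorted in ascending order; this sketch assumes n >= 1.
fn nth_below_waterline(vals: &[i32], waterline: i32, n: usize) -> Option<i32> {
    // Position the "cursor" on the last value below the waterline.
    // This value already counts as the first of the last n.
    let mut pos = vals.iter().rposition(|&v| v < waterline)?;
    let mut count = 1;
    // Step backwards n - 1 more times; for n == 1 the body never runs.
    while count < n {
        if pos == 0 {
            return None; // ran off the start: fewer than n values
        }
        pos -= 1;
        count += 1;
    }
    Some(vals[pos])
}
```

With `vals = [1, 3, 5, 9, 12]` and a waterline of 10, `n = 1` yields 9 without entering the loop, `n = 3` yields 3 after two steps, and `n = 5` yields `None` because only four values lie below the waterline.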
Fix a logical bug in the implementation of the LastN value filter.

The original thinking was that a conservative estimate for keeping the last N values was to keep the last n values in each merged batch. This is actually unsound because the last value under the waterline in a given batch may not be present in the trace due to a matching retraction in another batch. In this case we may need to keep additional values in the output of the cursor. The number of additional values is generally unbounded.

A reliable way to establish the bound is to compute the last n values in the complete trace and discard everything below that. This commit implements this behavior. The main complication here is that we need to pass a spine snapshot to the merge cursor, which required extending the merge cursor API.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
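
The unsoundness described in this commit message can be reproduced with a toy model in which a batch is a list of (value, weight) pairs for a single key and a retraction is a negative weight. This is a sketch under stated assumptions, not the DBSP implementation: `Batch`, `consolidate`, `cutoff`, `gc_per_batch` and `gc_with_trace` are hypothetical names, "below the waterline" is taken to mean strictly less than, and n is assumed to be at least 1.

```rust
use std::collections::BTreeMap;

/// Toy batch: (value, weight) pairs for one key. Negative weights are retractions.
type Batch = Vec<(i32, i64)>;

/// Sum weights across batches and drop values whose net weight is zero.
fn consolidate(batches: &[Batch]) -> BTreeMap<i32, i64> {
    let mut trace: BTreeMap<i32, i64> = BTreeMap::new();
    for batch in batches {
        for &(val, weight) in batch {
            *trace.entry(val).or_insert(0) += weight;
        }
    }
    trace.retain(|_, w| *w != 0);
    trace
}

/// The n-th largest distinct value strictly below the waterline, if at least n exist.
fn cutoff(vals: impl Iterator<Item = i32>, waterline: i32, n: usize) -> Option<i32> {
    assert!(n >= 1, "this sketch assumes n >= 1");
    let mut below: Vec<i32> = vals.filter(|&v| v < waterline).collect();
    below.sort_unstable();
    below.dedup();
    below.iter().rev().nth(n - 1).copied()
}

/// Keep only tuples at or above `min_val`; keep everything if there is no cutoff.
fn keep_from(batch: &Batch, min_val: Option<i32>) -> Batch {
    batch
        .iter()
        .copied()
        .filter(|&(v, _)| min_val.map_or(true, |c| v >= c))
        .collect()
}

/// Original (unsound) idea: compute the cutoff separately inside each batch.
fn gc_per_batch(batches: &[Batch], waterline: i32, n: usize) -> Vec<Batch> {
    batches
        .iter()
        .map(|b| keep_from(b, cutoff(b.iter().map(|&(v, _)| v), waterline, n)))
        .collect()
}

/// Fixed idea: compute one cutoff from the consolidated trace and apply it everywhere.
fn gc_with_trace(batches: &[Batch], waterline: i32, n: usize) -> Vec<Batch> {
    let trace = consolidate(batches);
    let min_val = cutoff(trace.keys().copied(), waterline, n);
    batches.iter().map(|b| keep_from(b, min_val)).collect()
}
```

Take two batches, `[(3, +1), (7, +1)]` and `[(7, -1)]`, with a waterline of 10 and n = 1. The per-batch strategy keeps only 7 from the first batch, which the second batch then cancels, so the consolidated result is empty even though 3 is still live. The trace-based strategy computes the cutoff from the consolidated trace `{3}` and therefore retains 3.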