Contributor

@ryzhyk ryzhyk commented Jan 14, 2026

Fix a logical bug in the implementation of the LastN value filter. The original thinking was that keeping the last N values in each merged batch was a conservative way to keep the last N values overall. This is actually unsound: the last value under the waterline in a given batch may not be present in the trace due to a matching retraction in another batch. In this case we may need to keep additional values in the output of the cursor, and the number of additional values is generally unbounded.

A reliable way to establish the bound is to compute the last n values in the complete trace and discard everything below that. This commit implements this behavior. The main complication here is that we need to pass a spine snapshot to the merge cursor, which required extending the merge cursor API.
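The failure mode described above can be reproduced in a small, self-contained model. The sketch below is not the dbsp implementation: `Batch`, `consolidate`, `bound_below_waterline`, `gc_per_batch`, and `gc_with_snapshot` are hypothetical names, values are plain integers for a single key, and a negative weight stands for a retraction. It only illustrates why a per-batch bound can discard a value that the consolidated trace still needs.

```rust
use std::collections::BTreeMap;

/// Updates for a single key as (value, weight) pairs; a negative weight is a
/// retraction. Hypothetical simplification, not the dbsp batch type.
type Batch = Vec<(i64, i64)>;

/// Sum weights per value across batches and drop values whose net weight is zero.
fn consolidate(batches: &[Batch]) -> BTreeMap<i64, i64> {
    let mut acc = BTreeMap::new();
    for batch in batches {
        for &(val, w) in batch {
            *acc.entry(val).or_insert(0) += w;
        }
    }
    acc.retain(|_, w| *w != 0);
    acc
}

/// The n'th largest distinct value strictly below the waterline, if there is one.
/// Anything smaller than this bound is considered safe to discard.
fn bound_below_waterline(mut vals: Vec<i64>, waterline: i64, n: usize) -> Option<i64> {
    assert!(n >= 1, "this sketch assumes n >= 1");
    vals.retain(|v| *v < waterline);
    vals.sort_unstable();
    vals.dedup();
    vals.into_iter().rev().nth(n - 1)
}

/// Keep only updates at or above the bound; with no bound, keep everything.
fn apply_bound(batch: &Batch, bound: Option<i64>) -> Batch {
    batch
        .iter()
        .copied()
        .filter(|(v, _)| bound.map_or(true, |b| *v >= b))
        .collect()
}

/// Unsound variant: every batch derives its own bound from its own values.
fn gc_per_batch(batches: &[Batch], waterline: i64, n: usize) -> BTreeMap<i64, i64> {
    let filtered: Vec<Batch> = batches
        .iter()
        .map(|b| {
            let vals = b.iter().map(|(v, _)| *v).collect();
            apply_bound(b, bound_below_waterline(vals, waterline, n))
        })
        .collect();
    consolidate(&filtered)
}

/// Sound variant: one bound, computed from a snapshot of the consolidated trace.
fn gc_with_snapshot(batches: &[Batch], waterline: i64, n: usize) -> BTreeMap<i64, i64> {
    let snapshot = consolidate(batches);
    let bound = bound_below_waterline(snapshot.keys().copied().collect(), waterline, n);
    let filtered: Vec<Batch> = batches.iter().map(|b| apply_bound(b, bound)).collect();
    consolidate(&filtered)
}
```

With batches `[(1, +1), (5, +1)]` and `[(5, -1)]`, a waterline of 10, and n = 1, the per-batch variant keeps only the insertion and the retraction of 5, which cancel out, so value 1 is lost. The snapshot variant computes the bound from the consolidated trace `{1}` and keeps value 1.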

Checklist

  • Documentation updated
  • Changelog updated

Contributor

Copilot AI left a comment

Pull request overview

This PR fixes a logical bug in the GC implementation for asof join postfix operations by improving the LastN value filter evaluation. The original implementation incorrectly assumed that keeping the last N values in each merged batch was sufficient, but this failed when values were retracted in other batches. The fix now evaluates the LastN filter against the complete spine trace rather than individual batches.

Changes:

  • Removed the val_factory parameter from GroupFilter::LastN as it's no longer needed
  • Extended the merge cursor API to accept spine snapshots for precise filter evaluation
  • Added a new FilteredMergeCursorWithSnapshot cursor type that uses the complete trace to evaluate LastN filters
  • Added test coverage for the deletion scenario that exposed the original bug
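The split between the two cursor types can be pictured as a dispatch on the filter kind. The following is a minimal sketch under stated assumptions: the names `GroupFilter`, `LastN`, `requires_snapshot`, `FilteredMergeCursor`, and `FilteredMergeCursorWithSnapshot` come from this summary, but the variants, payloads, and the `CursorKind` and `choose_cursor` helpers are invented for illustration and do not reflect the actual dbsp definitions.

```rust
/// Simplified stand-in for the filter type named in this PR.
/// The variants and their payloads are assumptions made for illustration.
enum GroupFilter {
    /// Decidable one value at a time, so a single batch is enough.
    Simple,
    /// Keep the last `n` values below the waterline; needs the whole trace.
    LastN { n: usize },
}

impl GroupFilter {
    /// True when the filter cannot be evaluated against one batch in isolation.
    fn requires_snapshot(&self) -> bool {
        matches!(self, GroupFilter::LastN { .. })
    }
}

/// Which cursor flavor a merge would use (hypothetical helper type; the
/// variant names mirror the cursor types mentioned in the summary).
#[derive(Debug, PartialEq)]
enum CursorKind {
    FilteredMergeCursor,
    FilteredMergeCursorWithSnapshot { n: usize },
}

/// Pick the cursor flavor for a filter (hypothetical helper).
fn choose_cursor(filter: &GroupFilter) -> CursorKind {
    match filter {
        GroupFilter::LastN { n } => CursorKind::FilteredMergeCursorWithSnapshot { n: *n },
        GroupFilter::Simple => CursorKind::FilteredMergeCursor,
    }
}
```

In this model only `LastN` pays the cost of carrying a spine snapshot into the merge; every other filter stays on the simpler path.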

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| crates/dbsp/src/trace/filter.rs | Removed val_factory parameter from GroupFilter::LastN and added requires_snapshot() method |
| crates/dbsp/src/trace/cursor.rs | Split filtering logic: simple filters use FilteredMergeCursor, LastN filters use new FilteredMergeCursorWithSnapshot |
| crates/dbsp/src/trace.rs | Added merge_cursor_with_snapshot method to support snapshot-based filtering |
| crates/dbsp/src/trace/spine_async.rs | Updated async merger to pass spine snapshots to merge cursors when needed |
| crates/dbsp/src/trace/spine_async/list_merger.rs | Extended merger to accept and use spine snapshots |
| crates/dbsp/src/trace/test/test_batch.rs | Updated filter function signature to remove val_factory parameter |
| crates/dbsp/src/operator/trace.rs | Removed val_factory argument from API calls |
| crates/dbsp/src/operator/dynamic/trace.rs | Updated to remove val_factory parameter and added deterministic regression test |
| crates/dbsp/src/operator/accumulate_trace.rs | Removed val_factory argument from API calls |
| crates/dbsp/src/operator/dynamic/accumulate_trace.rs | Updated to remove val_factory parameter |
| crates/dbsp/src/operator/dynamic/asof_join.rs | Removed trailing whitespace |

Comment on lines +1486 to +1488
/// Deterministic input that inserts and retracts records in order to simulate a situation where
/// records present in a batch are not present in the trace because they were deleted in a subsequent batch.
/// This triggered a bug in the initial implementation of LastN filters.

Copilot AI Jan 14, 2026

Corrected spelling of 'retracts' to 'records'.

Contributor

@mihaibudiu mihaibudiu left a comment

Actually I think that @blp is far more qualified to review this, so I will only make this one small comment

/// Also assumed that the values are ordered in some way, so that the last N
/// values under the cursor are the ones that need to be preserved.
///
/// Note that the `LastN` filter can only be evaluated against an individual batch and
Contributor

can or cannot?

Member

blp commented Jan 15, 2026

I don't know whether it's useful, but I noticed that a GroupFilterCursor always computes the hash of the key that it's on. If there's some code that uses one and needs to check another spine for a matching key, it could reuse that hash to avoid redundant computation.
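The idea of reusing a key hash can be sketched independently of dbsp. Everything in the block below is hypothetical: `hash_key`, `HashedBatch`, and `count_matches` are illustration-only names, and the "batch" is just a set of key hashes standing in for whatever hash-based lookup structure a real spine might use. The point is only that the key is hashed once and the result is passed to each probe.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hash a key with a fixed-seed hasher so the same key always yields the same hash.
fn hash_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical batch that answers membership queries from stored key hashes.
/// Like any hash-only filter, it can report false positives on collisions.
struct HashedBatch {
    hashes: HashSet<u64>,
}

impl HashedBatch {
    fn from_keys<K: Hash>(keys: &[K]) -> Self {
        Self {
            hashes: keys.iter().map(hash_key).collect(),
        }
    }

    /// Probe with a hash the caller already computed; no rehashing happens here.
    fn may_contain_hash(&self, hash: u64) -> bool {
        self.hashes.contains(&hash)
    }
}

/// Count the batches that may contain `key`, hashing the key exactly once.
fn count_matches<K: Hash>(key: &K, batches: &[HashedBatch]) -> usize {
    let hash = hash_key(key); // computed once, reused for every probe
    batches.iter().filter(|b| b.may_contain_hash(hash)).count()
}
```

Whether this saves anything in practice depends on how expensive the key hash is relative to the lookup itself, which this sketch does not measure.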

Comment on lines +1064 to +1069
// Find n'th value below the waterline.
let mut count = 1;
while count < *n && trace_cursor.val_valid() {
    trace_cursor.step_val_reverse();
    count += 1;
}
Member

If n is 1, this loop will run zero times. Is that correct?

Contributor Author

yep, this is intentional.
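One way to see why zero iterations is right for n = 1: if the cursor is presumably already positioned on the first value below the waterline when the loop starts, that value is itself the 1st one, and each further step moves to the next one down. The sketch below mirrors the loop against a hypothetical `ReverseCursor` over a sorted slice; the type and the `nth_from_end` helper are assumptions for illustration, not the dbsp cursor API.

```rust
/// Minimal stand-in for a value cursor that walks a sorted slice backwards.
/// Hypothetical type; only the method names echo the reviewed snippet.
struct ReverseCursor<'a> {
    vals: &'a [i64],
    pos: Option<usize>,
}

impl<'a> ReverseCursor<'a> {
    /// Start on the last element, or in the invalid state if the slice is empty.
    fn at_end(vals: &'a [i64]) -> Self {
        Self {
            vals,
            pos: vals.len().checked_sub(1),
        }
    }

    fn val_valid(&self) -> bool {
        self.pos.is_some()
    }

    /// Move one element towards the front; stepping past the front invalidates the cursor.
    fn step_val_reverse(&mut self) {
        self.pos = self.pos.and_then(|p| p.checked_sub(1));
    }

    fn val(&self) -> Option<i64> {
        self.pos.map(|p| self.vals[p])
    }
}

/// Return the n'th value counting from the end, using the same loop shape as
/// the reviewed code: the starting position already counts as the 1st value.
fn nth_from_end(vals: &[i64], n: usize) -> Option<i64> {
    let mut cursor = ReverseCursor::at_end(vals);
    let mut count = 1;
    while count < n && cursor.val_valid() {
        cursor.step_val_reverse();
        count += 1;
    }
    cursor.val()
}
```

In this model the loop body runs n - 1 times at most, so n = 1 leaves the cursor where it started, and asking for more values than exist leaves the cursor invalid.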

Fix a logical bug in the implementation of the LastN value filter. The
original thinking was that keeping the last N values in each merged batch was
a conservative way to keep the last N values overall. This is actually
unsound: the last value under the waterline in a given batch may not be
present in the trace due to a matching retraction in another batch. In this
case we may need to keep additional values in the output of the cursor, and
the number of additional values is generally unbounded.

A reliable way to establish the bound is to compute the last n values in the
complete trace and discard everything below that. This commit implements this
behavior. The main complication here is that we need to pass a spine snapshot
to the merge cursor, which required extending the merge cursor API.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk enabled auto-merge January 16, 2026 02:12
@ryzhyk ryzhyk added this pull request to the merge queue Jan 16, 2026
Merged via the queue into main with commit bfd0717 Jan 16, 2026
1 check passed
@ryzhyk ryzhyk deleted the lastn-gc-postfix branch January 16, 2026 04:12