Conversation

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Claude finished @alpe's task in 2m 53s. Code Review: feat: Subscribe to forced inclusion ns events
Overall Assessment: This is a well-architected refactor; centralizing the follow/catchup lifecycle in the Subscriber is a clear improvement. Several bugs and a lifecycle leak warrant attention before merging.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

Adds an event-driven DA subscription system: a Subscriber component, a backpressure-capable EventSink, timestamp-aware Subscribe signatures, and widespread rewiring of AsyncBlockRetriever, DAFollower, ForcedInclusion retrievers, sequencers, and tests to use subscription-driven, context-aware Start lifecycles.
Sequence Diagram(s)
sequenceDiagram
participant DAClient as DA Client
participant Subscriber as Subscriber
participant Follow as FollowLoop
participant Catchup as CatchupLoop
participant Handler as SubscriberHandler
DAClient->>Subscriber: NewSubscriber(cfg) / Start(ctx)
Subscriber->>Follow: spawn followLoop
Subscriber->>Catchup: spawn catchupLoop
alt live event
DAClient->>Follow: subscription event (height, blobs, timestamp)
Follow->>Handler: HandleEvent(ctx, event, isInline=true)
Handler->>Handler: Cache/process inline (PipeEvent)
end
alt gap detected
Subscriber->>Catchup: schedule catchup for missingHeight
Catchup->>Handler: HandleCatchup(ctx, missingHeight)
Handler->>DAClient: Retrieve(ctx, missingHeight)
DAClient->>Handler: block blobs
Handler->>Handler: Cache and PipeEvent(ctx, event)
end
Note over Subscriber,Catchup: HighestSeenDAHeight updates & backoff/reconnect on errors
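The gap-detection step in the diagram above boils down to comparing the next expected local height against the height of an arriving live event. This is a simplified assumption of that logic, not the actual Subscriber code (which tracks the heights with atomics inside a dedicated loop):

```go
package main

import "fmt"

// nextCatchupHeights returns the heights the catchup loop must backfill when a
// live subscription event arrives at gotHeight while expectedHeight is the next
// height not yet processed. Illustrative only.
func nextCatchupHeights(expectedHeight, gotHeight uint64) []uint64 {
	var missing []uint64
	for h := expectedHeight; h < gotHeight; h++ {
		missing = append(missing, h)
	}
	return missing
}

func main() {
	// Expecting height 5, live event arrives at height 8:
	// heights 5, 6, 7 are scheduled for the catchup loop.
	fmt.Println(nextCatchupHeights(5, 8)) // prints [5 6 7]
}
```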
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
✅ Passed checks (4 passed)
* main:
  - fix: persist snapshot once for avoiding badger vlog (#3153)
  - build(deps): Bump docker/setup-buildx-action from 3 to 4 (#3148)
  - build(deps): Bump the all-go group across 5 directories with 8 updates (#3147)
  - build(deps): Bump docker/login-action from 3 to 4 (#3149)
  - build(deps): Bump docker/build-push-action from 6 to 7 (#3151)
  - build(deps): Bump actions/setup-go from 6.2.0 to 6.3.0 (#3150)
Codecov Report ❌

Additional details and impacted files:

@@ Coverage Diff @@
## main #3146 +/- ##
==========================================
+ Coverage 60.20% 60.43% +0.22%
==========================================
Files 115 117 +2
Lines 11902 11968 +66
==========================================
+ Hits 7166 7233 +67
+ Misses 3925 3913 -12
- Partials 811 822 +11
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
* main: chore: fix some function names in comment (#3156)
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
block/internal/syncing/da_follower.go (1)
53-72: ⚠️ Potential issue | 🟠 Major: Avoid subscribing to the same namespace twice.

`DataNamespace` is documented as "may be nil or equal to Namespace", but this constructor always passes both entries to `da.NewSubscriber`. When they match, the follower can subscribe to the same namespace twice, which risks duplicate events and duplicate DA work. Build the slice conditionally and drop the duplicate before constructing the subscriber.

Suggested fix

 	dataNs := cfg.DataNamespace
-	if len(dataNs) == 0 {
-		dataNs = cfg.Namespace
-	}
+	namespaces := [][]byte{cfg.Namespace}
+	if len(dataNs) != 0 && !slices.Equal(dataNs, cfg.Namespace) {
+		namespaces = append(namespaces, dataNs)
+	}
 	f := &daFollower{
 		retriever: cfg.Retriever,
 		eventSink: cfg.EventSink,
@@
 	f.subscriber = da.NewSubscriber(da.SubscriberConfig{
 		Client:      cfg.Client,
 		Logger:      cfg.Logger,
-		Namespaces:  [][]byte{cfg.Namespace, dataNs},
+		Namespaces:  namespaces,
 		DABlockTime: cfg.DABlockTime,
 		Handler:     f,
 		StartHeight: cfg.StartDAHeight,
 	})

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 53 - 72, The constructor currently always passes both cfg.Namespace and dataNs (derived from cfg.DataNamespace) to da.NewSubscriber which can duplicate the same namespace; modify the da_follower constructor (around daFollower and f.subscriber creation) to build the Namespaces slice conditionally: include cfg.Namespace, then only append dataNs if it is non-empty and not equal to cfg.Namespace (or dedupe by value), and then pass that deduped [][]byte to da.NewSubscriber via da.SubscriberConfig so the subscriber never gets the same namespace twice.
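The dedup rule in the suggested fix can be exercised in isolation. `buildNamespaces` is a hypothetical helper name (the real fix builds the slice inline using `slices.Equal`; `bytes.Equal` is used here as an equivalent stand-in):

```go
package main

import (
	"bytes"
	"fmt"
)

// buildNamespaces returns the primary namespace plus dataNs only when dataNs is
// non-empty and different from the primary, mirroring the suggested fix above.
// (Helper name is hypothetical; the real code builds the slice inline.)
func buildNamespaces(primary, dataNs []byte) [][]byte {
	namespaces := [][]byte{primary}
	if len(dataNs) != 0 && !bytes.Equal(dataNs, primary) {
		namespaces = append(namespaces, dataNs)
	}
	return namespaces
}

func main() {
	fmt.Println(len(buildNamespaces([]byte("ns"), []byte("ns"))))   // prints 1: duplicate dropped
	fmt.Println(len(buildNamespaces([]byte("ns"), []byte("data")))) // prints 2: distinct kept
	fmt.Println(len(buildNamespaces([]byte("ns"), nil)))            // prints 1: nil dataNs ignored
}
```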
🧹 Nitpick comments (4)
block/internal/da/tracing_test.go (1)
25-33: Add a real `Subscribe` forwarding test. This mock signature change keeps the file compiling, but nothing in this suite fails if `WithTracingClient.Subscribe` drops or flips `includeTimestamp`. A tiny round-trip assertion here would pin the new behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/tracing_test.go` around lines 25 - 33: Add a unit test that verifies WithTracingClient.Subscribe forwards the includeTimestamp flag correctly by exercising mockFullClient.Subscribe via its subscribeFn: implement a mock subscribeFn that captures the received ts parameter and returns a channel with a dummy datypes.SubscriptionEvent, call WithTracingClient.Subscribe (or the wrapper under test) with ts=true and ts=false, and assert the captured value equals the input for both cases; ensure you use mockFullClient.Subscribe to forward to subscribeFn so the test fails if Subscribe flips or drops includeTimestamp.

block/public.go (1)
81-82: Clarify the new retriever lifecycle in the public API.
`NewForcedInclusionRetriever` now accepts a `ctx`, but the concrete retriever doesn't use that context until a later `Start(ctx)` call. Please either document that two-step startup here or drop the constructor `ctx` so callers don't assume construction already wires cancellation. As per coding guidelines, "Document exported types and functions".
Also applies to: 84-95
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/public.go` around lines 81 - 82: The exported constructor NewForcedInclusionRetriever currently takes a ctx but the retriever only honors cancellation when Start(ctx) is called; update the public API so callers aren't misled by construction-time cancellation — either remove the ctx parameter from NewForcedInclusionRetriever and update all call sites to pass ctx into Start(ctx) instead, or add/expand the exported doc comments on NewForcedInclusionRetriever and the relevant exported retriever type to explicitly state this is a two-step lifecycle (construction does not start background work or observe ctx; callers must call Start(ctx) to begin and to enable cancellation). Reference NewForcedInclusionRetriever and Start(ctx) in the doc comment so the intended usage is clear.

block/internal/syncing/syncer_forced_inclusion_test.go (1)
77-82: Exercise the started FI retriever path at least once. After the constructor stopped auto-starting the background subscriber, these tests still never call `Start(ctx)`, so they only cover the synchronous fallback. Please add one case that starts the retriever and drives the subscription path too.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/syncer_forced_inclusion_test.go` around lines 77 - 82: The test never exercises the started ForcedInclusion retriever subscription path; update the test to call fiRetriever.Start(t.Context()) (after creating fiRetriever and before t.Cleanup) and drive the subscription by sending a datypes.SubscriptionEvent into subCh to verify the asynchronous handler path runs, keeping the existing t.Cleanup(fiRetriever.Stop). Ensure the mock client expectation for Subscribe (client.On("Subscribe", ...)) remains and that the event you send matches the handler's expected format so the retriever processes the subscription case rather than only the synchronous fallback.

block/internal/da/async_block_retriever.go (1)
174-182: Clarify intentional error return in HandleEvent.

The `HandleEvent` method returns an error when `isInline` is true, which causes the Subscriber to roll back and delegate processing to the catch-up loop. This is intentional design for the async block retriever use case, but a brief comment explaining this pattern would improve readability.

💡 Suggested clarification

 // HandleEvent caches blobs from the subscription inline, even empty ones,
 // to record that the DA height was seen and has 0 blobs.
+// Returns an error for inline events to delegate sequential processing to the
+// catchup loop, which handles prefetching and ordered retrieval.
 func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error {
 	f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
 	if isInline {
+		// Intentionally reject inline processing to force catchup loop handling
 		return errors.New("async block retriever relies on catchup state machine")
 	}
 	return nil
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/async_block_retriever.go` around lines 174 - 182, The HandleEvent method on asyncBlockRetriever currently returns an error when isInline is true to force the Subscriber to roll back and let the catch-up loop handle inline events; add a brief clarifying comment above the isInline check (near asyncBlockRetriever.HandleEvent and its call to cacheBlock) explaining that returning an error is intentional to delegate inline event processing to the catch-up state machine (Subscriber/catch-up loop) rather than treating it as a real failure, so future readers understand the control-flow hack.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 128-156: The loop in daFollower.HandleCatchup currently pops
priority hints before fetching, which can permanently drop a hint on transient
fetchAndPipeHeight errors and causes double-fetch when priorityHeight ==
daHeight; change the logic to peek the hint instead of removing it (use the
inverse of popPriorityHeight or re-push it on error), attempt fetchAndPipeHeight
while only removing the hint after a successful fetch, on transient failures
(non-datypes.ErrHeightFromFuture) requeue the same priorityHeight (e.g.
f.pushPriorityHeight(priorityHeight)) and return the error so it will be retried
after backoff, and if priorityHeight == daHeight short-circuit the normal
sequential fetch path so you don't fetch the same height twice. Ensure you still
silently ignore datypes.ErrHeightFromFuture as before.
In `@block/internal/syncing/da_retriever.go`:
- Around line 291-298: The code flips r.strictMode as soon as isValidEnvelope is
true, before running full header checks (ValidateBasic and proposer validation),
so a signed-but-invalid envelope can incorrectly toggle strict mode; change the
logic in the DA retriever so r.strictMode is only set to true after the envelope
passes full header validation (call sites around the current block that
reference isValidEnvelope, header, ValidateBasic, and proposer validation),
mirroring the same guard for the other occurrence (lines ~305-313), and add a
regression test (e.g., "invalid envelope does not toggle strict mode") that
constructs a signed envelope which fails ValidateBasic/proposer checks and
asserts r.strictMode remains false after processing.
In `@block/internal/syncing/syncer_benchmark_test.go`:
- Line 80: The test is comparing the pointer fixt.s.daRetrieverHeight (an
*atomic.Uint64) to a uint64, which compares addresses rather than the stored
value; update the assertion in the test to read the atomic's value via
fixt.s.daRetrieverHeight.Load() and compare that to spec.heights+daHeightOffset
so the actual uint64 contents are asserted (i.e., replace the direct pointer
comparison in the assert.Equal call with a .Load() call).
In `@block/internal/syncing/syncer_test.go`:
- Around line 388-394: The test initializes
syncerInst1.daFollower/syncerInst2.daFollower but then starts and asserts
against local variables follower1/follower2 instead of the syncer's follower;
update the test to assign the created followers back to the syncer instances
(e.g., syncerInst1.daFollower = follower1 and syncerInst2.daFollower =
follower2) and check the Start() errors from both follower1.Start(ctx) and
follower2.Start(ctx) with require.NoError/require.Nil so both starts must
succeed; apply the same fix in the other affected blocks that create follower
instances (the sections around the other noted ranges) so the test uses the
syncer-owned lifecycle throughout.
In `@pkg/sequencers/based/sequencer.go`:
- Line 100: The forced-inclusion retriever is constructed with
block.NewForcedInclusionRetriever but never started; after creating
bs.fiRetriever call its Start(ctx) using a cancellable context (create ctx,
cancel := context.WithCancel(parentCtx) and store or wire cancel for shutdown)
so the retriever's background subscription/prefetch goroutines run; mirror the
pattern used in SingleSequencer by replacing context.Background() with the
cancellable context and invoking bs.fiRetriever.Start(ctx) immediately after
construction.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cdba2d5f-0a7b-4edb-a192-a51a19db0f34
📒 Files selected for processing (37)
- apps/evm/server/force_inclusion_test.go
- block/internal/common/event.go
- block/internal/da/async_block_retriever.go
- block/internal/da/async_block_retriever_test.go
- block/internal/da/client.go
- block/internal/da/forced_inclusion_retriever.go
- block/internal/da/forced_inclusion_retriever_test.go
- block/internal/da/forced_inclusion_tracing.go
- block/internal/da/interface.go
- block/internal/da/subscriber.go
- block/internal/da/subscriber_test.go
- block/internal/da/tracing.go
- block/internal/da/tracing_test.go
- block/internal/syncing/da_follower.go
- block/internal/syncing/da_follower_test.go
- block/internal/syncing/da_retriever.go
- block/internal/syncing/da_retriever_mock.go
- block/internal/syncing/da_retriever_strict_test.go
- block/internal/syncing/da_retriever_tracing.go
- block/internal/syncing/da_retriever_tracing_test.go
- block/internal/syncing/raft_retriever.go
- block/internal/syncing/syncer.go
- block/internal/syncing/syncer_backoff_test.go
- block/internal/syncing/syncer_benchmark_test.go
- block/internal/syncing/syncer_forced_inclusion_test.go
- block/internal/syncing/syncer_test.go
- block/public.go
- docs/guides/da-layers/celestia.md
- pkg/da/jsonrpc/types.go
- pkg/da/types/types.go
- pkg/sequencers/based/sequencer.go
- pkg/sequencers/based/sequencer_test.go
- pkg/sequencers/common/forced_inclusion_retriever_mock.go
- pkg/sequencers/single/sequencer.go
- pkg/sequencers/single/sequencer_test.go
- test/mocks/da.go
- test/testda/dummy.go
💤 Files with no reviewable changes (3)
- block/internal/syncing/syncer_backoff_test.go
- block/internal/syncing/da_retriever_tracing.go
- block/internal/syncing/da_retriever_tracing_test.go
block/internal/syncing/da_follower.go — diff excerpt (the pre-refactor inline catchup loop, based on localNextDAHeight CAS claims and headReached, is removed; the new HandleCatchup replaces it):

func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
	// 1. Drain stale or future priority heights from P2P hints
	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
		if priorityHeight < daHeight {
			continue // skip stale hints without yielding back to the catchup loop
		}
		f.logger.Debug().
			Uint64("da_height", priorityHeight).
			Msg("fetching priority DA height from P2P hint")
		if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
			if errors.Is(err, datypes.ErrHeightFromFuture) {
				// Priority hint points to a future height — silently ignore.
				f.logger.Debug().Uint64("priority_da_height", priorityHeight).
					Msg("priority hint is from future, ignoring")
				continue
			}
			// Roll back so daHeight is attempted again next cycle after backoff.
			return err
		}
		break // continue with daHeight
	}

	// 2. Normal sequential fetch
	if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil {
		return err
	}
	return nil
}
Don't drop or double-fetch priority heights in HandleCatchup.
This loop removes the priority hint before the DA fetch. A transient `fetchAndPipeHeight` failure loses the hint permanently, and when `priorityHeight == daHeight` the same height is fetched twice in one pass. Requeue retryable failures and short-circuit the normal path when the priority height matches the current sequential height.
Suggested fix
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
// 1. Drain stale or future priority heights from P2P hints
for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
if priorityHeight < daHeight {
continue // skip stale hints without yielding back to the catchup loop
}
+ if priorityHeight == daHeight {
+ return f.fetchAndPipeHeight(ctx, daHeight)
+ }
f.logger.Debug().
Uint64("da_height", priorityHeight).
Msg("fetching priority DA height from P2P hint")
if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
if errors.Is(err, datypes.ErrHeightFromFuture) {
// Priority hint points to a future height — silently ignore.
f.logger.Debug().Uint64("priority_da_height", priorityHeight).
Msg("priority hint is from future, ignoring")
continue
}
+ f.QueuePriorityHeight(priorityHeight)
// Roll back so daHeight is attempted again next cycle after backoff.
return err
}
break // continue with daHeight
	}
The DA hint is an optimization and comes from an untrusted source. The sequential fetch will cover the height eventually.
@alpe, understood — that's a clear and reasonable design decision. Since the hints are untrusted and purely opportunistic, dropping them on failure is fine, and any missed heights are guaranteed to be covered by the sequential catchup. I'll withdraw this concern.
✏️ Learnings added
Learnt from: alpe
Repo: evstack/ev-node PR: 3146
File: block/internal/syncing/da_follower.go:128-156
Timestamp: 2026-03-12T14:41:22.380Z
Learning: In evstack/ev-node, DA priority heights (queued via `QueuePriorityHeight` in `block/internal/syncing/da_follower.go`) are untrusted, best-effort optimizations sourced from P2P hints. Dropping a hint on a transient fetch failure is intentional — the sequential catchup loop in `da.Subscriber` will cover every height eventually. Do not flag missing retry/re-queue logic for priority hints as a bug.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: CR
Repo: evstack/ev-node PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-26T20:31:15.823Z
Learning: Applies to **/*.go : Implement the DA interface from pkg/da/types when adding a new DA layer
Learnt from: alpe
Repo: evstack/ev-node PR: 3131
File: block/internal/syncing/syncer_backoff_test.go:241-245
Timestamp: 2026-03-06T09:40:36.029Z
Learning: In evstack/ev-node, the module declares go 1.25.6. Since Go 1.22, loop variables are per-iteration by default, so loop variable capture is not a concern. Do not flag or fix loop variable capture in this codebase for any Go files; this rule applies to all Go files in the repository, not just the specific test file.
@@ -75,7 +78,7 @@ func BenchmarkSyncerIO(b *testing.B) {
 	require.Len(b, fixt.s.heightInCh, 0)

 	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
Missing .Load() on atomic value comparison.
fixt.s.daRetrieverHeight is an *atomic.Uint64, but it's being compared directly to a uint64 value. This will compare the pointer, not the stored value, causing the assertion to fail.
🐛 Proposed fix
- assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
+ assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
+	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
syncerInst1.daFollower = NewDAFollower(DAFollowerConfig{
	Retriever:     daRtrMock,
	Logger:        zerolog.Nop(),
	EventSink:     common.EventSinkFunc(syncerInst1.PipeEvent),
	Namespace:     []byte("ns"),
	StartDAHeight: syncerInst1.daRetrieverHeight.Load(),
})
Use the syncer's follower instance in this persistence test.
syncerInst1/2.daFollower is initialized with one instance, but the test starts and asserts against a different local follower. The restart half also drops the error from follower2.Start(ctx). Assign follower1/2 back to syncerInst*.daFollower and require both Start calls to succeed so this test exercises the actual syncer-owned lifecycle.
Also applies to: 437-448, 501-507, 521-532
julienrbrt left a comment
Much better to have the subscriber centralized and re-used. Could you add a changelog?
utACK
	assert.Equal(t, uint64(50), priorityHeight, "valid DA hint should be queued")
}

// TestProcessHeightEvent_SkipsDAHintWhenAlreadyDAIncluded verifies that when the
This test should be re-added. Some conflict issue, I think.
 }

-func TestProcessHeightEvent_SkipsDAHintWhenBelowRetrieverCursor(t *testing.T) {
+func TestProcessHeightEvent_SkipsDAHintWhenAlreadyFetched(t *testing.T) {
This test should keep its previous name.
Actionable comments posted: 4
♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)
125-150: ⚠️ Potential issue | 🟠 Major: Avoid double-fetching when the hint matches `daHeight`.

If `priorityHeight == daHeight`, this path fetches the same height once as a priority hint and again in the normal sequential fetch below. That doubles the DA read and can pipe duplicate events downstream.

Proposed fix
 for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
 	if priorityHeight < daHeight {
 		continue // skip stale hints without yielding back to the catchup loop
 	}
+	if priorityHeight == daHeight {
+		break
+	}
 	f.logger.Debug().
 		Uint64("da_height", priorityHeight).
 		Msg("fetching priority DA height from P2P hint")

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 125 - 150, The loop that drains P2P priority hints can fetch the same DA height twice because a successful hint fetch (via fetchAndPipeHeight) is followed by the unconditional normal fetch of daHeight; to fix, track whether the loop successfully handled the current daHeight and skip the subsequent normal fetch when it did. Concretely: in the draining loop around popPriorityHeight and fetchAndPipeHeight set a boolean (e.g., priorityHandled) when fetchAndPipeHeight returns nil for a priorityHeight equal to daHeight (preserving the existing ErrHeightFromFuture handling and continue/return behavior), then after the loop only call fetchAndPipeHeight(ctx, daHeight) if priorityHandled is false; reference the functions/variables popPriorityHeight, fetchAndPipeHeight, daHeight and the ErrHeightFromFuture check when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@block/internal/da/async_block_retriever.go`:
- Around line 224-245: The default branch in the switch inside
async_block_retriever.go's Retrieve (or the enclosing method handling DA fetch
results) currently only logs and returns nil, which lets Subscriber advance
localDAHeight on failures; change the default case to return a non-nil error so
the caller will back off and retry the same height. Specifically, in the switch
over result.Code (cases datypes.StatusHeightFromFuture, StatusNotFound,
StatusSuccess) update the default case to construct and return an error that
includes the height and result.Message (or wrap an existing sentinel like
datypes.ErrFetchFailed) instead of returning nil; leave the existing cacheBlock
calls unchanged for StatusNotFound/StatusSuccess and keep returning
datypes.ErrHeightFromFuture for StatusHeightFromFuture.
In `@block/internal/da/subscriber.go`:
- Around line 195-204: The code currently sets s.headReached true whenever an
inline event is successfully handled, which can be incorrect if a higher DA
height has already been observed; modify the success branch in
subscriber.HandleEvent (the block around updateHighest, localDAHeight, isInline,
err handling) to only set s.headReached.Store(true) when the processed ev.Height
is >= the latest highestSeenDAHeight (read via s.highestSeenDAHeight.Load()),
i.e. after err==nil && isInline do a compare like if ev.Height >=
s.highestSeenDAHeight.Load() { s.headReached.Store(true) } else { leave it false
}, and apply the same conditional change to the analogous success handling in
the other block around lines 281-290 so HasReachedHead() cannot become true
while highestSeenDAHeight is still ahead.
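The conditional head-tracking described in this prompt can be sketched as follows. The field names `highestSeenDAHeight` and `headReached` come from the prompt text; the `subscriber` struct and `onEventHandled` helper are hypothetical stand-ins, not the actual code in `block/internal/da/subscriber.go`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subscriber mirrors the two fields named in the review; the real
// Subscriber type carries much more state.
type subscriber struct {
	highestSeenDAHeight atomic.Uint64
	headReached         atomic.Bool
}

// onEventHandled marks the head as reached only when the processed height
// has caught up to the highest DA height observed so far, instead of
// unconditionally storing true on any successful inline event.
func (s *subscriber) onEventHandled(height uint64, isInline bool, err error) {
	if err != nil || !isInline {
		return
	}
	if height >= s.highestSeenDAHeight.Load() {
		s.headReached.Store(true)
	}
}

func main() {
	s := &subscriber{}
	s.highestSeenDAHeight.Store(100)

	s.onEventHandled(90, true, nil) // still behind the observed head
	fmt.Println(s.headReached.Load())

	s.onEventHandled(100, true, nil) // caught up
	fmt.Println(s.headReached.Load())
}
```

With this guard, `HasReachedHead()` cannot flip to true while a higher DA height is still pending.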
In `@block/internal/syncing/syncer.go`:
- Around line 184-185: Syncer starts the forced-inclusion retriever via
s.fiRetriever.Start(ctx) but never stops it; update Syncer.Stop to call
s.fiRetriever.Stop() (or the retriever's appropriate shutdown method) and ensure
it handles nil checks and any blocking waits/errors (e.g., call
s.fiRetriever.Stop(), then wait for completion if Stop returns a channel/error)
so the retriever started in NewForcedInclusionRetriever / fiRetriever.Start is
symmetrically shut down to avoid goroutine/resource leaks.
In `@pkg/sequencers/based/sequencer_test.go`:
- Around line 79-88: The DA mock returns datypes.StatusNotFound for Retrieve
which lets the real forced-inclusion retriever started by NewBasedSequencer spin
through empty heights; change the default mocked Retrieve return in the test
helper (where mockDAClient.MockClient.On("Subscribe", ...) and .On("Retrieve",
... ) are set) to return datypes.StatusHeightFromFuture instead of
datypes.StatusNotFound so the background retriever does not walk forward
forever—only override to StatusNotFound in tests that explicitly need that
behavior.
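The `priorityHandled` approach described for the `da_follower.go` hint-draining loop can be sketched synchronously. Everything here (the `follower` struct, the slice-backed hint queue, the no-op fetch) is illustrative scaffolding; only the control flow mirrors the suggested fix, and the real `popPriorityHeight`/`fetchAndPipeHeight` differ.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrHeightFromFuture stands in for datypes.ErrHeightFromFuture.
var ErrHeightFromFuture = errors.New("height from future")

type follower struct {
	hints   []uint64 // pending P2P priority hints
	fetched []uint64 // heights actually fetched, for inspection
}

// popPriorityHeight returns the next hint, or 0 when the queue is empty.
func (f *follower) popPriorityHeight() uint64 {
	if len(f.hints) == 0 {
		return 0
	}
	h := f.hints[0]
	f.hints = f.hints[1:]
	return h
}

// fetchAndPipeHeight records the fetch; the real method reads from DA.
func (f *follower) fetchAndPipeHeight(h uint64) error {
	f.fetched = append(f.fetched, h)
	return nil
}

// drainAndFetch handles priority hints first and skips the sequential
// fetch when a hint already covered daHeight, avoiding the double read.
func (f *follower) drainAndFetch(daHeight uint64) error {
	priorityHandled := false
	for h := f.popPriorityHeight(); h != 0; h = f.popPriorityHeight() {
		if h < daHeight {
			continue // stale hint: skip without yielding to catchup
		}
		if err := f.fetchAndPipeHeight(h); err != nil {
			if errors.Is(err, ErrHeightFromFuture) {
				continue
			}
			return err
		}
		if h == daHeight {
			priorityHandled = true
		}
	}
	if !priorityHandled {
		return f.fetchAndPipeHeight(daHeight)
	}
	return nil
}

func main() {
	f := &follower{hints: []uint64{4, 5}}
	_ = f.drainAndFetch(5)
	fmt.Println(f.fetched) // prints [5]: hint 4 skipped as stale, 5 fetched once
}
```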
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 165f9b63-f6dc-4c33-9286-5bd0d8aeb0ec
📒 Files selected for processing (17)
- CHANGELOG.md
- apps/evm/cmd/run.go
- apps/grpc/cmd/run.go
- apps/testapp/cmd/run.go
- block/internal/da/async_block_retriever.go
- block/internal/da/forced_inclusion_retriever.go
- block/internal/da/forced_inclusion_retriever_test.go
- block/internal/da/subscriber.go
- block/internal/syncing/da_follower.go
- block/internal/syncing/da_retriever.go
- block/internal/syncing/syncer.go
- block/internal/syncing/syncer_forced_inclusion_test.go
- block/public.go
- pkg/sequencers/based/sequencer.go
- pkg/sequencers/based/sequencer_test.go
- pkg/sequencers/single/sequencer.go
- pkg/sequencers/single/sequencer_test.go
✅ Files skipped from review due to trivial changes (1)
- CHANGELOG.md
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/sequencers/based/sequencer.go
- pkg/sequencers/single/sequencer.go
```diff
 	switch result.Code {
 	case datypes.StatusHeightFromFuture:
-		f.logger.Debug().
-			Uint64("height", height).
-			Msg("block height not yet available - will retry")
-		return
+		f.logger.Debug().Uint64("height", height).Msg("block height not yet available - will retry")
+		return datypes.ErrHeightFromFuture
 	case datypes.StatusNotFound:
 		f.logger.Debug().
 			Uint64("height", height).
 			Msg("no blobs at height")
 		// Cache empty result to avoid re-fetching
 		f.cacheBlock(ctx, height, result.Timestamp, nil)
 	case datypes.StatusSuccess:
 		// Process each blob
 		blobs := make([][]byte, 0, len(result.Data))
 		for _, blob := range result.Data {
 			if len(blob) > 0 {
-				block.Blobs = append(block.Blobs, blob)
+				blobs = append(blobs, blob)
 			}
 		}
 		f.logger.Debug().
 			Uint64("height", height).
 			Int("blob_count", len(result.Data)).
 			Msg("processed blobs for prefetch")
 		f.cacheBlock(ctx, height, result.Timestamp, blobs)
 	default:
 		f.logger.Debug().
 			Uint64("height", height).
 			Str("status", result.Message).
 			Msg("failed to retrieve block - will retry")
-		return
 	}
+	return nil
 }
```
Return an error for failed DA fetches.
The default branch logs and returns nil, so Subscriber treats a failed Retrieve as a successful catch-up step and advances localDAHeight. During a DA outage this can skip an unbounded range of heights instead of backing off and retrying the same one.
Proposed fix
default:
f.logger.Debug().
Uint64("height", height).
Str("status", result.Message).
Msg("failed to retrieve block - will retry")
+ return fmt.Errorf("failed to retrieve block at height %d: %s", height, result.Message)
}
return nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	switch result.Code {
	case datypes.StatusHeightFromFuture:
		f.logger.Debug().Uint64("height", height).Msg("block height not yet available - will retry")
		return datypes.ErrHeightFromFuture
	case datypes.StatusNotFound:
		f.cacheBlock(ctx, height, result.Timestamp, nil)
	case datypes.StatusSuccess:
		blobs := make([][]byte, 0, len(result.Data))
		for _, blob := range result.Data {
			if len(blob) > 0 {
				blobs = append(blobs, blob)
			}
		}
		f.cacheBlock(ctx, height, result.Timestamp, blobs)
	default:
		f.logger.Debug().
			Uint64("height", height).
			Str("status", result.Message).
			Msg("failed to retrieve block - will retry")
		return fmt.Errorf("failed to retrieve block at height %d: %s", height, result.Message)
	}
	return nil
}
```
```go
s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config.DA.BlockTime.Duration, s.config.Instrumentation.IsTracingEnabled(), s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion)
s.fiRetriever.Start(ctx)
```
Add symmetric shutdown for forced-inclusion retriever.
fiRetriever.Start(ctx) is invoked on Line 185, but Syncer.Stop does not call fiRetriever.Stop(). Please stop it explicitly during shutdown to avoid lifecycle leaks.
🔧 Proposed fix

```diff
 func (s *Syncer) Stop(ctx context.Context) error {
@@
 	// Stop the DA follower first (it owns its own goroutines).
 	if s.daFollower != nil {
 		s.daFollower.Stop()
 	}
+	if s.fiRetriever != nil {
+		s.fiRetriever.Stop()
+	}
 	s.wg.Wait()
```
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/evm/go.mod`:
- Around line 5-8: The go.mod in apps/evm currently contains local replace
directives (the replace block referencing github.com/evstack/ev-node and
github.com/evstack/ev-node/execution/evm) which must be removed before rc.5;
delete that entire replace(...) section and update the module's require
statements to reference the published versions (follow release order: core →
root ev-node → execution/evm → apps/evm) so apps/evm's require entries point to
the released semantic versions instead of local paths; ensure there are no other
replace directives remaining in apps/evm/go.mod and run go mod tidy to verify
module graph consistency.
```
replace (
	github.com/evstack/ev-node => ../../
	github.com/evstack/ev-node/execution/evm => ../../execution/evm
)
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Description: Search for release scripts or documentation about handling replace directives
# Check for release/build scripts
fd -t f -e sh -e bash -e mk 'release|build|publish' --exec echo "File: {}" \; --exec head -20 {}
# Check for documentation about replace directives or release process
rg -n -C3 'replace.*directive|local.*replace|release.*process' --glob '*.md' --glob 'Makefile'
# Check if other go.mod files in the repo have similar patterns
fd -t f 'go.mod' --exec echo "=== {} ===" \; --exec grep -A5 '^replace' {}
```

Repository: evstack/ev-node
Length of output: 4536

🏁 Script executed:

```shell
# Get the specific release process section from RELEASE.md to understand context better
sed -n '180,210p' RELEASE.md
```

Repository: evstack/ev-node
Length of output: 879

🏁 Script executed:

```shell
# Check if RELEASE.md specifies timing for replace directive removal
sed -n '130,190p' RELEASE.md
```

Repository: evstack/ev-node
Length of output: 2087
Remove local replace directives from go.mod files before rc.5 release.
Per RELEASE.md (lines 186, 404, 417), removing all replace directives from go.mod files is a mandatory step before starting the release process. The local replace directives in apps/evm/go.mod must be removed:
```
replace (
	github.com/evstack/ev-node => ../../
	github.com/evstack/ev-node/execution/evm => ../../execution/evm
)
```
These should not be present in rc.5 release artifacts, as they reference local filesystem paths that won't exist for consumers. Update the require block to use the published versions instead, following the documented release process order (core → root ev-node → execution/evm → apps/evm).
♻️ Duplicate comments (1)
pkg/sequencers/based/sequencer_test.go (1)
**79-82**: ⚠️ Potential issue | 🟠 Major

Default these startup `Retrieve` mocks to `StatusHeightFromFuture`.

`NewBasedSequencer` still starts the real forced-inclusion retriever before `seq.fiRetriever` is swapped. Leaving the helper on `datypes.StatusNotFound` keeps that background subscriber walking empty heights, and the restart path still has the same window without any neutral `Retrieve` stub on `mockDAClient2`. Use `datypes.StatusHeightFromFuture` consistently here so these tests do not depend on scheduler timing. As per coding guidelines, "Ensure tests are deterministic".

Suggested fix

```diff
 mockDAClient.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
-	BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
 }).Maybe()
```

```diff
 mockDAClient2.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
+mockDAClient2.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
+}).Maybe()
 mockExec2 := createDefaultMockExecutor(t)
 seq2, err := NewBasedSequencer(t.Context(), mockDAClient2, config.DefaultConfig(), db, gen, zerolog.Nop(), mockExec2)
```

Also applies to: 505-509
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sequencers/based/sequencer_test.go` around lines 79 - 82, The test's Retrieve mock on mockDAClient.MockClient returns StatusNotFound which lets NewBasedSequencer's real forced-inclusion retriever walk empty heights; change the mocked Retrieve return to use datypes.StatusHeightFromFuture instead (i.e., return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}}) so the startup retriever is neutral; apply the same change to the other similar mocks (including the mock for mockDAClient2 noted around lines 505-509) and keep the existing Subscribe mock unchanged.
🧹 Nitpick comments (1)
pkg/sequencers/based/sequencer_test.go (1)
**79**: Close the mocked subscription streams on cancellation.

These stubs return channels that never close, but `FullDAClient.Subscribe` now promises the stream closes when the passed `context.Context` is cancelled. Returning a bare `make(chan datypes.SubscriptionEvent)` here means the constructor-started subscriber is not exercising the same teardown path as production, which can hide goroutine-leak bugs in these lifecycle tests. As per coding guidelines, "Be mindful of goroutine leaks".

Also applies to: 476-476, 507-507, 553-553, 618-618, 943-943
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sequencers/based/sequencer_test.go` at line 79, The test stubs for mockDAClient.MockClient.On("Subscribe", ...) return bare channels that never close, but FullDAClient.Subscribe promises the returned stream closes when the provided context is cancelled; update each Subscribe stub to create a channel and spawn a goroutine that waits on the provided context (the mock call's first arg) and closes the channel on ctx.Done so the test exercises the same teardown path as production (e.g., change the Subscribe mock in sequencer_test.go to capture the ctx argument, create ch := make(chan datypes.SubscriptionEvent), start go func(){ <-ctx.Done(); close(ch) }() and return ch, nil).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f53ad8ce-0317-46fd-a878-8eaea20e557b
📒 Files selected for processing (2)
- node/sequencer_recovery_integration_test.go
- pkg/sequencers/based/sequencer_test.go
🧹 Nitpick comments (3)
block/internal/syncing/syncer_forced_inclusion_test.go (1)
**153-168**: Optional: deduplicate FI retriever test wiring.

The setup in this test now largely duplicates `newForcedInclusionSyncer` (subscription mock + retriever replacement). A small helper for this specific path would reduce drift risk.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/syncer_forced_inclusion_test.go` around lines 153 - 168, Test setup duplicates forced-inclusion wiring: extract a small helper that creates the subscription mock, constructs da.NewForcedInclusionRetriever and injects it into a NewSyncer instance to reduce duplication. Create e.g. newForcedInclusionSyncer(t, client, cfg, gen, st, mockExec, cm) that calls client.On("Subscribe", ...).Return(subCh, nil).Maybe(), creates fiRetriever via da.NewForcedInclusionRetriever(...), constructs the Syncer with NewSyncer(...), stops any existing s.fiRetriever, assigns s.fiRetriever = fiRetriever, and returns the configured *Syncer and subCh for use in tests; replace the duplicated block in the test with this helper.

pkg/sequencers/based/sequencer_test.go (2)

**501-509**: Missing `Retrieve` mock on second DA client.

`mockDAClient2` has `Subscribe` mocked (line 507) but lacks a `Retrieve` mock, unlike `mockDAClient` (lines 477-479). While the brief window before `replaceWithMockRetriever` is called makes this unlikely to cause issues, adding the mock would ensure consistency and prevent potential flakiness.

Suggested fix

```diff
 mockDAClient2.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
+mockDAClient2.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
+}).Maybe()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sequencers/based/sequencer_test.go` around lines 501 - 509, Add a missing Retrieve mock on mockDAClient2 to mirror mockDAClient: on mockDAClient2.MockClient set an expectation for "Retrieve" (with appropriate args e.g., mock.Anything, mock.Anything) and return the same shape used for the first client (e.g., nil, nil) with .Maybe(), so that mockDAClient2's Retrieve is stubbed before NewBasedSequencer or replaceWithMockRetriever runs; target the MockFullDAClient instance mockDAClient2 and the NewBasedSequencer test setup.
**79-82**: Inconsistent mock default for `Retrieve` across test helpers.

The `createTestSequencer` helper uses `datypes.StatusNotFound` (line 81), while other test setups in this file (e.g., lines 477-479, 554-556, 619-621, 944-946) use `datypes.StatusHeightFromFuture`. The `replaceWithMockRetriever` call at line 92 mitigates the issue by stopping the real retriever, but there's a brief race window between `NewBasedSequencer` starting the real retriever and the mock replacement.

For consistency and to eliminate the race window entirely, consider using `StatusHeightFromFuture` here as well.

Suggested fix

```diff
 mockDAClient.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
-	BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
 }).Maybe()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sequencers/based/sequencer_test.go` around lines 79 - 82, The mock for Retrieve in createTestSequencer currently returns datypes.StatusNotFound which is inconsistent with other tests and creates a race before replaceWithMockRetriever swaps in the mock; update the mock setup in the createTestSequencer helper (the mockDAClient.MockClient.On("Retrieve", ...) call used during NewBasedSequencer startup) to return datypes.ResultRetrieve with BaseResult.Code set to datypes.StatusHeightFromFuture so it matches other tests and eliminates the race window before replaceWithMockRetriever runs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 684ec8e5-a4fc-41a7-8936-e1fb2cc7bbb4
📒 Files selected for processing (2)
- block/internal/syncing/syncer_forced_inclusion_test.go
- pkg/sequencers/based/sequencer_test.go
Resolves #3145
Refactor DA Subscriptions and Follower Logic
Summary
This PR introduces a unified `da.Subscriber` primitive to manage the follow and catch-up lifecycle for DA blobs. Previously, the syncing (`DAFollower`) and forced inclusion (`asyncBlockRetriever`) systems duplicated aspects of this logic, leading to subtle bugs spanning error handling, backoff retries, and cache memory leaks.

By abstracting the subscription management into a dedicated Subscriber component, we centralize stream merging, height tracking, and fallback handling. Both `DAFollower` and `asyncBlockRetriever` now simply implement the `SubscriberHandler` interface to receive `HandleEvent` (for inline processing) and `HandleCatchup` callbacks.

Key Changes

- `da.Subscriber` primitive: Encapsulates DA subscription logic. Manages an internal `followLoop` (for inline events) and a `catchupLoop` (for robust, sequential catch-up when falling behind). Includes support for merging events from multiple DA namespaces.
- `DAFollower` & `asyncBlockRetriever`: Both components now compose a `da.Subscriber` and implement `SubscriberHandler`, reducing duplicate boilerplate and abstracting away the complex gap-filling logic.
- Removed `syncer_backoff_test.go` and replaced its indirect testing with synchronous unit tests centered around `da_follower_test.go` and `da/subscriber_test.go`, improving test reliability and execution speed.
- Fixed a bug where `ErrHeightFromFuture` inappropriately triggered backoff retries, and fixed memory leaks associated with stale cache heights in the `asyncBlockRetriever`.

Architecture Overview
```mermaid
flowchart TD
    DAC[DA Client] -->|Emits SubscriptionEvents| SUB[da.Subscriber]
    subgraph da [da.Subscriber Component]
        FL["followLoop<br>Receives live events"]
        CL["catchupLoop<br>Fills height gaps"]
        FL -->|Updates highest seen| CL
    end
    SUB --> FL
    FL -->|"HandleEvent (inline processing)"| SH{SubscriberHandler API}
    CL -->|"HandleCatchup (sequential processing)"| SH
    ABR["asyncBlockRetriever<br>Forced Inclusions"] -.->|implements| SH
    DAF["DAFollower<br>Syncing"] -.->|implements| SH
```

Summary by CodeRabbit
New Features
Refactor
Documentation
Tests