
feat: Subscribe to forced inclusion ns events #3146

Open

alpe wants to merge 21 commits into main from alex/2803_best_2worlds_fi_rebased2

Conversation


@alpe alpe commented Mar 9, 2026

Resolves #3145

Refactor DA Subscriptions and Follower Logic

Summary

This PR introduces a unified da.Subscriber primitive to manage the follow and catch-up lifecycle for DA blobs. Previously, the syncing (DAFollower) and forced inclusion (asyncBlockRetriever) systems duplicated aspects of this logic, leading to subtle bugs spanning error handling, backoff retries, and cache memory leaks.

By abstracting the subscription management into a dedicated Subscriber component, we centralize stream merging, height tracking, and fallback handling. Both DAFollower and asyncBlockRetriever now simply implement the SubscriberHandler interface to receive HandleEvent (for inline processing) and HandleCatchup callbacks.

Key Changes

  • New da.Subscriber Primitive: Encapsulates DA subscription logic. Manages an internal followLoop (for inline events) and a catchupLoop (for robust, sequential catch-up when falling behind). Includes support for merging events from multiple DA namespaces.
  • SubscriberHandler Interface: Extracted the core consumer logic into an interface with HandleEvent and HandleCatchup methods.
  • Refactored DAFollower & asyncBlockRetriever: Both components now compose a da.Subscriber and implement SubscriberHandler, reducing duplicate boilerplate and abstracting away the complex gap-filling logic.
  • Test Infrastructure Revamp: Deleted syncer_backoff_test.go and replaced its indirect testing with synchronous unit tests centered around da_follower_test.go and da/subscriber_test.go, improving test reliability and execution speed.
  • Bug Fixes: Resolved previous issues where ErrHeightFromFuture inappropriately triggered backoff retries, and fixed memory leaks associated with stale cache heights in the asyncBlockRetriever.

Architecture Overview

flowchart TD
    DAC[DA Client] -->|Emits SubscriptionEvents| SUB[da.Subscriber]
    
    subgraph da [da.Subscriber Component]
        FL["followLoop<br>Receives live events"] 
        CL["catchupLoop<br>Fills height gaps"]
        
        FL -->|Updates highest seen| CL
    end
    
    SUB --> FL
    
    FL -->|"HandleEvent (inline processing)"| SH{SubscriberHandler API}
    CL -->|"HandleCatchup (sequential processing)"| SH
    
    ABR["asyncBlockRetriever<br>Forced Inclusions"] -.->|implements| SH
    DAF["DAFollower<br>Syncing"] -.->|implements| SH

Summary by CodeRabbit

  • New Features

    • Event-driven DA subscription with follow/catchup flows and optional per-event timestamps
    • Explicit Start/Stop lifecycle for forced-inclusion components
    • Priority-height queuing for prompt handling of peer hints
  • Refactor

    • Replaced polling/callback paths with subscriber + EventSink-driven model
    • Wider context propagation through sequencers and retrievers
  • Documentation

    • Clarified Celestia DA requires websocket (ws://) connection
  • Tests

    • Added and updated subscription-driven unit tests and mocks


github-actions bot commented Mar 9, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

  • Build: ✅ passed
  • Format: ⏩ skipped
  • Lint: ✅ passed
  • Breaking: ✅ passed
  • Updated (UTC): Mar 13, 2026, 4:20 PM


claude bot commented Mar 9, 2026

Claude finished @alpe's task in 2m 53s.


Code Review: feat: Subscribe to forced inclusion ns events

  • Gather context and understand the PR
  • Review core da.Subscriber implementation
  • Review DAFollower refactor
  • Review asyncBlockRetriever refactor
  • Review ForcedInclusionRetriever lifecycle
  • Review sequencer integration
  • Review test coverage and quality
  • Post comprehensive review

Overall Assessment

This is a well-architected refactor. Centralizing the follow/catchup lifecycle in da.Subscriber eliminates real duplicated logic, and the SubscriberHandler interface cleanly separates concerns. The CAS-based height coordination between followLoop and catchupLoop is correct, and the seenSubscriptionEvent sentinel cleanly solves the cold-start premature-head problem.

Several bugs and a lifecycle leak warrant attention before merging.


Bugs

1. fetchAndCacheBlock silently swallows errors — subscriber skips heights on DA failure

File: block/internal/da/async_block_retriever.go:238-244

The default: branch of the status switch logs and returns nil. When Retrieve returns an unexpected code (network error, rate-limit, etc.), the subscriber treats it as a success and advances localDAHeight past that height, permanently dropping it.

default:
    f.logger.Debug()...Msg("failed to retrieve block - will retry")
    // returns nil ← subscriber advances past this height!
}
return nil

Fix: return a non-nil error so the subscriber backs off and retries the same height. Fix this →
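A minimal standalone sketch of the suggested fix, using illustrative status constants rather than the repository's actual types: only terminal outcomes return nil, so the subscriber never advances past a height whose retrieval failed for an unexpected reason.

```go
package main

import "fmt"

// retrieveStatus stands in for the DA retrieve result codes; the names are
// illustrative, not the repository's actual constants.
type retrieveStatus int

const (
	statusSuccess  retrieveStatus = iota
	statusNotFound                // height provably has no blobs
	statusError                   // network error, rate limit, etc.
)

// handleRetrieveResult returns nil only for terminal outcomes; any
// unexpected status surfaces an error so the subscriber backs off and
// retries the same height instead of advancing localDAHeight past it.
func handleRetrieveResult(st retrieveStatus, height uint64) error {
	switch st {
	case statusSuccess, statusNotFound:
		return nil
	default:
		return fmt.Errorf("retrieve at DA height %d failed, will retry", height)
	}
}

func main() {
	fmt.Println(handleRetrieveResult(statusSuccess, 10)) // <nil>
	fmt.Println(handleRetrieveResult(statusError, 10) != nil) // true
}
```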

2. Prefetch ErrHeightFromFuture rolls back a successful primary fetch

File: block/internal/da/async_block_retriever.go:199-211

HandleCatchup fetches the primary height first, then speculatively prefetches ahead. When a prefetch height is beyond DA head, fetchAndCacheBlock returns datypes.ErrHeightFromFuture. This error propagates through HandleCatchup back to runCatchup, which rolls back localDAHeight and backs off — discarding the already-successful primary fetch.

// Speculatively prefetch ahead.
target := daHeight + f.prefetchWindow
for h := daHeight + 1; h <= target; h++ {
    if err := f.fetchAndCacheBlock(ctx, h); err != nil {
        return err  // ← rolls back daHeight's successful fetch
    }
}

The fix is to not propagate ErrHeightFromFuture from the prefetch loop — just break on it instead. Fix this →
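The suggested behavior can be sketched in isolation. ErrHeightFromFuture here is a stand-in for datypes.ErrHeightFromFuture, and the fetch callback replaces fetchAndCacheBlock; both are illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrHeightFromFuture is a stand-in for datypes.ErrHeightFromFuture.
var ErrHeightFromFuture = errors.New("height from future")

// prefetchAhead speculatively fetches (daHeight, daHeight+window]. Hitting a
// height beyond the DA head ends the prefetch quietly instead of failing the
// already-successful primary fetch; any other error still propagates so the
// subscriber backs off and retries.
func prefetchAhead(daHeight, window uint64, fetch func(uint64) error) error {
	target := daHeight + window
	for h := daHeight + 1; h <= target; h++ {
		if err := fetch(h); err != nil {
			if errors.Is(err, ErrHeightFromFuture) {
				break // DA head reached; keep the primary fetch's result
			}
			return err
		}
	}
	return nil
}

func main() {
	const daHead = 12
	var fetched []uint64
	fetch := func(h uint64) error {
		if h > daHead {
			return ErrHeightFromFuture
		}
		fetched = append(fetched, h)
		return nil
	}
	if err := prefetchAhead(10, 5, fetch); err != nil {
		panic(err)
	}
	fmt.Println(fetched) // [11 12]
}
```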

3. HandleCatchup double-fetches when priorityHeight == daHeight

File: block/internal/syncing/da_follower.go:125-152

When a P2P hint matches the current sequential DA height, fetchAndPipeHeight is called for it in the priority loop and then called again immediately in the normal sequential fetch:

for priorityHeight := f.popPriorityHeight(); ... {
    if priorityHeight < daHeight { continue }
    f.fetchAndPipeHeight(ctx, priorityHeight) // ← fetches daHeight
    break
}
// 2. Normal sequential fetch
f.fetchAndPipeHeight(ctx, daHeight) // ← fetches daHeight again

This double-pipes the same height's events to the event sink. Fix: break early when priorityHeight == daHeight and skip the normal fetch. Fix this →
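A standalone sketch of the proposed ordering, with the priority queue reduced to a slice and fetchAndPipeHeight to a callback (both illustrative): at most one hint at or above daHeight is served, and the normal sequential fetch is skipped when that hint already covered daHeight.

```go
package main

import "fmt"

// handleCatchupOrder sketches the corrected control flow: serve one priority
// hint >= daHeight, then fetch daHeight sequentially only if the hint did not
// already cover it.
func handleCatchupOrder(daHeight uint64, hints []uint64, fetch func(uint64)) {
	handledCurrent := false
	for len(hints) > 0 {
		priorityHeight := hints[0]
		hints = hints[1:]
		if priorityHeight < daHeight {
			continue // stale hint, drop it
		}
		fetch(priorityHeight)
		handledCurrent = priorityHeight == daHeight
		break
	}
	if !handledCurrent {
		fetch(daHeight) // normal sequential fetch
	}
}

func main() {
	var fetched []uint64
	record := func(h uint64) { fetched = append(fetched, h) }

	// Hint equals the current height: fetched exactly once, no double-pipe.
	handleCatchupOrder(7, []uint64{7}, record)
	fmt.Println(fetched) // [7]

	// Hint ahead of the current height: both heights are fetched.
	fetched = nil
	handleCatchupOrder(7, []uint64{9}, record)
	fmt.Println(fetched) // [9 7]
}
```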

4. fiRetriever started in Syncer.Start but never stopped in Syncer.Stop

File: block/internal/syncing/syncer.go:185-186 and 231-277

// Start():
s.fiRetriever.Start(ctx)   // starts background goroutines

// Stop(): no fiRetriever.Stop() call
if s.daFollower != nil {
    s.daFollower.Stop()
}
// fiRetriever goroutines leak

This is the pattern already identified by coderabbitai and confirmed outstanding. It causes goroutine leaks on Syncer.Stop(). Fix this →


Design / Correctness Concerns

5. headReached can read true while catch-up is still pending

File: block/internal/da/subscriber.go:207-209

When an inline event succeeds, headReached is set unconditionally:

if err == nil {
    s.headReached.Store(true)

But if highestSeenDAHeight is already ahead of ev.Height (e.g. merged namespaces arrive out of order, or a reconnect saw a later height first), setting headReached = true here is incorrect — the catchup loop still has work to do.

The condition should be s.localDAHeight.Load() > s.highestSeenDAHeight.Load() (i.e. we've processed past everything we've seen). The same applies to the analogous block around line 281-290 in runCatchup. Fix this →

6. Duplicate namespace subscription in NewDAFollower

File: block/internal/syncing/da_follower.go:52-73

When cfg.DataNamespace is nil or empty, dataNs is set to cfg.Namespace, and both identical slices are passed to da.NewSubscriber. The subscribe() function only deduplicates index i > 0 against index 0 (not between arbitrary indices), so this specific two-namespace case is handled, but it still opens a subscription and then cancels it, which is wasteful. Better to build the slice conditionally:

namespaces := [][]byte{cfg.Namespace}
if len(cfg.DataNamespace) != 0 && !slices.Equal(cfg.DataNamespace, cfg.Namespace) {
    namespaces = append(namespaces, cfg.DataNamespace)
}

Minor Issues

7. Benchmark test: missing .Load() on atomic comparison

File: block/internal/syncing/syncer_benchmark_test.go:80

// Bug: comparing *atomic.Uint64 pointer to uint64 value
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)

// Should be:
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())

The pointer comparison will always fail (or pass vacuously depending on testify's reflect handling), making this assertion meaningless. Fix this →

8. Intentional error in HandleEvent lacks a comment

File: block/internal/da/async_block_retriever.go:181-183

if isInline {
    return errors.New("async block retriever relies on catchup state machine")
}

Returning an error on success is a surprising pattern. A short comment explaining that this intentionally delegates all ordering to the catchup state machine would help future readers. (coderabbitai also flagged this.)

9. BasedSequencer has no Stop method

File: pkg/sequencers/based/sequencer.go

NewBasedSequencer calls bs.fiRetriever.Start(ctx) (line 102), but BasedSequencer exposes no Stop() to invoke fiRetriever.Stop(). When the enclosing ctx is cancelled the goroutines will eventually exit, but only via context propagation — there's no clean synchronous shutdown. Compare with SingleSequencer, which at least has an explicit stop path. Consider adding Stop() to the public API.


Good Patterns Worth Calling Out

  • The CAS-based localDAHeight coordination between followLoop and catchupLoop is correct and elegantly prevents double-processing.
  • seenSubscriptionEvent sentinel correctly guards against premature headReached on cold start — nice.
  • mergeSubscriptions properly handles both channels closing independently.
  • updateHighest spin-CAS correctly handles concurrent updates without a mutex.
  • The SubscriberHandler interface decoupling is clean; both consumers look much simpler.
  • The watchdog in runSubscription is a solid defensive measure against silent subscription stalls.


coderabbitai bot commented Mar 9, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds an event-driven DA subscription system: a Subscriber component, backpressure-capable EventSink, timestamp-aware Subscribe signatures, and widespread rewiring of AsyncBlockRetriever, DAFollower, ForcedInclusion retrievers, sequencers and tests to use subscription-driven, context-aware Start lifecycles.

Changes

Cohort / File(s) Summary
Subscriber Core
block/internal/da/subscriber.go, block/internal/da/subscriber_test.go
Adds Subscriber, SubscriberConfig, SubscriberHandler, Start/Stop, follow/catchup loops, merging multi-namespace subscriptions, backoff/watchdog and unit tests for catchup/backoff.
Event Sink Abstraction
block/internal/common/event.go
Introduces EventSink and EventSinkFunc with PipeEvent(ctx, event) error for backpressure-aware piping.
AsyncBlockRetriever
block/internal/da/async_block_retriever.go, block/internal/da/async_block_retriever_test.go
Reworks retriever to subscription-driven model (Start(ctx)), wires Subscriber, adds HandleEvent/HandleCatchup, context-aware fetch/cache and updated tests using explicit daBlockTime durations.
DA Client & Types
block/internal/da/client.go, block/internal/da/interface.go, block/internal/da/tracing.go, block/internal/da/tracing_test.go, pkg/da/types/types.go, pkg/da/jsonrpc/types.go
Extends Subscribe(ctx, namespace, fetchTimestamp bool), populates per-event Timestamp, adds optional Header in subscription response; tracing and tests updated to forward new flag.
Forced Inclusion Retriever
block/internal/da/forced_inclusion_retriever.go, block/internal/da/forced_inclusion_retriever_test.go, block/internal/da/forced_inclusion_tracing.go, block/public.go
Constructor signature changed to accept ctx, daBlockTime time.Duration, tracingEnabled bool; adds Start(ctx) to interface/concrete/tracing wrappers and defers asyncFetcher start to explicit Start.
DAFollower & Syncer Integration
block/internal/syncing/da_follower.go, block/internal/syncing/da_follower_test.go, block/internal/syncing/syncer.go, block/internal/syncing/syncer_test.go, block/internal/syncing/syncer_forced_inclusion_test.go
Migrates DAFollower to use da.Subscriber and EventSink, introduces priority queue API (QueuePriorityHeight), adds HandleEvent/HandleCatchup/fetchAndPipeHeight, and rewires Syncer to act as EventSink and route priority hints to follower; tests updated.
DARetriever & Mocks
block/internal/syncing/da_retriever.go, block/internal/syncing/da_retriever_mock.go, block/internal/syncing/da_retriever_strict_test.go, block/internal/syncing/da_retriever_tracing.go, block/internal/syncing/da_retriever_tracing_test.go
Removes old priority-height API, consolidates blob processing under mutex, switches strictMode from atomic to bool, updates mocks to ProcessBlobs and removes priority mock helpers.
Raft/Retriever Event Sink
block/internal/syncing/raft_retriever.go
Replaces local eventProcessor with EventSink and forwards DAHeightEvent via EventSink.PipeEvent(ctx, event).
Sequencers & Start Propagation
pkg/sequencers/based/sequencer.go, pkg/sequencers/based/sequencer_test.go, pkg/sequencers/single/sequencer.go, pkg/sequencers/single/sequencer_test.go, apps/evm/cmd/run.go, apps/grpc/cmd/run.go
Threads context.Context into sequencer constructors and calls Start(ctx) on ForcedInclusionRetriever; tests and runners updated to pass ctx.
Mocks & Test Utilities
test/mocks/da.go, test/testda/dummy.go, pkg/sequencers/common/forced_inclusion_retriever_mock.go
Adds fetchTimestamp bool to mock Subscribe signatures and test dummy, and adds Start(ctx) mock helpers for ForcedInclusionRetriever.
Benchmarks / Removed Tests / Docs
block/internal/syncing/syncer_backoff_test.go (removed), block/internal/syncing/syncer_benchmark_test.go, docs/guides/da-layers/celestia.md, CHANGELOG.md, apps/evm/go.mod
Removes backoff test file, refactors benchmark to context-driven flow, updates Celestia doc to ws://, updates changelog, and enables replace directives in evm go.mod.
Minor config & test tweaks
node/sequencer_recovery_integration_test.go, other tests
Small timing/context adjustments and test scaffolding changes (e.g., catchup timeout increase, use of t.Context/cancel).

Sequence Diagram(s)

sequenceDiagram
    participant DAClient as DA Client
    participant Subscriber as Subscriber
    participant Follow as FollowLoop
    participant Catchup as CatchupLoop
    participant Handler as SubscriberHandler

    DAClient->>Subscriber: NewSubscriber(cfg) / Start(ctx)
    Subscriber->>Follow: spawn followLoop
    Subscriber->>Catchup: spawn catchupLoop

    alt live event
        DAClient->>Follow: subscription event (height, blobs, timestamp)
        Follow->>Handler: HandleEvent(ctx, event, isInline=true)
        Handler->>Handler: Cache/process inline (PipeEvent)
    end

    alt gap detected
        Subscriber->>Catchup: schedule catchup for missingHeight
        Catchup->>Handler: HandleCatchup(ctx, missingHeight)
        Handler->>DAClient: Retrieve(ctx, missingHeight)
        DAClient->>Handler: block blobs
        Handler->>Handler: Cache and PipeEvent(ctx, event)
    end

    Note over Subscriber,Catchup: HighestSeenDAHeight updates & backoff/reconnect on errors

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes


Suggested labels

T:testing

Suggested reviewers

  • julienrbrt
  • tac0turtle

Poem

🐰 I hopped through subscriptions, ears aflutter,

Sent timestamps, catchups, and a Subscriber's mutter.
Inline blobs tumbled, catchup gaps did mend,
Mocks and tests aligned, stitched end-to-end.
DA carrots flow steady — hoppity, amend!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 24.21%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The PR title 'feat: Subscribe to forced inclusion ns events' clearly and concisely summarizes the main feature addition: subscribing to forced inclusion namespace events.
  • Description check: ✅ Passed. The description provides a summary, key changes, an architecture overview with a mermaid diagram, and a link to issue #3145; all required template sections are well-addressed.
  • Linked Issues check: ✅ Passed. The PR fully addresses issue #3145's objectives: the unified da.Subscriber primitive, the DAFollower and asyncBlockRetriever refactor onto SubscriberHandler, removal of syncer_backoff_test.go in favor of synchronous unit tests, and fixes for the ErrHeightFromFuture backoff and asyncBlockRetriever cache memory-leak bugs.
  • Out of Scope Changes check: ✅ Passed. All changes directly support the core objective of unifying DA subscriptions and follower logic; the docs/guides and go.mod updates are necessary supporting changes.



alpe added 6 commits March 10, 2026 11:57
* main:
  fix: persist snapshot once for avoiding badger vlog (#3153)
  build(deps): Bump docker/setup-buildx-action from 3 to 4 (#3148)
  build(deps): Bump the all-go group across 5 directories with 8 updates (#3147)
  build(deps): Bump docker/login-action from 3 to 4 (#3149)
  build(deps): Bump docker/build-push-action from 6 to 7 (#3151)
  build(deps): Bump actions/setup-go from 6.2.0 to 6.3.0 (#3150)

codecov bot commented Mar 11, 2026

Codecov Report

❌ Patch coverage is 62.21662% with 150 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.43%. Comparing base (e4e35bf) to head (6552a82).

Files with missing lines (patch % / missing lines):
block/internal/da/subscriber.go 65.69% 48 Missing and 11 partials ⚠️
...quencers/common/forced_inclusion_retriever_mock.go 0.00% 20 Missing ⚠️
block/internal/da/async_block_retriever.go 75.00% 11 Missing and 7 partials ⚠️
block/internal/syncing/da_follower.go 83.82% 8 Missing and 3 partials ⚠️
block/internal/da/client.go 0.00% 9 Missing ⚠️
block/public.go 0.00% 8 Missing ⚠️
block/internal/syncing/da_retriever.go 69.56% 4 Missing and 3 partials ⚠️
block/internal/syncing/syncer.go 33.33% 4 Missing ⚠️
block/internal/da/forced_inclusion_retriever.go 25.00% 2 Missing and 1 partial ⚠️
block/internal/common/event.go 0.00% 2 Missing ⚠️
... and 5 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3146      +/-   ##
==========================================
+ Coverage   60.20%   60.43%   +0.22%     
==========================================
  Files         115      117       +2     
  Lines       11902    11968      +66     
==========================================
+ Hits         7166     7233      +67     
+ Misses       3925     3913      -12     
- Partials      811      822      +11     
Flag Coverage Δ
combined 60.43% <62.21%> (+0.22%) ⬆️

Flags with carried forward coverage won't be shown.


@alpe alpe changed the title from "[WIP] feat: Subscribe to forced inclusion ns events" to "feat: Subscribe to forced inclusion ns events" on Mar 11, 2026
@alpe alpe marked this pull request as ready for review March 12, 2026 10:06

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
block/internal/syncing/da_follower.go (1)

53-72: ⚠️ Potential issue | 🟠 Major

Avoid subscribing to the same namespace twice.

DataNamespace is documented as "may be nil or equal to Namespace", but this constructor always passes both entries to da.NewSubscriber. When they match, the follower can subscribe to the same namespace twice, which risks duplicate events and duplicate DA work. Build the slice conditionally and drop the duplicate before constructing the subscriber.

Suggested fix
 	dataNs := cfg.DataNamespace
-	if len(dataNs) == 0 {
-		dataNs = cfg.Namespace
-	}
+	namespaces := [][]byte{cfg.Namespace}
+	if len(dataNs) != 0 && !slices.Equal(dataNs, cfg.Namespace) {
+		namespaces = append(namespaces, dataNs)
+	}
 
 	f := &daFollower{
 		retriever:       cfg.Retriever,
 		eventSink:       cfg.EventSink,
@@
 	f.subscriber = da.NewSubscriber(da.SubscriberConfig{
 		Client:      cfg.Client,
 		Logger:      cfg.Logger,
-		Namespaces:  [][]byte{cfg.Namespace, dataNs},
+		Namespaces:  namespaces,
 		DABlockTime: cfg.DABlockTime,
 		Handler:     f,
 		StartHeight: cfg.StartDAHeight,
 	})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 53 - 72, The constructor
currently always passes both cfg.Namespace and dataNs (derived from
cfg.DataNamespace) to da.NewSubscriber which can duplicate the same namespace;
modify the da_follower constructor (around daFollower and f.subscriber creation)
to build the Namespaces slice conditionally: include cfg.Namespace, then only
append dataNs if it is non-empty and not equal to cfg.Namespace (or dedupe by
value), and then pass that deduped [][]byte to da.NewSubscriber via
da.SubscriberConfig so the subscriber never gets the same namespace twice.
🧹 Nitpick comments (4)
block/internal/da/tracing_test.go (1)

25-33: Add a real Subscribe forwarding test.

This mock signature change keeps the file compiling, but nothing in this suite fails if WithTracingClient.Subscribe drops or flips includeTimestamp. A tiny round-trip assertion here would pin the new behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/tracing_test.go` around lines 25 - 33, Add a unit test that
verifies WithTracingClient.Subscribe forwards the includeTimestamp flag
correctly by exercising mockFullClient.Subscribe via its subscribeFn: implement
a mock subscribeFn that captures the received ts parameter and returns a channel
with a dummy datypes.SubscriptionEvent, call WithTracingClient.Subscribe (or the
wrapper under test) with ts=true and ts=false, and assert the captured value
equals the input for both cases; ensure you use mockFullClient.Subscribe to
forward to subscribeFn so the test fails if Subscribe flips or drops
includeTimestamp.
block/public.go (1)

81-82: Clarify the new retriever lifecycle in the public API.

NewForcedInclusionRetriever now accepts a ctx, but the concrete retriever doesn't use that context until a later Start(ctx) call. Please either document that two-step startup here or drop the constructor ctx so callers don't assume construction already wires cancellation.

As per coding guidelines, "Document exported types and functions".

Also applies to: 84-95

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/public.go` around lines 81 - 82, The exported constructor
NewForcedInclusionRetriever currently takes a ctx but the retriever only honors
cancellation when Start(ctx) is called; update the public API so callers aren't
misled by construction-time cancellation — either remove the ctx parameter from
NewForcedInclusionRetriever and update all call sites to pass ctx into
Start(ctx) instead, or add/expand the exported doc comments on
NewForcedInclusionRetriever and the relevant exported retriever type to
explicitly state this is a two-step lifecycle (construction does not start
background work or observe ctx; callers must call Start(ctx) to begin and to
enable cancellation). Reference NewForcedInclusionRetriever and Start(ctx) in
the doc comment so the intended usage is clear.
block/internal/syncing/syncer_forced_inclusion_test.go (1)

77-82: Exercise the started FI retriever path at least once.

After the constructor stopped auto-starting the background subscriber, these tests still never call Start(ctx), so they only cover the synchronous fallback. Please add one case that starts the retriever and drives the subscription path too.

Also applies to: 150-154

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_forced_inclusion_test.go` around lines 77 - 82,
The test never exercises the started ForcedInclusion retriever subscription
path; update the test to call fiRetriever.Start(t.Context()) (after creating
fiRetriever and before t.Cleanup) and drive the subscription by sending a
datypes.SubscriptionEvent into subCh to verify the asynchronous handler path
runs, keeping the existing t.Cleanup(fiRetriever.Stop). Ensure the mock client
expectation for Subscribe (client.On("Subscribe", ...)) remains and that the
event you send matches the handler's expected format so the retriever processes
the subscription case rather than only the synchronous fallback.
block/internal/da/async_block_retriever.go (1)

174-182: Clarify intentional error return in HandleEvent.

The HandleEvent method returns an error when isInline is true, which causes the Subscriber to roll back and delegate processing to the catch-up loop. This is intentional design for the async block retriever use case, but a brief comment explaining this pattern would improve readability.

💡 Suggested clarification
 // HandleEvent caches blobs from the subscription inline, even empty ones,
 // to record that the DA height was seen and has 0 blobs.
+// Returns an error for inline events to delegate sequential processing to the
+// catchup loop, which handles prefetching and ordered retrieval.
 func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error {
 	f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
 	if isInline {
+		// Intentionally reject inline processing to force catchup loop handling
 		return errors.New("async block retriever relies on catchup state machine")
 	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/async_block_retriever.go` around lines 174 - 182, The
HandleEvent method on asyncBlockRetriever currently returns an error when
isInline is true to force the Subscriber to roll back and let the catch-up loop
handle inline events; add a brief clarifying comment above the isInline check
(near asyncBlockRetriever.HandleEvent and its call to cacheBlock) explaining
that returning an error is intentional to delegate inline event processing to
the catch-up state machine (Subscriber/catch-up loop) rather than treating it as
a real failure, so future readers understand the control-flow hack.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 128-156: The loop in daFollower.HandleCatchup currently pops
priority hints before fetching, which can permanently drop a hint on transient
fetchAndPipeHeight errors and causes double-fetch when priorityHeight ==
daHeight; change the logic to peek the hint instead of removing it (use the
inverse of popPriorityHeight or re-push it on error), attempt fetchAndPipeHeight
while only removing the hint after a successful fetch, on transient failures
(non-datypes.ErrHeightFromFuture) requeue the same priorityHeight (e.g.
f.pushPriorityHeight(priorityHeight)) and return the error so it will be retried
after backoff, and if priorityHeight == daHeight short-circuit the normal
sequential fetch path so you don't fetch the same height twice. Ensure you still
silently ignore datypes.ErrHeightFromFuture as before.

In `@block/internal/syncing/da_retriever.go`:
- Around line 291-298: The code flips r.strictMode as soon as isValidEnvelope is
true, before running full header checks (ValidateBasic and proposer validation),
so a signed-but-invalid envelope can incorrectly toggle strict mode; change the
logic in the DA retriever so r.strictMode is only set to true after the envelope
passes full header validation (call sites around the current block that
reference isValidEnvelope, header, ValidateBasic, and proposer validation),
mirroring the same guard for the other occurrence (lines ~305-313), and add a
regression test (e.g., "invalid envelope does not toggle strict mode") that
constructs a signed envelope which fails ValidateBasic/proposer checks and
asserts r.strictMode remains false after processing.

In `@block/internal/syncing/syncer_benchmark_test.go`:
- Line 80: The test is comparing the pointer fixt.s.daRetrieverHeight (an
*atomic.Uint64) to a uint64, which compares addresses rather than the stored
value; update the assertion in the test to read the atomic's value via
fixt.s.daRetrieverHeight.Load() and compare that to spec.heights+daHeightOffset
so the actual uint64 contents are asserted (i.e., replace the direct pointer
comparison in the assert.Equal call with a .Load() call).

In `@block/internal/syncing/syncer_test.go`:
- Around line 388-394: The test initializes
syncerInst1.daFollower/syncerInst2.daFollower but then starts and asserts
against local variables follower1/follower2 instead of the syncer's follower;
update the test to assign the created followers back to the syncer instances
(e.g., syncerInst1.daFollower = follower1 and syncerInst2.daFollower =
follower2) and check the Start() errors from both follower1.Start(ctx) and
follower2.Start(ctx) with require.NoError/require.Nil so both starts must
succeed; apply the same fix in the other affected blocks that create follower
instances (the sections around the other noted ranges) so the test uses the
syncer-owned lifecycle throughout.

In `@pkg/sequencers/based/sequencer.go`:
- Line 100: The forced-inclusion retriever is constructed with
block.NewForcedInclusionRetriever but never started; after creating
bs.fiRetriever call its Start(ctx) using a cancellable context (create ctx,
cancel := context.WithCancel(parentCtx) and store or wire cancel for shutdown)
so the retriever's background subscription/prefetch goroutines run; mirror the
pattern used in SingleSequencer by replacing context.Background() with the
cancellable context and invoking bs.fiRetriever.Start(ctx) immediately after
construction.

---

Outside diff comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 53-72: The constructor currently always passes both cfg.Namespace
and dataNs (derived from cfg.DataNamespace) to da.NewSubscriber which can
duplicate the same namespace; modify the da_follower constructor (around
daFollower and f.subscriber creation) to build the Namespaces slice
conditionally: include cfg.Namespace, then only append dataNs if it is non-empty
and not equal to cfg.Namespace (or dedupe by value), and then pass that deduped
[][]byte to da.NewSubscriber via da.SubscriberConfig so the subscriber never
gets the same namespace twice.
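A minimal sketch of that deduplication step, assuming the constructor receives the raw namespaces as byte slices (the function and names below are illustrative, not the actual `da_follower` code):

```go
package main

import (
	"bytes"
	"fmt"
)

// dedupeNamespaces builds the namespace list for the subscriber:
// each namespace appears once, empty entries are dropped, order is kept.
func dedupeNamespaces(namespaces ...[]byte) [][]byte {
	out := make([][]byte, 0, len(namespaces))
	for _, ns := range namespaces {
		if len(ns) == 0 {
			continue // drop empty namespaces entirely
		}
		seen := false
		for _, existing := range out {
			if bytes.Equal(existing, ns) {
				seen = true
				break
			}
		}
		if !seen {
			out = append(out, ns)
		}
	}
	return out
}

func main() {
	// Header and data namespace identical: the subscriber gets it only once.
	fmt.Println(len(dedupeNamespaces([]byte("ns"), []byte("ns")))) // 1
	// Distinct data namespace: both are kept, in order.
	fmt.Println(len(dedupeNamespaces([]byte("ns"), []byte("ns-data")))) // 2
}
```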

---

Nitpick comments:
In `@block/internal/da/async_block_retriever.go`:
- Around line 174-182: The HandleEvent method on asyncBlockRetriever currently
returns an error when isInline is true to force the Subscriber to roll back and
let the catch-up loop handle inline events; add a brief clarifying comment above
the isInline check (near asyncBlockRetriever.HandleEvent and its call to
cacheBlock) explaining that returning an error is intentional to delegate inline
event processing to the catch-up state machine (Subscriber/catch-up loop) rather
than treating it as a real failure, so future readers understand the
control-flow hack.

In `@block/internal/da/tracing_test.go`:
- Around line 25-33: Add a unit test that verifies WithTracingClient.Subscribe
forwards the includeTimestamp flag correctly by exercising
mockFullClient.Subscribe via its subscribeFn: implement a mock subscribeFn that
captures the received ts parameter and returns a channel with a dummy
datypes.SubscriptionEvent, call WithTracingClient.Subscribe (or the wrapper
under test) with ts=true and ts=false, and assert the captured value equals the
input for both cases; ensure you use mockFullClient.Subscribe to forward to
subscribeFn so the test fails if Subscribe flips or drops includeTimestamp.
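The capture-and-assert shape the prompt describes can be sketched with stand-in types; the real `mockFullClient`, `WithTracingClient`, and `datypes.SubscriptionEvent` in `block/internal/da` differ in their signatures:

```go
package main

import "fmt"

// subscriber mimics the wrapped client's Subscribe shape.
type subscriber interface {
	Subscribe(ns []byte, includeTimestamp bool) <-chan struct{}
}

// mockClient forwards to a configurable subscribeFn, like mockFullClient.
type mockClient struct {
	subscribeFn func(ns []byte, includeTimestamp bool) <-chan struct{}
}

func (m *mockClient) Subscribe(ns []byte, ts bool) <-chan struct{} {
	return m.subscribeFn(ns, ts)
}

// tracingClient wraps an inner subscriber and must forward ts unchanged.
type tracingClient struct{ inner subscriber }

func (t *tracingClient) Subscribe(ns []byte, ts bool) <-chan struct{} {
	// tracing spans would be added here; the flag passes straight through
	return t.inner.Subscribe(ns, ts)
}

func main() {
	var captured bool
	mock := &mockClient{subscribeFn: func(_ []byte, ts bool) <-chan struct{} {
		captured = ts // record what the wrapper actually forwarded
		ch := make(chan struct{}, 1)
		ch <- struct{}{} // dummy event
		return ch
	}}
	traced := &tracingClient{inner: mock}
	for _, ts := range []bool{true, false} {
		traced.Subscribe([]byte("ns"), ts)
		fmt.Println(captured == ts) // true for both cases
	}
}
```

If the wrapper ever flipped or dropped the flag, the captured value would diverge from the input and the assertion would fail.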

In `@block/internal/syncing/syncer_forced_inclusion_test.go`:
- Around line 77-82: The test never exercises the started ForcedInclusion
retriever subscription path; update the test to call
fiRetriever.Start(t.Context()) (after creating fiRetriever and before t.Cleanup)
and drive the subscription by sending a datypes.SubscriptionEvent into subCh to
verify the asynchronous handler path runs, keeping the existing
t.Cleanup(fiRetriever.Stop). Ensure the mock client expectation for Subscribe
(client.On("Subscribe", ...)) remains and that the event you send matches the
handler's expected format so the retriever processes the subscription case
rather than only the synchronous fallback.
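Driving the asynchronous path looks roughly like this sketch: start the retriever, push an event into the subscription channel, and observe the handler run. The `event` and `fiRetriever` types below are stand-ins for `datypes.SubscriptionEvent` and the real ForcedInclusion retriever:

```go
package main

import "fmt"

type event struct{ Height uint64 }

type fiRetriever struct {
	subCh   chan event
	handled chan uint64
	done    chan struct{}
}

// Start launches the background loop that consumes subscription events.
func (r *fiRetriever) Start() {
	go func() {
		for ev := range r.subCh {
			r.handled <- ev.Height // asynchronous handler path, not the sync fallback
		}
		close(r.done)
	}()
}

// Stop closes the subscription and waits for the loop to drain.
func (r *fiRetriever) Stop() {
	close(r.subCh)
	<-r.done
}

func main() {
	r := &fiRetriever{
		subCh:   make(chan event, 1),
		handled: make(chan uint64, 1),
		done:    make(chan struct{}),
	}
	r.Start()
	r.subCh <- event{Height: 7} // drive the subscription like the test would
	fmt.Println(<-r.handled)    // 7
	r.Stop()
}
```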

In `@block/public.go`:
- Around line 81-82: The exported constructor NewForcedInclusionRetriever
currently takes a ctx but the retriever only honors cancellation when Start(ctx)
is called; update the public API so callers aren't misled by construction-time
cancellation — either remove the ctx parameter from NewForcedInclusionRetriever
and update all call sites to pass ctx into Start(ctx) instead, or add/expand the
exported doc comments on NewForcedInclusionRetriever and the relevant exported
retriever type to explicitly state this is a two-step lifecycle (construction
does not start background work or observe ctx; callers must call Start(ctx) to
begin and to enable cancellation). Reference NewForcedInclusionRetriever and
Start(ctx) in the doc comment so the intended usage is clear.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cdba2d5f-0a7b-4edb-a192-a51a19db0f34

📥 Commits

Reviewing files that changed from the base of the PR and between abb6b71 and aca762f.

📒 Files selected for processing (37)
  • apps/evm/server/force_inclusion_test.go
  • block/internal/common/event.go
  • block/internal/da/async_block_retriever.go
  • block/internal/da/async_block_retriever_test.go
  • block/internal/da/client.go
  • block/internal/da/forced_inclusion_retriever.go
  • block/internal/da/forced_inclusion_retriever_test.go
  • block/internal/da/forced_inclusion_tracing.go
  • block/internal/da/interface.go
  • block/internal/da/subscriber.go
  • block/internal/da/subscriber_test.go
  • block/internal/da/tracing.go
  • block/internal/da/tracing_test.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_follower_test.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/da_retriever_mock.go
  • block/internal/syncing/da_retriever_strict_test.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go
  • block/internal/syncing/raft_retriever.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_forced_inclusion_test.go
  • block/internal/syncing/syncer_test.go
  • block/public.go
  • docs/guides/da-layers/celestia.md
  • pkg/da/jsonrpc/types.go
  • pkg/da/types/types.go
  • pkg/sequencers/based/sequencer.go
  • pkg/sequencers/based/sequencer_test.go
  • pkg/sequencers/common/forced_inclusion_retriever_mock.go
  • pkg/sequencers/single/sequencer.go
  • pkg/sequencers/single/sequencer_test.go
  • test/mocks/da.go
  • test/testda/dummy.go
💤 Files with no reviewable changes (3)
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go

Comment on lines +128 to +156
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
	// 1. Drain stale or future priority heights from P2P hints
	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
		if priorityHeight < daHeight {
			continue // skip stale hints without yielding back to the catchup loop
		}

		f.logger.Debug().
			Uint64("da_height", priorityHeight).
			Msg("fetching priority DA height from P2P hint")

		if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
			if errors.Is(err, datypes.ErrHeightFromFuture) {
				// Priority hint points to a future height — silently ignore.
				f.logger.Debug().Uint64("priority_da_height", priorityHeight).
					Msg("priority hint is from future, ignoring")
				continue
			}
			// Roll back so daHeight is attempted again next cycle after backoff.
			return err
		}
		break // continue with daHeight
	}

	// 2. Normal sequential fetch
	if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil {
		return err
	}
	return nil
}

Contributor

@coderabbitai coderabbitai bot Mar 12, 2026

⚠️ Potential issue | 🟠 Major

Don't drop or double-fetch priority heights in HandleCatchup.

This loop removes the priority hint before the DA fetch. A transient fetchAndPipeHeight failure loses the hint permanently, and priorityHeight == daHeight fetches the same height twice in one pass. Requeue retryable failures and short-circuit the normal path when the priority height matches the current sequential height.

Suggested fix
 func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
 	// 1. Drain stale or future priority heights from P2P hints
 	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
 		if priorityHeight < daHeight {
 			continue // skip stale hints without yielding back to the catchup loop
 		}
+		if priorityHeight == daHeight {
+			return f.fetchAndPipeHeight(ctx, daHeight)
+		}
 
 		f.logger.Debug().
 			Uint64("da_height", priorityHeight).
 			Msg("fetching priority DA height from P2P hint")
 
 		if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
 			if errors.Is(err, datypes.ErrHeightFromFuture) {
 				// Priority hint points to a future height — silently ignore.
 				f.logger.Debug().Uint64("priority_da_height", priorityHeight).
 					Msg("priority hint is from future, ignoring")
 				continue
 			}
+			f.QueuePriorityHeight(priorityHeight)
 			// Roll back so daHeight is attempted again next cycle after backoff.
 			return err
 		}
 		break // continue with daHeight
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 128 - 156, The loop in
daFollower.HandleCatchup currently pops priority hints before fetching, which
can permanently drop a hint on transient fetchAndPipeHeight errors and causes
double-fetch when priorityHeight == daHeight; change the logic to peek the hint
instead of removing it (use the inverse of popPriorityHeight or re-push it on
error), attempt fetchAndPipeHeight while only removing the hint after a
successful fetch, on transient failures (non-datypes.ErrHeightFromFuture)
requeue the same priorityHeight (e.g. f.pushPriorityHeight(priorityHeight)) and
return the error so it will be retried after backoff, and if priorityHeight ==
daHeight short-circuit the normal sequential fetch path so you don't fetch the
same height twice. Ensure you still silently ignore datypes.ErrHeightFromFuture
as before.

Contributor Author

The DA hint is an optimization and comes from an untrusted source. The sequential fetch will cover the height eventually.

Contributor

@alpe, understood — that's a clear and reasonable design decision. Since the hints are untrusted and purely opportunistic, dropping them on failure is fine, and any missed heights are guaranteed to be covered by the sequential catchup. I'll withdraw this concern.


✏️ Learnings added
Learnt from: alpe
Repo: evstack/ev-node PR: 3146
File: block/internal/syncing/da_follower.go:128-156
Timestamp: 2026-03-12T14:41:22.380Z
Learning: In evstack/ev-node, DA priority heights (queued via `QueuePriorityHeight` in `block/internal/syncing/da_follower.go`) are untrusted, best-effort optimizations sourced from P2P hints. Dropping a hint on a transient fetch failure is intentional — the sequential catchup loop in `da.Subscriber` will cover every height eventually. Do not flag missing retry/re-queue logic for priority hints as a bug.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: CR
Repo: evstack/ev-node PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-26T20:31:15.823Z
Learning: Applies to **/*.go : Implement the DA interface from pkg/da/types when adding a new DA layer

Learnt from: alpe
Repo: evstack/ev-node PR: 3131
File: block/internal/syncing/syncer_backoff_test.go:241-245
Timestamp: 2026-03-06T09:40:36.029Z
Learning: In evstack/ev-node, the module declares go 1.25.6. Since Go 1.22, loop variables are per-iteration by default, so loop variable capture is not a concern. Do not flag or fix loop variable capture in this codebase for any Go files; this rule applies to all Go files in the repository, not just the specific test file.

@@ -75,7 +78,7 @@ func BenchmarkSyncerIO(b *testing.B) {
require.Len(b, fixt.s.heightInCh, 0)

assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
Contributor

⚠️ Potential issue | 🟡 Minor

Missing .Load() on atomic value comparison.

fixt.s.daRetrieverHeight is an *atomic.Uint64, but it's being compared directly to a uint64 value. This will compare the pointer, not the stored value, causing the assertion to fail.

🐛 Proposed fix
-				assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
+				assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_benchmark_test.go` at line 80, The test is
comparing the pointer fixt.s.daRetrieverHeight (an *atomic.Uint64) to a uint64,
which compares addresses rather than the stored value; update the assertion in
the test to read the atomic's value via fixt.s.daRetrieverHeight.Load() and
compare that to spec.heights+daHeightOffset so the actual uint64 contents are
asserted (i.e., replace the direct pointer comparison in the assert.Equal call
with a .Load() call).

Comment on lines +388 to +394
syncerInst1.daFollower = NewDAFollower(DAFollowerConfig{
Retriever: daRtrMock,
Logger: zerolog.Nop(),
EventSink: common.EventSinkFunc(syncerInst1.PipeEvent),
Namespace: []byte("ns"),
StartDAHeight: syncerInst1.daRetrieverHeight.Load(),
})
Contributor

⚠️ Potential issue | 🟡 Minor

Use the syncer's follower instance in this persistence test.

syncerInst1/2.daFollower is initialized with one instance, but the test starts and asserts against a different local follower. The restart half also drops the error from follower2.Start(ctx). Assign follower1/2 back to syncerInst*.daFollower and require both Start calls to succeed so this test exercises the actual syncer-owned lifecycle.

Also applies to: 437-448, 501-507, 521-532

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_test.go` around lines 388 - 394, The test
initializes syncerInst1.daFollower/syncerInst2.daFollower but then starts and
asserts against local variables follower1/follower2 instead of the syncer's
follower; update the test to assign the created followers back to the syncer
instances (e.g., syncerInst1.daFollower = follower1 and syncerInst2.daFollower =
follower2) and check the Start() errors from both follower1.Start(ctx) and
follower2.Start(ctx) with require.NoError/require.Nil so both starts must
succeed; apply the same fix in the other affected blocks that create follower
instances (the sections around the other noted ranges) so the test uses the
syncer-owned lifecycle throughout.

Member

@julienrbrt julienrbrt left a comment

Much better to have the subscriber centralized and re-used. Could you add a changelog?

utACK

assert.Equal(t, uint64(50), priorityHeight, "valid DA hint should be queued")
}

// TestProcessHeightEvent_SkipsDAHintWhenAlreadyDAIncluded verifies that when the
Member

This test should be re-added. Some conflict issue, I think.

}

-func TestProcessHeightEvent_SkipsDAHintWhenBelowRetrieverCursor(t *testing.T) {
+func TestProcessHeightEvent_SkipsDAHintWhenAlreadyFetched(t *testing.T) {
Member

This test should keep its previous name.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)

125-150: ⚠️ Potential issue | 🟠 Major

Avoid double-fetching when the hint matches daHeight.

If priorityHeight == daHeight, this path fetches the same height once as a priority hint and again in the normal sequential fetch below. That doubles the DA read and can pipe duplicate events downstream.

Proposed fix
 	for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
 		if priorityHeight < daHeight {
 			continue // skip stale hints without yielding back to the catchup loop
 		}
+		if priorityHeight == daHeight {
+			break
+		}

 		f.logger.Debug().
 			Uint64("da_height", priorityHeight).
 			Msg("fetching priority DA height from P2P hint")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 125 - 150, The loop that
drains P2P priority hints can fetch the same DA height twice because a
successful hint fetch (via fetchAndPipeHeight) is followed by the unconditional
normal fetch of daHeight; to fix, track whether the loop successfully handled
the current daHeight and skip the subsequent normal fetch when it did.
Concretely: in the draining loop around popPriorityHeight and fetchAndPipeHeight
set a boolean (e.g., priorityHandled) when fetchAndPipeHeight returns nil for a
priorityHeight equal to daHeight (preserving the existing ErrHeightFromFuture
handling and continue/return behavior), then after the loop only call
fetchAndPipeHeight(ctx, daHeight) if priorityHandled is false; reference the
functions/variables popPriorityHeight, fetchAndPipeHeight, daHeight and the
ErrHeightFromFuture check when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/da/async_block_retriever.go`:
- Around line 224-245: The default branch in the switch inside
async_block_retriever.go's Retrieve (or the enclosing method handling DA fetch
results) currently only logs and returns nil, which lets Subscriber advance
localDAHeight on failures; change the default case to return a non-nil error so
the caller will back off and retry the same height. Specifically, in the switch
over result.Code (cases datypes.StatusHeightFromFuture, StatusNotFound,
StatusSuccess) update the default case to construct and return an error that
includes the height and result.Message (or wrap an existing sentinel like
datypes.ErrFetchFailed) instead of returning nil; leave the existing cacheBlock
calls unchanged for StatusNotFound/StatusSuccess and keep returning
datypes.ErrHeightFromFuture for StatusHeightFromFuture.

In `@block/internal/da/subscriber.go`:
- Around line 195-204: The code currently sets s.headReached true whenever an
inline event is successfully handled, which can be incorrect if a higher DA
height has already been observed; modify the success branch in
subscriber.HandleEvent (the block around updateHighest, localDAHeight, isInline,
err handling) to only set s.headReached.Store(true) when the processed ev.Height
is >= the latest highestSeenDAHeight (read via s.highestSeenDAHeight.Load()),
i.e. after err==nil && isInline do a compare like if ev.Height >=
s.highestSeenDAHeight.Load() { s.headReached.Store(true) } else { leave it false
}, and apply the same conditional change to the analogous success handling in
the other block around lines 281-290 so HasReachedHead() cannot become true
while highestSeenDAHeight is still ahead.
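The guarded head-reached update that prompt describes can be sketched as follows; the field and method names mirror the prompt, but the real `Subscriber` in `block/internal/da/subscriber.go` differs:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type subscriber struct {
	highestSeenDAHeight atomic.Uint64
	headReached         atomic.Bool
}

// onInlineSuccess runs after an inline event is handled without error.
func (s *subscriber) onInlineSuccess(evHeight uint64) {
	// Only declare the head reached when this event is at or past the
	// highest DA height observed so far; otherwise catch-up is still behind.
	if evHeight >= s.highestSeenDAHeight.Load() {
		s.headReached.Store(true)
	}
}

func main() {
	var s subscriber
	s.highestSeenDAHeight.Store(100)
	s.onInlineSuccess(90)
	fmt.Println(s.headReached.Load()) // false: a higher height was already seen
	s.onInlineSuccess(100)
	fmt.Println(s.headReached.Load()) // true
}
```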

In `@block/internal/syncing/syncer.go`:
- Around line 184-185: Syncer starts the forced-inclusion retriever via
s.fiRetriever.Start(ctx) but never stops it; update Syncer.Stop to call
s.fiRetriever.Stop() (or the retriever's appropriate shutdown method) and ensure
it handles nil checks and any blocking waits/errors (e.g., call
s.fiRetriever.Stop(), then wait for completion if Stop returns a channel/error)
so the retriever started in NewForcedInclusionRetriever / fiRetriever.Start is
symmetrically shut down to avoid goroutine/resource leaks.

In `@pkg/sequencers/based/sequencer_test.go`:
- Around line 79-88: The DA mock returns datypes.StatusNotFound for Retrieve
which lets the real forced-inclusion retriever started by NewBasedSequencer spin
through empty heights; change the default mocked Retrieve return in the test
helper (where mockDAClient.MockClient.On("Subscribe", ...) and .On("Retrieve",
... ) are set) to return datypes.StatusHeightFromFuture instead of
datypes.StatusNotFound so the background retriever does not walk forward
forever—only override to StatusNotFound in tests that explicitly need that
behavior.

---

Duplicate comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 125-150: The loop that drains P2P priority hints can fetch the
same DA height twice because a successful hint fetch (via fetchAndPipeHeight) is
followed by the unconditional normal fetch of daHeight; to fix, track whether
the loop successfully handled the current daHeight and skip the subsequent
normal fetch when it did. Concretely: in the draining loop around
popPriorityHeight and fetchAndPipeHeight set a boolean (e.g., priorityHandled)
when fetchAndPipeHeight returns nil for a priorityHeight equal to daHeight
(preserving the existing ErrHeightFromFuture handling and continue/return
behavior), then after the loop only call fetchAndPipeHeight(ctx, daHeight) if
priorityHandled is false; reference the functions/variables popPriorityHeight,
fetchAndPipeHeight, daHeight and the ErrHeightFromFuture check when making this
change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 165f9b63-f6dc-4c33-9286-5bd0d8aeb0ec

📥 Commits

Reviewing files that changed from the base of the PR and between aca762f and 20bb79d.

📒 Files selected for processing (17)
  • CHANGELOG.md
  • apps/evm/cmd/run.go
  • apps/grpc/cmd/run.go
  • apps/testapp/cmd/run.go
  • block/internal/da/async_block_retriever.go
  • block/internal/da/forced_inclusion_retriever.go
  • block/internal/da/forced_inclusion_retriever_test.go
  • block/internal/da/subscriber.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_forced_inclusion_test.go
  • block/public.go
  • pkg/sequencers/based/sequencer.go
  • pkg/sequencers/based/sequencer_test.go
  • pkg/sequencers/single/sequencer.go
  • pkg/sequencers/single/sequencer_test.go
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/sequencers/based/sequencer.go
  • pkg/sequencers/single/sequencer.go

Comment on lines 224 to +245
	switch result.Code {
	case datypes.StatusHeightFromFuture:
		f.logger.Debug().Uint64("height", height).Msg("block height not yet available - will retry")
		return datypes.ErrHeightFromFuture
	case datypes.StatusNotFound:
		f.logger.Debug().
			Uint64("height", height).
			Msg("no blobs at height")
		// Cache empty result to avoid re-fetching
		f.cacheBlock(ctx, height, result.Timestamp, nil)
	case datypes.StatusSuccess:
		// Process each blob
		blobs := make([][]byte, 0, len(result.Data))
		for _, blob := range result.Data {
			if len(blob) > 0 {
				blobs = append(blobs, blob)
			}
		}
		f.logger.Debug().
			Uint64("height", height).
			Int("blob_count", len(result.Data)).
			Msg("processed blobs for prefetch")
		f.cacheBlock(ctx, height, result.Timestamp, blobs)
	default:
		f.logger.Debug().
			Uint64("height", height).
			Str("status", result.Message).
			Msg("failed to retrieve block - will retry")
	}
	return nil
}
Contributor

⚠️ Potential issue | 🔴 Critical

Return an error for failed DA fetches.

The default branch logs and returns nil, so Subscriber treats a failed Retrieve as a successful catch-up step and advances localDAHeight. During a DA outage this can skip an unbounded range of heights instead of backing off and retrying the same one.

Proposed fix
 	default:
 		f.logger.Debug().
 			Uint64("height", height).
 			Str("status", result.Message).
 			Msg("failed to retrieve block - will retry")
+		return fmt.Errorf("failed to retrieve block at height %d: %s", height, result.Message)
 	}
 	return nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
	switch result.Code {
	case datypes.StatusHeightFromFuture:
		f.logger.Debug().Uint64("height", height).Msg("block height not yet available - will retry")
		return datypes.ErrHeightFromFuture
	case datypes.StatusNotFound:
		f.logger.Debug().
			Uint64("height", height).
			Msg("no blobs at height")
		// Cache empty result to avoid re-fetching
		f.cacheBlock(ctx, height, result.Timestamp, nil)
	case datypes.StatusSuccess:
		// Process each blob
		blobs := make([][]byte, 0, len(result.Data))
		for _, blob := range result.Data {
			if len(blob) > 0 {
				blobs = append(blobs, blob)
			}
		}
		f.logger.Debug().
			Uint64("height", height).
			Int("blob_count", len(result.Data)).
			Msg("processed blobs for prefetch")
		f.cacheBlock(ctx, height, result.Timestamp, blobs)
	default:
		f.logger.Debug().
			Uint64("height", height).
			Str("status", result.Message).
			Msg("failed to retrieve block - will retry")
	}
	return nil
}
	switch result.Code {
	case datypes.StatusHeightFromFuture:
		f.logger.Debug().Uint64("height", height).Msg("block height not yet available - will retry")
		return datypes.ErrHeightFromFuture
	case datypes.StatusNotFound:
		f.cacheBlock(ctx, height, result.Timestamp, nil)
	case datypes.StatusSuccess:
		blobs := make([][]byte, 0, len(result.Data))
		for _, blob := range result.Data {
			if len(blob) > 0 {
				blobs = append(blobs, blob)
			}
		}
		f.cacheBlock(ctx, height, result.Timestamp, blobs)
	default:
		f.logger.Debug().
			Uint64("height", height).
			Str("status", result.Message).
			Msg("failed to retrieve block - will retry")
		return fmt.Errorf("failed to retrieve block at height %d: %s", height, result.Message)
	}
	return nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/async_block_retriever.go` around lines 224 - 245, The
default branch in the switch inside async_block_retriever.go's Retrieve (or the
enclosing method handling DA fetch results) currently only logs and returns nil,
which lets Subscriber advance localDAHeight on failures; change the default case
to return a non-nil error so the caller will back off and retry the same height.
Specifically, in the switch over result.Code (cases
datypes.StatusHeightFromFuture, StatusNotFound, StatusSuccess) update the
default case to construct and return an error that includes the height and
result.Message (or wrap an existing sentinel like datypes.ErrFetchFailed)
instead of returning nil; leave the existing cacheBlock calls unchanged for
StatusNotFound/StatusSuccess and keep returning datypes.ErrHeightFromFuture for
StatusHeightFromFuture.

Comment on lines +184 to +185
s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config.DA.BlockTime.Duration, s.config.Instrumentation.IsTracingEnabled(), s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion)
s.fiRetriever.Start(ctx)
Contributor

⚠️ Potential issue | 🟠 Major

Add symmetric shutdown for forced-inclusion retriever.

fiRetriever.Start(ctx) is invoked on Line 185, but Syncer.Stop does not call fiRetriever.Stop(). Please stop it explicitly during shutdown to avoid lifecycle leaks.

🔧 Proposed fix
 func (s *Syncer) Stop(ctx context.Context) error {
@@
 	// Stop the DA follower first (it owns its own goroutines).
 	if s.daFollower != nil {
 		s.daFollower.Stop()
 	}
+	if s.fiRetriever != nil {
+		s.fiRetriever.Stop()
+	}
 
 	s.wg.Wait()
As per coding guidelines: "Be mindful of goroutine leaks".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer.go` around lines 184 - 185, Syncer starts the
forced-inclusion retriever via s.fiRetriever.Start(ctx) but never stops it;
update Syncer.Stop to call s.fiRetriever.Stop() (or the retriever's appropriate
shutdown method) and ensure it handles nil checks and any blocking waits/errors
(e.g., call s.fiRetriever.Stop(), then wait for completion if Stop returns a
channel/error) so the retriever started in NewForcedInclusionRetriever /
fiRetriever.Start is symmetrically shut down to avoid goroutine/resource leaks.

alpe added 2 commits March 13, 2026 09:55
* main:
  build(deps): Bump quinn-proto from 0.11.13 to 0.11.14 in the cargo group across 1 directory (#3155)
  build(deps): bump execution/evm (#3160)
  build(deps): bump ev-node (#3159)
  chore: remove replaces from ev-node (#3158)
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/go.mod`:
- Around line 5-8: The go.mod in apps/evm currently contains local replace
directives (the replace block referencing github.com/evstack/ev-node and
github.com/evstack/ev-node/execution/evm) which must be removed before rc.5;
delete that entire replace(...) section and update the module's require
statements to reference the published versions (follow release order: core →
root ev-node → execution/evm → apps/evm) so apps/evm's require entries point to
the released semantic versions instead of local paths; ensure there are no other
replace directives remaining in apps/evm/go.mod and run go mod tidy to verify
module graph consistency.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: da1a4665-9af7-4ef3-953e-ae512425695a

📥 Commits

Reviewing files that changed from the base of the PR and between 20bb79d and 6ffc4be.

📒 Files selected for processing (1)
  • apps/evm/go.mod

Comment on lines +5 to +8
replace (
github.com/evstack/ev-node => ../../
github.com/evstack/ev-node/execution/evm => ../../execution/evm
)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Search for release scripts or documentation about handling replace directives

# Check for release/build scripts
fd -t f -e sh -e bash -e mk 'release|build|publish' --exec echo "File: {}" \; --exec head -20 {}

# Check for documentation about replace directives or release process
rg -n -C3 'replace.*directive|local.*replace|release.*process' --glob '*.md' --glob 'Makefile'

# Check if other go.mod files in the repo have similar patterns
fd -t f 'go.mod' --exec echo "=== {} ===" \; --exec grep -A5 '^replace' {}

Repository: evstack/ev-node

Length of output: 4536


🏁 Script executed:

# Get the specific release process section from RELEASE.md to understand context better
sed -n '180,210p' RELEASE.md

Repository: evstack/ev-node

Length of output: 879


🏁 Script executed:

# Check if RELEASE.md specifies timing for replace directive removal
sed -n '130,190p' RELEASE.md

Repository: evstack/ev-node

Length of output: 2087


Remove local replace directives from go.mod files before rc.5 release.

Per RELEASE.md (lines 186, 404, 417), removing all replace directives from go.mod files is a mandatory step before starting the release process. The local replace directives in apps/evm/go.mod must be removed:

replace (
	github.com/evstack/ev-node => ../../
	github.com/evstack/ev-node/execution/evm => ../../execution/evm
)

These should not be present in rc.5 release artifacts, as they reference local filesystem paths that won't exist for consumers. Update the require block to use the published versions instead, following the documented release process order (core → root ev-node → execution/evm → apps/evm).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/go.mod` around lines 5 - 8, The go.mod in apps/evm currently
contains local replace directives (the replace block referencing
github.com/evstack/ev-node and github.com/evstack/ev-node/execution/evm) which
must be removed before rc.5; delete that entire replace(...) section and update
the module's require statements to reference the published versions (follow
release order: core → root ev-node → execution/evm → apps/evm) so apps/evm's
require entries point to the released semantic versions instead of local paths;
ensure there are no other replace directives remaining in apps/evm/go.mod and
run go mod tidy to verify module graph consistency.

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
pkg/sequencers/based/sequencer_test.go (1)

79-82: ⚠️ Potential issue | 🟠 Major

Default these startup Retrieve mocks to StatusHeightFromFuture.

NewBasedSequencer still starts the real forced-inclusion retriever before seq.fiRetriever is swapped. Leaving the helper on datypes.StatusNotFound keeps that background subscriber walking empty heights, and the restart path still has the same window without any neutral Retrieve stub on mockDAClient2. Use datypes.StatusHeightFromFuture consistently here so these tests do not depend on scheduler timing.

Suggested fix
 mockDAClient.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
-	BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
 }).Maybe()
 mockDAClient2.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
+mockDAClient2.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
+	BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
+}).Maybe()
 mockExec2 := createDefaultMockExecutor(t)
 seq2, err := NewBasedSequencer(t.Context(), mockDAClient2, config.DefaultConfig(), db, gen, zerolog.Nop(), mockExec2)
As per coding guidelines, "Ensure tests are deterministic".

Also applies to: 505-509

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` around lines 79 - 82, The test's
Retrieve mock on mockDAClient.MockClient returns StatusNotFound which lets
NewBasedSequencer's real forced-inclusion retriever walk empty heights; change
the mocked Retrieve return to use datypes.StatusHeightFromFuture instead (i.e.,
return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code:
datypes.StatusHeightFromFuture}}) so the startup retriever is neutral; apply the
same change to the other similar mocks (including the mock for mockDAClient2
noted around lines 505-509) and keep the existing Subscribe mock unchanged.
🧹 Nitpick comments (1)
pkg/sequencers/based/sequencer_test.go (1)

79-79: Close the mocked subscription streams on cancellation.

These stubs return channels that never close, but FullDAClient.Subscribe now promises the stream closes when the passed context.Context is cancelled. Returning bare make(chan datypes.SubscriptionEvent) here means the constructor-started subscriber is not exercising the same teardown path as production, which can hide goroutine-leak bugs in these lifecycle tests.

As per coding guidelines, "Be mindful of goroutine leaks".

Also applies to: 476-476, 507-507, 553-553, 618-618, 943-943

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` at line 79, The test stubs for
mockDAClient.MockClient.On("Subscribe", ...) return bare channels that never
close, but FullDAClient.Subscribe promises the returned stream closes when the
provided context is cancelled; update each Subscribe stub to create a channel
and spawn a goroutine that waits on the provided context (the mock call's first
arg) and closes the channel on ctx.Done so the test exercises the same teardown
path as production (e.g., change the Subscribe mock in sequencer_test.go to
capture the ctx argument, create ch := make(chan datypes.SubscriptionEvent),
start go func(){ <-ctx.Done(); close(ch) }() and return ch, nil).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@pkg/sequencers/based/sequencer_test.go`:
- Around line 79-82: The test's Retrieve mock on mockDAClient.MockClient returns
StatusNotFound which lets NewBasedSequencer's real forced-inclusion retriever
walk empty heights; change the mocked Retrieve return to use
datypes.StatusHeightFromFuture instead (i.e., return
datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code:
datypes.StatusHeightFromFuture}}) so the startup retriever is neutral; apply the
same change to the other similar mocks (including the mock for mockDAClient2
noted around lines 505-509) and keep the existing Subscribe mock unchanged.

---

Nitpick comments:
In `@pkg/sequencers/based/sequencer_test.go`:
- Line 79: The test stubs for mockDAClient.MockClient.On("Subscribe", ...)
return bare channels that never close, but FullDAClient.Subscribe promises the
returned stream closes when the provided context is cancelled; update each
Subscribe stub to create a channel and spawn a goroutine that waits on the
provided context (the mock call's first arg) and closes the channel on ctx.Done
so the test exercises the same teardown path as production (e.g., change the
Subscribe mock in sequencer_test.go to capture the ctx argument, create ch :=
make(chan datypes.SubscriptionEvent), start go func(){ <-ctx.Done(); close(ch)
}() and return ch, nil).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f53ad8ce-0317-46fd-a878-8eaea20e557b

📥 Commits

Reviewing files that changed from the base of the PR and between 6ffc4be and 214a784.

📒 Files selected for processing (2)
  • node/sequencer_recovery_integration_test.go
  • pkg/sequencers/based/sequencer_test.go

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
block/internal/syncing/syncer_forced_inclusion_test.go (1)

153-168: Optional: deduplicate FI retriever test wiring.

The setup in this test now largely duplicates newForcedInclusionSyncer (subscription mock + retriever replacement). A small helper for this specific path would reduce drift risk.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_forced_inclusion_test.go` around lines 153 -
168, Test setup duplicates forced-inclusion wiring: extract a small helper that
creates the subscription mock, constructs da.NewForcedInclusionRetriever and
injects it into a NewSyncer instance to reduce duplication. Create e.g.
newForcedInclusionSyncer(t, client, cfg, gen, st, mockExec, cm) that calls
client.On("Subscribe", ...).Return(subCh, nil).Maybe(), creates fiRetriever via
da.NewForcedInclusionRetriever(...), constructs the Syncer with NewSyncer(...),
stops any existing s.fiRetriever, assigns s.fiRetriever = fiRetriever, and
returns the configured *Syncer and subCh for use in tests; replace the
duplicated block in the test with this helper.
pkg/sequencers/based/sequencer_test.go (2)

501-509: Missing Retrieve mock on second DA client.

mockDAClient2 has Subscribe mocked (line 507) but lacks a Retrieve mock, unlike mockDAClient (lines 477-479). While the brief window before replaceWithMockRetriever is called makes this unlikely to cause issues, adding the mock would ensure consistency and prevent potential flakiness.

Suggested fix
 	mockDAClient2.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
+	mockDAClient2.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
+		BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
+	}).Maybe()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` around lines 501 - 509, Add a missing
Retrieve mock on mockDAClient2 to mirror mockDAClient: on
mockDAClient2.MockClient set an expectation for "Retrieve" (with appropriate
args e.g., mock.Anything, mock.Anything) and return the same shape used for the
first client (e.g., nil, nil) with .Maybe(), so that mockDAClient2's Retrieve is
stubbed before NewBasedSequencer or replaceWithMockRetriever runs; target the
MockFullDAClient instance mockDAClient2 and the NewBasedSequencer test setup.

79-82: Inconsistent mock default for Retrieve across test helpers.

The createTestSequencer helper uses datypes.StatusNotFound (line 81), while other test setups in this file (e.g., lines 477-479, 554-556, 619-621, 944-946) use datypes.StatusHeightFromFuture. The replaceWithMockRetriever call at line 92 mitigates the issue by stopping the real retriever, but there's a brief race window between NewBasedSequencer starting the real retriever and the mock replacement.

For consistency and to eliminate the race window entirely, consider using StatusHeightFromFuture here as well.

Suggested fix
 	mockDAClient.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{
-		BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
+		BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
 	}).Maybe()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sequencers/based/sequencer_test.go` around lines 79 - 82, The mock for
Retrieve in createTestSequencer currently returns datypes.StatusNotFound which
is inconsistent with other tests and creates a race before
replaceWithMockRetriever swaps in the mock; update the mock setup in the
createTestSequencer helper (the mockDAClient.MockClient.On("Retrieve", ...) call
used during NewBasedSequencer startup) to return datypes.ResultRetrieve with
BaseResult.Code set to datypes.StatusHeightFromFuture so it matches other tests
and eliminates the race window before replaceWithMockRetriever runs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@block/internal/syncing/syncer_forced_inclusion_test.go`:
- Around line 153-168: Test setup duplicates forced-inclusion wiring: extract a
small helper that creates the subscription mock, constructs
da.NewForcedInclusionRetriever and injects it into a NewSyncer instance to
reduce duplication. Create e.g. newForcedInclusionSyncer(t, client, cfg, gen,
st, mockExec, cm) that calls client.On("Subscribe", ...).Return(subCh,
nil).Maybe(), creates fiRetriever via da.NewForcedInclusionRetriever(...),
constructs the Syncer with NewSyncer(...), stops any existing s.fiRetriever,
assigns s.fiRetriever = fiRetriever, and returns the configured *Syncer and
subCh for use in tests; replace the duplicated block in the test with this
helper.

In `@pkg/sequencers/based/sequencer_test.go`:
- Around line 501-509: Add a missing Retrieve mock on mockDAClient2 to mirror
mockDAClient: on mockDAClient2.MockClient set an expectation for "Retrieve"
(with appropriate args e.g., mock.Anything, mock.Anything) and return the same
shape used for the first client (e.g., nil, nil) with .Maybe(), so that
mockDAClient2's Retrieve is stubbed before NewBasedSequencer or
replaceWithMockRetriever runs; target the MockFullDAClient instance
mockDAClient2 and the NewBasedSequencer test setup.
- Around line 79-82: The mock for Retrieve in createTestSequencer currently
returns datypes.StatusNotFound which is inconsistent with other tests and
creates a race before replaceWithMockRetriever swaps in the mock; update the
mock setup in the createTestSequencer helper (the
mockDAClient.MockClient.On("Retrieve", ...) call used during NewBasedSequencer
startup) to return datypes.ResultRetrieve with BaseResult.Code set to
datypes.StatusHeightFromFuture so it matches other tests and eliminates the race
window before replaceWithMockRetriever runs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 684ec8e5-a4fc-41a7-8936-e1fb2cc7bbb4

📥 Commits

Reviewing files that changed from the base of the PR and between 214a784 and eb65f0f.

📒 Files selected for processing (2)
  • block/internal/syncing/syncer_forced_inclusion_test.go
  • pkg/sequencers/based/sequencer_test.go

alpe added 4 commits March 13, 2026 13:07
* main:
  fix(syncer): include in-flight and pending-cache work in PendingCount (#3162)
  chore: re-add replaces for development (#3161)