Conversation
📝 Walkthrough

This pull request implements blob compression before submission to reduce bandwidth usage and storage costs. It introduces zstd-based compression infrastructure in a new module, integrates compression into the DA client with dynamic level selection based on batch size, updates dependencies to klauspost/compress v1.18.4, adds a documentation guideline, and includes comprehensive test coverage.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as DA Client
    participant Compress as Compression Module
    participant Submit as Blob Submission
    participant Retrieve as Blob Retrieval
    participant Decompress as Decompression Module
    Client->>Client: Calculate compression level<br/>(batch size dependent)
    Client->>Compress: Compress(blob, level)
    Compress-->>Client: Compressed blob + magic prefix
    Client->>Client: Validate compressed size<br/>≤ max limit
    Client->>Submit: Submit compressed blob
    Submit-->>Client: Submission result
    Retrieve->>Retrieve: Fetch blob data
    Retrieve->>Decompress: Decompress(blob data)
    Decompress->>Decompress: Check magic prefix
    alt Compressed Data
        Decompress->>Decompress: Decompress zstd payload
        Decompress-->>Retrieve: Original data
    else Uncompressed Data
        Decompress-->>Retrieve: Data unchanged
    end
    Retrieve-->>Client: Retrieved data
```
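The batch-size-dependent level selection shown in the diagram can be sketched as follows. This is a minimal illustration: the `Level` type, constant names, and `pickLevel` are assumptions mirroring the PR's `da.LevelFastest`/`da.LevelDefault`/`da.LevelBest`, and the thresholds `10` and `3` come from the review comments on `client.go`, not necessarily the merged code.

```go
package main

import "fmt"

// Level stands in for the PR's compression levels; the names and
// ordering here are illustrative, not the actual API.
type Level int

const (
	LevelFastest Level = iota
	LevelDefault
	LevelBest
)

// pickLevel sketches the backlog-pressure policy: a large pending
// batch favors speed, a small one favors compression ratio.
func pickLevel(batchSize int) Level {
	switch {
	case batchSize > 10: // heavy backlog: compress fast
		return LevelFastest
	case batchSize > 3: // moderate backlog: balanced
		return LevelDefault
	default: // light load: best ratio
		return LevelBest
	}
}

func main() {
	for _, n := range []int{1, 5, 20} {
		fmt.Printf("batch=%d -> level=%d\n", n, pickLevel(n))
	}
}
```

This matches the Arbitrum-style trade-off cited in the PR description: lighter compression under load, heavier compression when the queue is short.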
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Claude finished @alpe's task in 2m 5s. Code Review: feat: Implement ZSTD compression for DA blobs
Overall this is a well-thought-out implementation with good security hardening (frame header pre-check, memory cap, decompression timeout). The backward compatibility via magic prefix is clean. A few issues worth addressing before merging.
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
Summary of Changes

Hello @alpe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request integrates ZSTD compression for Data Availability (DA) blobs, enhancing the efficiency of data handling by reducing the size of blobs transmitted and stored.
Codecov Report: ❌ Patch coverage is …
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main    #3104      +/-   ##
==========================================
+ Coverage   60.91%   60.96%   +0.04%
==========================================
  Files         113      114       +1
  Lines       11617    11685      +68
==========================================
+ Hits         7077     7124      +47
- Misses       3742     3757      +15
- Partials      798      804       +6
```

Flags with carried forward coverage won't be shown.
Code Review
This pull request introduces ZSTD compression for DA blobs, which is a great feature for reducing bandwidth and storage costs. The implementation is well-contained within a new pkg/da package and includes a comprehensive set of tests. The changes in block/internal/da/client.go correctly integrate the compression and decompression logic. I have a few suggestions to improve the efficiency, readability, and style of the new compression package.
pkg/da/compression.go (outdated)

```go
compressed := encoder.EncodeAll(data, nil)

// Prepend magic prefix
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)

return result, nil
```
The current implementation of Compress can be made more memory-efficient. It currently involves at least two allocations. You can reduce this by creating a destination buffer with the magic prefix and then having EncodeAll append the compressed data to it.
```diff
-compressed := encoder.EncodeAll(data, nil)
-
-// Prepend magic prefix
-result := make([]byte, len(magic)+len(compressed))
-copy(result, magic)
-copy(result[len(magic):], compressed)
-
-return result, nil
+dst := make([]byte, len(magic))
+copy(dst, magic)
+return encoder.EncodeAll(data, dst), nil
```
pkg/da/compression.go (outdated)

```go
// Strip magic prefix and decompress
decompressed, err := decoder.DecodeAll(data[len(magic):], nil)
if err != nil {
	return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
}
```
The new error handling guideline in CLAUDE.md suggests avoiding "failed to" in error messages. While this message doesn't use that exact phrase, "zstd decompress failed" can be improved to be more direct. The err != nil check already implies failure. Consider rephrasing to state the operation and wrap the underlying error.
```diff
-return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
+return nil, fmt.Errorf("compression: zstd decompression: %w", err)
```
References
- The project's coding guidelines in CLAUDE.md state: 'Never start an error message with "failed to"'. (link)
```go
return data[0] == magic[0] &&
	data[1] == magic[1] &&
	data[2] == magic[2] &&
	data[3] == magic[3]
```
The manual byte-by-byte comparison to check for the magic prefix is correct, but using bytes.HasPrefix is more idiomatic and readable. You'll need to add import "bytes" to the file.
```diff
-return data[0] == magic[0] &&
-	data[1] == magic[1] &&
-	data[2] == magic[2] &&
-	data[3] == magic[3]
+return bytes.HasPrefix(data, magic)
```
block/internal/da/client.go (outdated)

```go
data[i] = b.Data()
decompressed, decompErr := da.Decompress(b.Data())
if decompErr != nil {
	return datypes.ResultRetrieve{
```
For backward compatibility, we should still fall back to the uncompressed data.
👍 This is implemented in the Decompress method. It checks for a "magic byte" prefix and returns raw data when not found.
…on based on batch size. (cherry picked from commit 3ff6211ec69d482437de79a617b5b39a88c91835)
E2e tests failed. I am not sure how this relates to the changes, yet.
Actionable comments posted: 3
🧹 Nitpick comments (3)
block/internal/da/client.go (2)
74-79: `BlobSize` reports uncompressed size; document this or add a compressed-size field.

`blobSize` is calculated from the original `data` (pre-compression), but the bytes actually submitted to the DA layer are the compressed blobs. For operators relying on `BlobSize` for cost estimation or quota tracking, this mismatch can be misleading. Consider either documenting that `BlobSize` is the uncompressed size, or adding a separate field (e.g., `CompressedBlobSize`) so callers can distinguish.

Also applies to: 200-210
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/client.go` around lines 74 - 79, The current client.Submit implementation computes blobSize from the original data slices (variable blobSize) which reflects the uncompressed size while DA submissions use compressed blobs; update Submit to either record and return both sizes by adding a new CompressedBlobSize field to the datypes.ResultSubmit (in addition to the existing BlobSize) and populate it with the length of the compressed payloads before sending, or explicitly document in the Submit comment and datatypes that BlobSize is the uncompressed size; change references in Submit and any callers of datypes.ResultSubmit to use the new CompressedBlobSize when estimating costs/quota.
91-100: Consider extracting compression-level thresholds as named constants.

The magic numbers `10` and `3` silently encode the backlog-pressure policy. Named constants (or a small config struct) would make the policy self-documenting and easier to tune without reading the switch logic.

Example

```diff
+const (
+	// highBacklogThreshold is the batch-size above which we use fastest compression.
+	highBacklogThreshold = 10
+	// moderateBacklogThreshold is the batch-size above which we use default compression.
+	moderateBacklogThreshold = 3
+)
+
 compLevel := da.LevelBest
 switch {
-case len(data) > 10:
+case len(data) > highBacklogThreshold:
 	compLevel = da.LevelFastest
-case len(data) > 3:
+case len(data) > moderateBacklogThreshold:
 	compLevel = da.LevelDefault
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/client.go` around lines 91 - 100, extract the magic numeric thresholds used in the compression selection switch into named constants (e.g., MinFastBatchSize = 10, MinDefaultBatchSize = 3) or into a small config struct, then replace the literal `10` and `3` in the switch that sets `compLevel` (which currently uses `da.LevelBest`, `da.LevelFastest`, and `da.LevelDefault`) with those constants/fields so the backlog-pressure policy is self-documenting and easily tunable without changing control flow.

pkg/da/compression_test.go (1)
61-81: Shared `sizes` slice across subtests is fragile if parallelized.

`sizes` is appended inside `t.Run` callbacks and then read after the loop. This works today because subtests run sequentially, but adding `t.Parallel()` in the future would introduce a data race. Consider collecting sizes outside the subtest or adding a brief comment warning against parallelizing these subtests.

One alternative: move the cross-level assertion into its own subtest

```diff
-var sizes []int
 for _, lvl := range levels {
+	lvl := lvl
 	t.Run(lvl.name, func(t *testing.T) {
 		compressed, err := Compress(data, lvl.level)
 		require.NoError(t, err)
 		assert.True(t, IsCompressed(compressed))
-		sizes = append(sizes, len(compressed))
-
 		decompressed, err := Decompress(compressed)
 		require.NoError(t, err)
 		assert.Equal(t, data, decompressed)
 		t.Logf("level=%s compressed=%d ratio=%.4f",
 			lvl.name, len(compressed), float64(len(compressed))/float64(len(data)))
 	})
 }
-// Best should produce equal or smaller output than Fastest
-if len(sizes) == 3 {
-	assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest")
-}
+t.Run("best_leq_fastest", func(t *testing.T) {
+	fastest, err := Compress(data, LevelFastest)
+	require.NoError(t, err)
+	best, err := Compress(data, LevelBest)
+	require.NoError(t, err)
+	assert.LessOrEqual(t, len(best), len(fastest),
+		"LevelBest should compress at least as well as LevelFastest")
+})
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/da/compression_test.go` around lines 61 - 81, The test currently appends to a shared sizes slice inside each t.Run which is unsafe if subtests are parallelized; change to preallocate sizes := make([]int, len(levels)) before the loop, capture the loop index (e.g., idx := i) inside the t.Run closure, and assign sizes[idx] = len(compressed) instead of appending; keep the cross-level assertion after the loop (or move it into its own t.Run) and add a short comment on not calling t.Parallel() here to avoid races if someone enables parallel subtests; refer to the t.Run loop, levels, Compress and Decompress calls and the sizes slice when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@block/internal/da/client.go`:
- Around line 314-323: The decompression-error branch in Retrieve (where
da.Decompress returns decompErr) returns a ResultRetrieve without setting
BaseResult.Timestamp; update that error return to set Timestamp to the existing
block time (use the blockTime variable if available, otherwise time.Now()) so
the ResultRetrieve mirrors other error branches; modify the return constructing
datypes.ResultRetrieve / datypes.BaseResult in that decompErr branch to include
Timestamp: blockTime (or time.Now()) to ensure non-zero timestamps for callers.
- Around line 113-118: The log computes a compression ratio using
float64(len(compressed))/float64(len(raw)) which yields NaN for zero-length raw;
fix by computing the ratio before logging and guarding when len(raw) == 0 (e.g.,
set ratio := 0.0 or omit the field), then call
c.logger.Debug().Int("original_size", len(raw)).Int("compressed_size",
len(compressed)).Float64("ratio", ratio).Int("level",
int(compLevel)).Msg("compressed blob for DA submission"); this ensures no
division by zero produces NaN in the logs while keeping the same c.logger.Debug
call and the raw/compressed/compLevel identifiers.
In `@pkg/da/compression.go`:
- Around line 47-51: The zstd decoder is created without a max-memory cap
(decoder, err = zstd.NewReader(nil)), which risks OOM on untrusted DA blobs;
update the decoder initialization in pkg/da/compression.go to pass
zstd.WithDecoderMaxMemory(...) (e.g., 7*1024*1024 for a 7MB cap matching the DA
max blob size) so the decoder enforces a safe decompression memory limit.
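The division-by-zero guard described in the ratio-logging finding can be sketched as a small helper; the name `safeRatio` is illustrative, not from the PR.

```go
package main

import "fmt"

// safeRatio avoids the NaN noted in the review: dividing by a
// zero-length original would log NaN, so empty input maps to 0.
func safeRatio(compressedLen, rawLen int) float64 {
	if rawLen == 0 {
		return 0
	}
	return float64(compressedLen) / float64(rawLen)
}

func main() {
	fmt.Println(safeRatio(50, 100)) // typical blob: compressed/raw
	fmt.Println(safeRatio(4, 0))    // guarded: 0 instead of NaN
}
```

Computing the ratio once, before the structured-log call, also keeps the `Float64("ratio", ...)` field trivially safe for any input length.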
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (7)
- `apps/evm/go.sum` is excluded by `!**/*.sum`
- `apps/grpc/go.sum` is excluded by `!**/*.sum`
- `apps/testapp/go.sum` is excluded by `!**/*.sum`
- `execution/evm/go.sum` is excluded by `!**/*.sum`
- `execution/evm/test/go.sum` is excluded by `!**/*.sum`
- `go.sum` is excluded by `!**/*.sum`
- `test/e2e/go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (10)
- CLAUDE.md
- apps/evm/go.mod
- apps/grpc/go.mod
- apps/testapp/go.mod
- block/internal/da/client.go
- execution/evm/test/go.mod
- go.mod
- pkg/da/compression.go
- pkg/da/compression_test.go
- test/e2e/go.mod
```go
var err error
decoder, err = zstd.NewReader(nil)
if err != nil {
	panic(fmt.Sprintf("compression: create zstd decoder: %v", err))
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
cat -n pkg/da/compression.go
```
🌐 Web query:
klauspost compress zstd WithDecoderMaxMemory WithDecoderMaxWindowSize options
💡 Result:
In github.com/klauspost/compress/zstd, the current option names are:

- `WithDecoderMaxMemory(n uint64)`: sets an upper bound on memory-related growth. For non-streaming decode (`DecodeAll`), this is the maximum decoded size allowed; for streaming decode (`NewReader`/`Read`), it is the maximum window size allowed. Default: 64 GiB. Max: 1<<63 bytes. Can be changed via `ResetWithOptions`. [1]
- `WithDecoderMaxWindow(size uint64)` (likely what "WithDecoderMaxWindowSize" refers to): sets the maximum Zstd window size the decoder will accept (larger windows imply higher memory usage). Default: 512 MB. Max: ~3.75 TB (per the Zstandard spec). If `WithDecoderMaxMemory` is set lower, that lower value wins. Can be changed via `ResetWithOptions`. [1]
Example:

```go
dec, err := zstd.NewReader(r,
	zstd.WithDecoderMaxMemory(512<<20), // 512 MiB cap (DecodeAll output cap / streaming window cap)
	zstd.WithDecoderMaxWindow(256<<20), // additionally cap window to 256 MiB
)
```

Source: Go package docs for github.com/klauspost/compress/zstd. [1]
References:
[1] https://pkg.go.dev/github.com/klauspost/compress/zstd
🏁 Script executed:

```shell
# Search for any blob size limits or DA constraints in the codebase
rg -i "blob.*size|max.*blob|max.*decompress|decodermax" --type go
```

🏁 Script executed:

```shell
# Look for DA interface and max blob references
fd "types.go" pkg/da --exec cat -n {}
```

🏁 Script executed:

```shell
# Search for usage of Decompress function
rg "Decompress" --type go -B 2 -A 2
```
Add decompression size limit to prevent memory exhaustion from untrusted DA blobs.
The Decompress function processes untrusted data from the DA layer without constraining decompression output. With the default zstd limit of 64 GiB, a maliciously crafted compressed blob could allocate far more memory than the actual maximum blob size (5 MB). Add zstd.WithDecoderMaxMemory() when initializing the decoder at line 48, aligned with your maximum DA blob size to prevent DoS attacks.
```go
decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(7*1024*1024)) // 7 MB to match max blob size
```
Resolves #2532
Overview
Ref:
Benchmarks Zstd
ethereum-optimism/optimistic-specs#174
Arbitrum uses Brotli with dynamic compression levels: Level 11 for low traffic; Level 0-1 for spikes.
https://docs.arbitrum.io/how-arbitrum-works/inside-arbitrum-nitro#:~:text=The%20data%20then%20undergoes%20compression,toward%20speed%20during%20heavy%20backlogs.
Summary by CodeRabbit
Release Notes
New Features
Documentation
Chores