
fix(da): fix polling fallback when ws not available#3361

Merged
julienrbrt merged 5 commits into main from julien/fix-polling-fallback
Jun 20, 2026

Conversation

@julienrbrt

@julienrbrt julienrbrt commented Jun 19, 2026

Member

Overview

#3146 uses subscribe (WebSocket) only, and the HTTP fallback added in #3211 only changes the transport, so we still call subscribe. When falling back to HTTP, we need to use getall/get instead.

Summary by CodeRabbit

  • Bug Fixes
    • Improved DA client resilience: if WebSocket fails, the service falls back to HTTP polling instead of getting stuck or permanently downgrading.
    • Added automatic WebSocket restoration: a background retry attempts to re-establish WebSocket every 30 seconds after an outage.
  • New Features
    • Runtime detection of DA subscription support via a new SupportsSubscribe() capability, enabling syncing to use subscription-based updates when available and switch to polling otherwise.

@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Contributor


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 26f69aef-892f-4f6c-871a-1d3f0ec15e24

📥 Commits

Reviewing files that changed from the base of the PR and between 9fc827b and 1deca93.

📒 Files selected for processing (1)
  • pkg/da/jsonrpc/client.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/da/jsonrpc/client.go

📝 Walkthrough


Adds SupportsSubscribe() bool to the DA Client interface, implements it in client, tracedClient, and test mocks. The JSON-RPC client gains an eager WS attempt with HTTP fallback and a background goroutine that retries WS every 30 seconds and upgrades in-place. Subscriber.Start now branches between followLoop (WS) and a new pollLoop (HTTP), which itself upgrades to followLoop once WS becomes available.
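To make the capability check concrete, here is a minimal sketch of how a `SupportsSubscribe()` method can drive the choice between the two loops. Only the method name and the loop names come from the walkthrough; `fakeClient`, the simplified `tracedClient`, and `chooseLoop` are hypothetical stand-ins, not the repository's code.

```go
package main

import "fmt"

// Client is a trimmed-down stand-in for the DA client interface.
// Only SupportsSubscribe is taken from the PR; everything else is illustrative.
type Client interface {
	SupportsSubscribe() bool
}

// fakeClient is a hypothetical implementation with a fixed capability.
type fakeClient struct{ ws bool }

func (f fakeClient) SupportsSubscribe() bool { return f.ws }

// tracedClient delegates the capability check to the wrapped client.
type tracedClient struct{ inner Client }

func (t tracedClient) SupportsSubscribe() bool { return t.inner.SupportsSubscribe() }

// chooseLoop returns the name of the loop that Start would run.
func chooseLoop(c Client) string {
	if c.SupportsSubscribe() {
		return "followLoop"
	}
	return "pollLoop"
}

func main() {
	fmt.Println(chooseLoop(tracedClient{inner: fakeClient{ws: true}}))  // followLoop
	fmt.Println(chooseLoop(tracedClient{inner: fakeClient{ws: false}})) // pollLoop
}
```

Keeping the check behind the interface means wrappers such as the tracing client only need a one-line delegation.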

Changes

DA WebSocket Fallback and Polling

Layer / File(s) | Summary
DA Client interface and block-level client wiring: block/internal/da/interface.go, block/internal/da/client.go, block/internal/da/tracing.go, block/internal/da/tracing_test.go | SupportsSubscribe() added to the Client interface; internal client retains a *blobrpc.Client pointer to read its live IsWebSocket flag; tracedClient delegates to its inner client; tracing test mock returns true.
JSON-RPC client WS resiliency with HTTP fallback and retry: pkg/da/jsonrpc/client.go | Client gains IsWebSocket, mu, closer, and retryCancel fields. Close() stops any running retry loop. NewWSClient falls back to HTTP on WS dial failure and spawns a background goroutine that calls tryUpgradeWS every 30 seconds to swap transport internals in-place when WS becomes reachable.
Subscriber.Start branching and pollLoop: block/internal/da/subscriber.go | Start routes to followLoop or pollLoop based on SupportsSubscribe(). pollLoop polls via pollDAHeight on a ticker, triggers updateHighest on new heights, and self-upgrades to followLoop when WS becomes available.
Mock implementations and test helper updates: test/mocks/da.go, test/testda/dummy.go, block/internal/da/async_block_retriever_test.go, block/internal/syncing/syncer_test.go, apps/evm/server/force_inclusion_test.go, pkg/sequencers/single/sequencer_test.go | MockClient gains full testify scaffolding for SupportsSubscribe(). DummyDA.SupportsSubscribe() returns true. All affected tests stub SupportsSubscribe() to return true before existing subscription expectations across async block retriever, syncer, force inclusion, and sequencer test suites.
Changelog entry: CHANGELOG.md | v1.1.4 Fixed entry documents the WS-to-HTTP polling fallback and 30-second background WS retry.

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant NewWSClient
  participant retryWSLoop
  participant tryUpgradeWS
  participant Subscriber

  Caller->>NewWSClient: NewWSClient(addr)
  alt WS dial succeeds
    NewWSClient-->>Caller: Client{IsWebSocket:true}
    Caller->>Subscriber: Start()
    Subscriber->>Subscriber: followLoop (WS subscription)
  else WS dial fails
    NewWSClient->>retryWSLoop: start background retry goroutine
    NewWSClient-->>Caller: Client{IsWebSocket:false}
    Caller->>Subscriber: Start()
    Subscriber->>Subscriber: pollLoop (HTTP polling)
    loop every 30s
      retryWSLoop->>tryUpgradeWS: attempt WS upgrade
      tryUpgradeWS-->>retryWSLoop: IsWebSocket=true
    end
    Subscriber->>Subscriber: SupportsSubscribe() now true
    Subscriber->>Subscriber: spawn followLoop, exit pollLoop
  end
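The flow in the diagram can be sketched in a few lines of Go. This is an illustration under stated assumptions, not the code in pkg/da/jsonrpc/client.go: `client`, `dialFunc`, `newClient`, `retryLoop`, and `flakyDial` are hypothetical names, and the retry ticks arrive on a plain channel so the example is deterministic. A real implementation would more likely read from a `time.Ticker`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// client is a hypothetical stand-in for the JSON-RPC client.
type client struct {
	isWebSocket atomic.Bool
}

// dialFunc is a hypothetical stand-in for the real WebSocket dial.
type dialFunc func() error

// newClient tries WS eagerly. On failure it returns an HTTP-mode client
// and reports that a background retry loop is needed.
func newClient(dial dialFunc) (*client, bool) {
	c := &client{}
	if err := dial(); err != nil {
		return c, true
	}
	c.isWebSocket.Store(true)
	return c, false
}

// retryLoop attempts an upgrade on every tick until it succeeds or ctx is done.
func retryLoop(ctx context.Context, c *client, dial dialFunc, ticks <-chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticks:
			if dial() == nil {
				c.isWebSocket.Store(true)
				return
			}
		}
	}
}

// flakyDial fails the first n attempts, then succeeds.
func flakyDial(n int) dialFunc {
	attempts := 0
	return func() error {
		attempts++
		if attempts <= n {
			return errors.New("ws unavailable")
		}
		return nil
	}
}

// demoFallback returns the WS flag right after construction and after two retry ticks.
func demoFallback() (bool, bool) {
	dial := flakyDial(2)
	c, needsRetry := newClient(dial)
	before := c.isWebSocket.Load()
	if needsRetry {
		ticks := make(chan struct{}, 2)
		ticks <- struct{}{}
		ticks <- struct{}{}
		retryLoop(context.Background(), c, dial, ticks) // run inline for the demo
	}
	return before, c.isWebSocket.Load()
}

func main() {
	before, after := demoFallback()
	fmt.Println(before, after) // false true
}
```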

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • evstack/ev-node#3131: Both PRs touch the WebSocket/subscription path by modifying pkg/da/jsonrpc/client.go, specifically the NewWSClient construction/behavior for WebSocket support.
  • evstack/ev-node#3146: The main PR's block/internal/da/subscriber.go changes build directly on this PR's new da.Subscriber follow/catchup architecture.
  • evstack/ev-node#3211: Both PRs modify pkg/da/jsonrpc/client.go's NewWSClient behavior to fall back to HTTP polling when WebSocket connection fails.

Suggested labels

T:testing

Suggested reviewers

  • alpe
  • tac0turtle

Poem

🐇 The WebSocket dropped, oh what a fright,
But polling via HTTP kept data in sight.
Every thirty seconds I retry the WS door,
Swap the internals in-place — then subscribe once more!
No transient outage shall linger for long,
This little rabbit hops back, ever strong. 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 17.39% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title accurately describes the main change: implementing polling fallback when WebSocket is unavailable, which is the core purpose of the changeset.
Description check | ✅ Passed | The description provides clear context about the problem (HTTP fallback was still using subscribe instead of switching to polling), references related PRs, and explains the solution.
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.


@claude

claude Bot commented Jun 19, 2026

Contributor

Claude finished @julienrbrt's task in 2m 50s


Code Review: fix(da): fix polling fallback when ws not available

  • Read AGENTS.md and repo guidelines
  • Read changed files
  • Review concurrency correctness
  • Review error handling & resilience
  • Review testing coverage
  • Post final review

The PR correctly addresses the core bug — falling back to HTTP polling instead of staying stuck on a non-functional WebSocket subscribe. The overall architecture (poll loop → follow loop upgrade, background retry) is sound. There are two concurrency issues that need attention before merging.

Critical: Blob.Internal / Header.Internal swap is not race-safe

In pkg/da/jsonrpc/client.go, tryUpgradeWS swaps the entire Internal struct:

// pkg/da/jsonrpc/client.go:158-159
c.Blob.Internal = wsClient.Blob.Internal
c.Header.Internal = wsClient.Header.Internal

This write is protected by c.mu, but all callers in block/internal/da/client.go read these fields without holding c.mu:

// block/internal/da/client.go:346 (in Retrieve, called concurrently)
blobs, err := c.blobAPI.GetAll(blobCtx, height, ...)

c.blobAPI is &cfg.DA.Blob — a pointer to the very struct being overwritten. Since Internal contains multiple function-pointer fields and Go structs are not written atomically, a concurrent RPC caller can observe a partially-written Internal struct. This is a real data race (confirmed by go test -race) that the atomic.Bool fix for IsWebSocket does not address.

Fix: Use pointer indirection and atomic.Pointer to swap the whole transport atomically, or protect every RPC dispatch path with c.mu. The atomic approach is simpler:

type Client struct {
    blob        atomic.Pointer[BlobAPI]   // replaces Blob
    header      atomic.Pointer[HeaderAPI] // replaces Header
    IsWebSocket atomic.Bool
    ...
}
// swap in tryUpgradeWS: Load already returns a pointer, so store it directly
c.blob.Store(wsClient.blob.Load())
c.header.Store(wsClient.header.Load())
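As a self-contained illustration of the atomic-pointer approach, the sketch below swaps an immutable transport snapshot while readers are running. The `transport` type, its `getAll` field, and `runSwap` are hypothetical and much simpler than the real `BlobAPI`/`HeaderAPI` structs. The point is only that each call loads the pointer once, so a reader sees either the old transport or the new one, never a mix.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// transport is a hypothetical immutable snapshot of the RPC endpoints.
type transport struct {
	kind   string
	getAll func(height uint64) string
}

// client publishes the current transport through an atomic pointer.
type client struct {
	tr atomic.Pointer[transport]
}

func newTransport(kind string) *transport {
	return &transport{
		kind:   kind,
		getAll: func(h uint64) string { return fmt.Sprintf("%s:%d", kind, h) },
	}
}

// GetAll loads the snapshot once, so a concurrent swap cannot be observed half-way.
func (c *client) GetAll(h uint64) string { return c.tr.Load().getAll(h) }

// upgrade publishes a fully built transport in a single atomic store.
func (c *client) upgrade(t *transport) { c.tr.Store(t) }

// runSwap issues calls concurrently with an upgrade and returns one result
// from before the swap and one from after it.
func runSwap() (string, string) {
	c := &client{}
	c.tr.Store(newTransport("http"))
	before := c.GetAll(1)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.GetAll(2) // may hit either transport, but always a complete one
		}()
	}
	c.upgrade(newTransport("ws"))
	wg.Wait()
	return before, c.GetAll(3)
}

func main() {
	before, after := runSwap()
	fmt.Println(before, after) // http:1 ws:3
}
```

`atomic.Pointer` requires Go 1.19 or later. Running a test built around something like `runSwap` with `go test -race` is one way to check that readers and the swap no longer conflict.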



Major: WS connection leak when Close() races with tryUpgradeWS

Close() does not set a closed flag before releasing c.mu:

// pkg/da/jsonrpc/client.go:34-44
c.mu.Lock()
if c.retryCancel != nil { c.retryCancel(); c.retryCancel = nil }
closer := c.closer   // snapshots the *old* HTTP closer
c.mu.Unlock()
if closer != nil { closer() }

If tryUpgradeWS has already dialled wsClient but not yet acquired c.mu, this sequence occurs:

  1. Close() captures the old HTTP closer, releases c.mu, calls old closer
  2. tryUpgradeWS acquires c.mu, swaps internals, replaces c.closer with a closure that includes wsClient.closer()
  3. No one ever calls the new c.closer → WS connection leaked

Add a closed bool field guarded by c.mu and check it early in tryUpgradeWS:

c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
    wsClient.Close()
    return true
}

And in Close(): set c.closed = true before releasing the lock.
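A minimal sketch of the closed-flag pattern, assuming a simplified client in which `conn` stands in for a dialled transport. `conn`, `active`, `tryUpgrade`, and `lateUpgrade` are hypothetical names. As in the snippet above, the upgrade returns true when the retry loop should stop.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a dialled transport.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

type client struct {
	mu     sync.Mutex
	closed bool  // set by Close while holding mu
	active *conn // currently installed transport
}

// Close marks the client closed under the lock, then closes the transport
// it owned at that moment.
func (c *client) Close() {
	c.mu.Lock()
	c.closed = true
	active := c.active
	c.active = nil
	c.mu.Unlock()
	if active != nil {
		active.Close()
	}
}

// tryUpgrade installs ws unless Close already ran, in which case it closes
// ws itself so the connection cannot leak. It returns true when retrying
// should stop.
func (c *client) tryUpgrade(ws *conn) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		ws.Close()
		return true
	}
	old := c.active
	c.active = ws
	if old != nil {
		old.Close()
	}
	return true
}

// lateUpgrade replays the problematic ordering: the WS connection is dialled
// before Close, but the upgrade only runs after Close has finished.
func lateUpgrade() (stop, wsClosed, installed bool) {
	c := &client{active: &conn{}}
	ws := &conn{}
	c.Close()
	stop = c.tryUpgrade(ws)
	return stop, ws.closed, c.active == ws
}

func main() {
	fmt.Println(lateUpgrade()) // true true false
}
```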



Major: Test coverage for new concurrency paths is ~9%

Codecov reports 86 lines missing coverage, concentrated in pkg/da/jsonrpc/client.go (41+2 missing) and block/internal/da/subscriber.go (38+1 missing). The untested paths are exactly the ones with the most complex concurrent behaviour:

  • retryWSLoop / tryUpgradeWS
  • pollLoop, including the upgrade-to-followLoop branch
  • pollDAHeight

Given that this code runs a background goroutine performing in-place transport mutation, tests that exercise the WS→HTTP→WS round-trip and poll→follow transition are important both for correctness verification and for catching regressions. Running go test -race on these new paths should be a baseline before merge.


Minor: seenSubscriptionEvent flag name is misleading in the poll path

pollDAHeight sets s.seenSubscriptionEvent.Store(true) despite there being no subscription event:

// block/internal/da/subscriber.go:231
s.seenSubscriptionEvent.Store(true)

This flag gates headReached in runCatchup, so the semantics are correct — but the name causes confusion. Consider renaming to seenHeightProgress or hasObservedHeight to reflect that both code paths set it.


Minor: Hardcoded 30-second WS retry interval

wsRetryInterval = 30 * time.Second is unexported and not configurable. For operators with fast DA endpoints this means a 30-second blind spot after a transient WS failure. Exposing this via config or at minimum via an option parameter to NewWSClient would improve operational flexibility.
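One possible shape for the option-parameter variant is the functional-options pattern sketched below. `Option`, `WithWSRetryInterval`, `clientConfig`, and `resolveConfig` are hypothetical names for illustration. The PR itself only defines the unexported 30-second constant.

```go
package main

import (
	"fmt"
	"time"
)

// defaultWSRetryInterval mirrors the 30-second value mentioned in the review.
const defaultWSRetryInterval = 30 * time.Second

// clientConfig is a hypothetical holder for tunable client settings.
type clientConfig struct {
	wsRetryInterval time.Duration
}

// Option mutates the config. A constructor could accept these variadically.
type Option func(*clientConfig)

// WithWSRetryInterval overrides the retry interval. Non-positive values are ignored.
func WithWSRetryInterval(d time.Duration) Option {
	return func(c *clientConfig) {
		if d > 0 {
			c.wsRetryInterval = d
		}
	}
}

// resolveConfig applies the options on top of the defaults.
func resolveConfig(opts ...Option) clientConfig {
	cfg := clientConfig{wsRetryInterval: defaultWSRetryInterval}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(resolveConfig().wsRetryInterval)                                   // 30s
	fmt.Println(resolveConfig(WithWSRetryInterval(5 * time.Second)).wsRetryInterval) // 5s
}
```

Existing callers would keep the current default, while operators who need faster recovery could pass a shorter interval.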


Good

  • atomic.Bool for IsWebSocket is the right call — clean and lock-free on the read path.
  • pollLoop calling wg.Add(1) before spawning followLoop and then returning is correctly sequenced — Stop() / Wait() will not return prematurely.
  • tryUpgradeWS guarding against double-upgrade via c.IsWebSocket.Load() check under the mutex is correct.
  • The pollDAHeight function is cleanly factored and handles context cancellation before logging.
  • Tracing and mock delegation for SupportsSubscribe() are minimal and correct.
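The `wg.Add(1)` ordering noted above can be illustrated with a simplified poll-to-follow handover. This is a sketch under assumptions: `subscriber` here is a toy type, `supportsSubscribe` is a hypothetical function field standing in for the client capability check, and ticks arrive on a channel rather than from a timer.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// subscriber is a toy version of the real type.
type subscriber struct {
	wg                sync.WaitGroup
	supportsSubscribe func() bool // hypothetical capability check
	polls             int         // written by pollLoop, read after wg.Wait
	followed          bool        // written by followLoop, read after wg.Wait
}

func (s *subscriber) followLoop(ctx context.Context) {
	defer s.wg.Done()
	s.followed = true // the real loop would consume subscription events here
}

// pollLoop polls on each tick. Once subscriptions are available it hands
// over to followLoop and exits.
func (s *subscriber) pollLoop(ctx context.Context, ticks <-chan struct{}) {
	defer s.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticks:
			if s.supportsSubscribe() {
				// Register the new goroutine before this loop's deferred Done
				// runs, so Wait cannot observe a zero counter in between.
				s.wg.Add(1)
				go s.followLoop(ctx)
				return
			}
			s.polls++
		}
	}
}

// runHandover feeds three ticks. The capability check turns true on the third.
func runHandover() (int, bool) {
	calls := 0
	s := &subscriber{supportsSubscribe: func() bool {
		calls++
		return calls >= 3
	}}
	ticks := make(chan struct{}, 3)
	for i := 0; i < 3; i++ {
		ticks <- struct{}{}
	}
	s.wg.Add(1)
	go s.pollLoop(context.Background(), ticks)
	s.wg.Wait() // returns only after followLoop has finished too
	return s.polls, s.followed
}

func main() {
	fmt.Println(runHandover()) // 2 true
}
```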

@github-actions

github-actions Bot commented Jun 19, 2026

Contributor

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build | Format | Lint | Breaking | Updated (UTC)
✅ passed | ⏩ skipped | ✅ passed | ⏩ skipped | Jun 20, 2026, 9:12 PM

@julienrbrt julienrbrt marked this pull request as ready for review June 19, 2026 11:24
@julienrbrt julienrbrt force-pushed the julien/fix-polling-fallback branch from e81f6c9 to 4e8d3ba Compare June 19, 2026 11:27

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@block/internal/da/client.go`:
- Around line 493-494: The SupportsSubscribe() method in
block/internal/da/client.go reads the IsWebSocket field from the da client
without synchronization, while the tryUpgradeWS() method in
pkg/da/jsonrpc/client.go writes to IsWebSocket under mutex protection. This
creates a data race as the background retryWSLoop goroutine calls
tryUpgradeWS(). To fix this, add a thread-safe accessor method
SupportsSubscribe() in the Client type in pkg/da/jsonrpc/client.go that acquires
the mu mutex, reads IsWebSocket, and returns the value, then update the
SupportsSubscribe() method in block/internal/da/client.go to call this new
accessor method instead of directly accessing the IsWebSocket field.

In `@pkg/da/jsonrpc/client.go`:
- Around line 17-25: Replace the `IsWebSocket` bool field in the Client struct
with `atomic.Bool` to eliminate the data race. Then update the write site in the
`tryUpgradeWS` function where `IsWebSocket` is assigned (around line 168) to use
the atomic.Store method instead of direct assignment. Finally, update the read
site in `SupportsSubscribe()` function in `block/internal/da/client.go` (lines
493-495) to use the atomic.Load method instead of reading the field directly,
which ensures race-safe access without requiring mutex synchronization.
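The two suggestions above are alternative ways to make the flag race-safe. The sketch below puts them side by side. `lockedFlag`, `atomicFlag`, and `flipConcurrently` are hypothetical names, and the real `Client` has more fields than shown.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lockedFlag follows the first suggestion: a mutex-guarded accessor.
type lockedFlag struct {
	mu          sync.Mutex
	isWebSocket bool
}

func (f *lockedFlag) set(v bool) {
	f.mu.Lock()
	f.isWebSocket = v
	f.mu.Unlock()
}

func (f *lockedFlag) SupportsSubscribe() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.isWebSocket
}

// atomicFlag follows the second suggestion: an atomic.Bool field.
type atomicFlag struct {
	isWebSocket atomic.Bool
}

func (f *atomicFlag) SupportsSubscribe() bool { return f.isWebSocket.Load() }

// flipConcurrently writes and reads both flags from different goroutines
// and returns their final values.
func flipConcurrently() (bool, bool) {
	var l lockedFlag
	var a atomicFlag
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer, standing in for the retry goroutine
		defer wg.Done()
		l.set(true)
		a.isWebSocket.Store(true)
	}()
	go func() { // reader, standing in for the subscriber
		defer wg.Done()
		_ = l.SupportsSubscribe()
		_ = a.SupportsSubscribe()
	}()
	wg.Wait()
	return l.SupportsSubscribe(), a.SupportsSubscribe()
}

func main() {
	fmt.Println(flipConcurrently()) // true true
}
```

Both variants are free of data races. The atomic version keeps the read path lock-free, which matters if the check runs on every poll tick.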

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2fc19a96-2370-4ed8-8a01-7c52b64e82aa

📥 Commits

Reviewing files that changed from the base of the PR and between ec9f9bf and 2b5828d.

📒 Files selected for processing (12)
  • CHANGELOG.md
  • apps/evm/server/force_inclusion_test.go
  • block/internal/da/async_block_retriever_test.go
  • block/internal/da/client.go
  • block/internal/da/interface.go
  • block/internal/da/subscriber.go
  • block/internal/da/tracing.go
  • block/internal/da/tracing_test.go
  • block/internal/syncing/syncer_test.go
  • pkg/da/jsonrpc/client.go
  • test/mocks/da.go
  • test/testda/dummy.go

Comment thread block/internal/da/client.go Outdated
Comment thread pkg/da/jsonrpc/client.go

@coderabbitai coderabbitai Bot left a comment

Contributor


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pkg/da/jsonrpc/client.go (2)

157-170: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Avoid swapping RPC internals while callers can read them concurrently.

c.mu only protects this writer; polling/catchup callers can still use c.Blob or c.Header while the retry goroutine replaces their Internal fields. Use an immutable transport snapshot behind a synchronized/atomic indirection, or route every RPC call through the same lock before publishing IsWebSocket=true.

As per coding guidelines, be careful with concurrent access to shared state in Go code.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/da/jsonrpc/client.go` around lines 157 - 170, The code swaps the Internal
fields of c.Blob and c.Header without protecting concurrent readers who may be
using these fields for RPC calls simultaneously. While c.mu protects this write
operation in the retry goroutine, polling and catchup callers can still access
c.Blob.Internal and c.Header.Internal concurrently, causing a race condition. To
fix this, either implement an immutable transport snapshot pattern where the
entire transport is swapped atomically behind a synchronized indirection (like a
pointer to a struct containing Blob and Header), or ensure all RPC callers route
their calls through c.mu before the swap happens and only publish
IsWebSocket=true after the swap is protected by the same lock that guards all
RPC read operations.

Source: Coding guidelines


23-25: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Prevent WS upgrades after Close has started.

Close cancels the retry loop but does not mark the client closed. If tryUpgradeWS already created wsClient, it can acquire c.mu after Close captures the old closer and install a new WS transport that Close will never close.

Suggested fix
 type Client struct {
 	Blob        BlobAPI
 	Header      HeaderAPI
 	IsWebSocket atomic.Bool
 
 	mu          sync.Mutex
 	closer      jsonrpc.ClientCloser
 	retryCancel context.CancelFunc // stops the background WS retry loop
+	closed      bool
 }
 
 func (c *Client) Close() {
 	if c == nil {
 		return
 	}
 	c.mu.Lock()
 	if c.retryCancel != nil {
 		c.retryCancel()
 		c.retryCancel = nil
 	}
 	closer := c.closer
+	c.closer = nil
+	c.closed = true
 	c.mu.Unlock()
 	if closer != nil {
 		closer()
 	}
 }
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
+	if c.closed {
+		wsClient.Close()
+		return true
+	}
+
 	// Another goroutine may have already upgraded.
 	if c.IsWebSocket.Load() {
 		wsClient.Close()
 		return true
 	}

As per coding guidelines, be mindful of goroutine leaks in Go code.

Also applies to: 34-42, 148-170

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/da/jsonrpc/client.go` around lines 23 - 25, The client struct needs a
closed flag to track whether Close has been called, preventing race conditions
in goroutine management. Add a boolean field (like closed or isClosed) to the
struct alongside mu, closer, and retryCancel. In the Close method, acquire c.mu,
set this flag to true, then proceed with canceling the retry loop and closing
the existing transport. In the tryUpgradeWS method, check this closed flag while
holding c.mu at the beginning and return early if the client is already closed,
ensuring no new WS transports are installed after Close has started. This
prevents goroutine leaks from WS clients that would never be closed.

Source: Coding guidelines


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1b423264-26b3-4b99-819a-d51141a0dc6b

📥 Commits

Reviewing files that changed from the base of the PR and between 2b5828d and 5bbfaf2.

📒 Files selected for processing (2)
  • block/internal/da/client.go
  • pkg/da/jsonrpc/client.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • block/internal/da/client.go

@julienrbrt julienrbrt enabled auto-merge June 19, 2026 14:42
@codecov

codecov Bot commented Jun 19, 2026


Codecov Report

❌ Patch coverage is 9.47368% with 86 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.60%. Comparing base (ec9f9bf) to head (1deca93).

Files with missing lines | Patch % | Lines
pkg/da/jsonrpc/client.go | 10.41% | 41 Missing and 2 partials ⚠️
block/internal/da/subscriber.go | 7.14% | 38 Missing and 1 partial ⚠️
block/internal/da/client.go | 33.33% | 2 Missing ⚠️
block/internal/da/tracing.go | 0.00% | 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3361      +/-   ##
==========================================
- Coverage   60.97%   60.60%   -0.37%     
==========================================
  Files         129      129              
  Lines       14111    14201      +90     
==========================================
+ Hits         8604     8607       +3     
- Misses       4556     4639      +83     
- Partials      951      955       +4     
Flag | Coverage Δ
combined | 60.60% <9.47%> (-0.37%) ⬇️


@julienrbrt julienrbrt added this pull request to the merge queue Jun 20, 2026
Merged via the queue into main with commit 7e6ea1e Jun 20, 2026
33 checks passed
@julienrbrt julienrbrt deleted the julien/fix-polling-fallback branch June 20, 2026 21:43

2 participants