From e4dc50b6121b8dc5cf9f120b0355625b6d9b435c Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 3 Mar 2026 18:18:01 +0100 Subject: [PATCH 1/8] add stricterlinting --- .github/workflows/lint.yml | 4 +- .golangci.yml | 62 ++++++++++++++++++- .just/lint.just | 4 +- .../internal/da/async_block_retriever_test.go | 1 + block/internal/executing/executor.go | 6 +- .../internal/submitting/da_submitter_test.go | 1 - block/internal/syncing/syncer.go | 24 ++++--- block/internal/syncing/syncer_backoff_test.go | 6 +- .../internal/syncing/syncer_benchmark_test.go | 2 +- block/internal/syncing/syncer_test.go | 4 +- node/failover.go | 6 +- node/full.go | 2 +- node/full_node_test.go | 10 ++- node/light.go | 2 +- pkg/cmd/run_node_test.go | 13 ++-- pkg/p2p/client_test.go | 1 - .../da_visualization_non_aggregator_test.go | 10 +-- pkg/rpc/server/da_visualization_test.go | 16 ++--- pkg/rpc/server/http_test.go | 4 +- pkg/rpc/server/server_test.go | 19 ++++-- pkg/store/store_adapter_test.go | 2 +- scripts/test.go | 3 +- tools/blob-decoder/main.go | 4 +- tools/local-da/rpc.go | 4 +- 24 files changed, 141 insertions(+), 69 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index f9607b7f43..506aefbd8f 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -28,8 +28,8 @@ jobs: go.sum - uses: golangci/golangci-lint-action@v9.2.0 with: - version: latest - args: --timeout 10m + version: v2.10.1 + args: --timeout 10m ./... github-token: ${{ secrets.github_token }} if: env.GIT_DIFF diff --git a/.golangci.yml b/.golangci.yml index b5185a5af5..6e23feba11 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,18 +1,63 @@ version: "2" run: modules-download-mode: readonly + timeout: 10m + tests: true build-tags: - evm - e2e - docker linters: enable: + - asciicheck + - bidichk + - bodyclose + - containedctx + - contextcheck + - copyloopvar + - durationcheck + - errname + - errcheck - errorlint + - gocritic + - govet + - ineffassign + - makezero - gosec - misspell + - nilerr + - noctx + - nolintlint + - prealloc + - predeclared + - reassign - revive + - rowserrcheck + - sqlclosecheck + - staticcheck + - testifylint - unconvert + - unparam + - unused + - usestdlibvars + - wastedassign settings: + errcheck: + check-type-assertions: true + check-blank: true + govet: + enable-all: true + disable: + - fieldalignment + - shadow + gocritic: + enabled-tags: + - diagnostic + - style + - performance + disabled-checks: + - hugeParam + - rangeValCopy gosec: excludes: - G115 @@ -21,7 +66,7 @@ linters: - name: package-comments disabled: true - name: duplicated-imports - severity: warning + severity: error - name: exported arguments: - disableStutteringCheck @@ -36,6 +81,21 @@ linters: - third_party$ - builtin$ - examples$ + disable: + - containedctx + - errcheck + - gocritic + - nolintlint + - prealloc + - predeclared + - testifylint + - thelper + - tparallel + - unparam + - wrapcheck +issues: + max-issues-per-linter: 0 + max-same-issues: 0 formatters: enable: - gci diff --git a/.just/lint.just b/.just/lint.just index 6075d8d819..ad43688df0 100644 --- a/.just/lint.just +++ b/.just/lint.just @@ -2,7 +2,7 @@ [group('lint')] lint: vet @echo "--> Running golangci-lint" - @golangci-lint run + @golangci-lint run ./... @echo "--> Running markdownlint" @markdownlint --config .markdownlint.yaml '**/*.md' @echo "--> Running hadolint" @@ -18,7 +18,7 @@ lint: vet [group('lint')] lint-fix: @echo "--> Formatting go" - @golangci-lint run --fix + @golangci-lint run --fix ./... @echo "--> Formatting markdownlint" @markdownlint --config .markdownlint.yaml --ignore './changelog.md' '**/*.md' -f diff --git a/block/internal/da/async_block_retriever_test.go b/block/internal/da/async_block_retriever_test.go index 952bfe560b..0ef3e60b48 100644 --- a/block/internal/da/async_block_retriever_test.go +++ b/block/internal/da/async_block_retriever_test.go @@ -266,5 +266,6 @@ func TestBlockData_SerializationEmpty(t *testing.T) { } assert.Equal(t, uint64(100), decoded.Height) + assert.Equal(t, time.Unix(0, 0).UTC(), decoded.Timestamp) assert.Equal(t, 0, len(decoded.Blobs)) } diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 688c4c28e1..47a4e901d8 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -583,7 +583,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { LastSubmittedDaHeaderHeight: e.cache.GetLastSubmittedHeaderHeight(), LastSubmittedDaDataHeight: e.cache.GetLastSubmittedDataHeight(), } - if err := e.raftNode.Broadcast(e.ctx, raftState); err != nil { + if err := e.raftNode.Broadcast(ctx, raftState); err != nil { return fmt.Errorf("failed to propose block to raft: %w", err) } e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft") @@ -609,12 +609,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // IMPORTANT: Header MUST be broadcast before data — the P2P layer validates // incoming data against the current and previous header, so out-of-order // delivery would cause validation failures on peers. - if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{ + if err := e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{ SignedHeader: header, }); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast header") } - if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{ + if err := e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, &types.P2PData{ Data: data, }); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast data") diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index cb00b36da8..e92d085a80 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -145,7 +145,6 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) { // Create test signer addr, pub, signer := createTestSigner(t) - gen.ProposerAddress = addr // Create test headers header1 := &types.SignedHeader{ diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index a408c5b7c3..5fb75f4d17 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -222,14 +222,14 @@ func (s *Syncer) Start(ctx context.Context) error { s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.cache, s.genesis, s.logger) - if currentHeight, err := s.store.Height(s.ctx); err != nil { + if currentHeight, err := s.store.Height(ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") } else { s.p2pHandler.SetProcessedHeight(currentHeight) } if s.raftRetriever != nil { - if err := s.raftRetriever.Start(s.ctx); err != nil { + if err := s.raftRetriever.Start(ctx); err != nil { return fmt.Errorf("start raft retriever: %w", err) } } @@ -242,7 +242,7 @@ func (s *Syncer) Start(ctx context.Context) error { s.wg.Go(s.processLoop) // Start dedicated workers for DA, and pending processing - s.startSyncWorkers() + s.startSyncWorkers(ctx) s.logger.Info().Msg("syncer started") return nil @@ -389,11 +389,12 @@ func (s *Syncer) processLoop() { } } -func (s *Syncer) startSyncWorkers() { +func (s *Syncer) startSyncWorkers(ctx context.Context) { + _ = ctx s.wg.Add(3) go s.daWorkerLoop() go s.pendingWorkerLoop() - go s.p2pWorkerLoop() + go s.p2pWorkerLoop(ctx) } func (s *Syncer) daWorkerLoop() { @@ -516,7 +517,7 @@ func (s *Syncer) pendingWorkerLoop() { } } -func (s *Syncer) p2pWorkerLoop() { +func (s *Syncer) p2pWorkerLoop(ctx context.Context) { defer s.wg.Done() logger := s.logger.With().Str("worker", "p2p").Logger() @@ -525,12 +526,12 @@ func (s *Syncer) p2pWorkerLoop() { for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return default: } - currentHeight, err := s.store.Height(s.ctx) + currentHeight, err := s.store.Height(ctx) if err != nil { logger.Error().Err(err).Msg("failed to get current height for P2P worker") if !s.sleepOrDone(50 * time.Millisecond) { @@ -540,7 +541,7 @@ func (s *Syncer) p2pWorkerLoop() { } targetHeight := currentHeight + 1 - waitCtx, cancel := context.WithCancel(s.ctx) + waitCtx, cancel := context.WithCancel(ctx) s.setP2PWaitState(targetHeight, cancel) err = s.p2pHandler.ProcessHeight(waitCtx, targetHeight, s.heightInCh) @@ -876,7 +877,7 @@ func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *t // Set custom verifier for aggregator node signature header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider) - if err := header.ValidateBasicWithData(data); err != nil { + if err := header.ValidateBasicWithData(data); err != nil { //nolint:contextcheck // validation API does not accept context return fmt.Errorf("invalid header: %w", err) } @@ -1308,10 +1309,7 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS s.logger.Debug().Err(err).Msg("no state in store, using genesis defaults for recovery") currentState = types.State{ ChainID: s.genesis.ChainID, - InitialHeight: s.genesis.InitialHeight, LastBlockHeight: s.genesis.InitialHeight - 1, - LastBlockTime: s.genesis.StartTime, - DAHeight: s.genesis.DAStartHeight, } } } diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index d6c6689f29..c20620c856 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -121,7 +121,7 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { } // Run sync loop - syncer.startSyncWorkers() + syncer.startSyncWorkers(context.Background()) <-ctx.Done() syncer.wg.Wait() @@ -223,7 +223,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { go syncer.processLoop() // Run workers - syncer.startSyncWorkers() + syncer.startSyncWorkers(context.Background()) <-ctx.Done() syncer.wg.Wait() @@ -294,7 +294,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { Return(nil, datypes.ErrBlobNotFound).Once() go syncer.processLoop() - syncer.startSyncWorkers() + syncer.startSyncWorkers(context.Background()) <-ctx.Done() syncer.wg.Wait() diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 330afc2e53..9cfbe811e0 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -44,7 +44,7 @@ func BenchmarkSyncerIO(b *testing.B) { // run both loops go fixt.s.processLoop() - fixt.s.startSyncWorkers() + fixt.s.startSyncWorkers(context.Background()) require.Eventually(b, func() bool { processedHeight, _ := fixt.s.store.Height(b.Context()) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 5ffd607d5b..9a85bfb6e7 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -416,7 +416,7 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, datypes.ErrHeightFromFuture) go syncerInst1.processLoop() - syncerInst1.startSyncWorkers() + syncerInst1.startSyncWorkers(context.Background()) syncerInst1.wg.Wait() requireEmptyChan(t, errorCh) @@ -479,7 +479,7 @@ func TestSyncLoopPersistState(t *testing.T) { // when it starts, it should fetch from the last height it stopped at t.Log("sync workers on instance2 started") - syncerInst2.startSyncWorkers() + syncerInst2.startSyncWorkers(context.Background()) syncerInst2.wg.Wait() t.Log("sync workers exited") diff --git a/node/failover.go b/node/failover.go index cc4e3b3ef1..a7f575a833 100644 --- a/node/failover.go +++ b/node/failover.go @@ -182,7 +182,7 @@ func setupFailoverState( func (f *failoverState) Run(pCtx context.Context) (multiErr error) { stopService := func(stoppable func(context.Context) error, name string) { // parent context is cancelled already, so we need to create a new one - shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second) + shutdownCtx, done := context.WithTimeout(context.WithoutCancel(pCtx), 3*time.Second) defer done() if err := stoppable(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) { @@ -192,7 +192,7 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { cCtx, cancel := context.WithCancel(pCtx) defer cancel() wg, ctx := errgroup.WithContext(cCtx) - wg.Go(func() (rerr error) { + wg.Go(func() (rerr error) { //nolint:contextcheck // block components stop API does not accept context defer func() { if err := f.bc.Stop(); err != nil && !errors.Is(err, context.Canceled) { rerr = errors.Join(rerr, fmt.Errorf("stopping block components: %w", err)) @@ -234,7 +234,7 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { wg.Go(func() error { defer func() { - shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second) + shutdownCtx, done := context.WithTimeout(context.WithoutCancel(ctx), 3*time.Second) defer done() _ = f.rpcServer.Shutdown(shutdownCtx) }() diff --git a/node/full.go b/node/full.go index 692ce67c5b..df4258262b 100644 --- a/node/full.go +++ b/node/full.go @@ -307,7 +307,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { n.Logger.Info().Msg("halting full node and its sub services...") // Use a timeout context to ensure shutdown doesn't hang - shutdownCtx, cancel := context.WithTimeout(context.Background(), 9*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 9*time.Second) defer cancel() var shutdownMultiErr error // Variable to accumulate multiple errors diff --git a/node/full_node_test.go b/node/full_node_test.go index ec33f2156b..f6b977b6c5 100644 --- a/node/full_node_test.go +++ b/node/full_node_test.go @@ -40,7 +40,9 @@ func TestStartInstrumentationServer(t *testing.T) { time.Sleep(100 * time.Millisecond) - resp, err := http.Get(fmt.Sprintf("http://%s/metrics", config.Instrumentation.PrometheusListenAddr)) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, fmt.Sprintf("http://%s/metrics", config.Instrumentation.PrometheusListenAddr), nil) + require.NoError(err, "Failed to create Prometheus metrics request") + resp, err := http.DefaultClient.Do(req) //nolint:gosec // test-only request to local instrumentation endpoint require.NoError(err, "Failed to get Prometheus metrics") defer func() { err := resp.Body.Close() @@ -53,7 +55,9 @@ func TestStartInstrumentationServer(t *testing.T) { require.NoError(err) assert.Contains(string(body), "# HELP", "Prometheus metrics body should contain HELP lines") - resp, err = http.Get(fmt.Sprintf("http://%s/debug/pprof/", config.Instrumentation.PprofListenAddr)) + req, err = http.NewRequestWithContext(t.Context(), http.MethodGet, fmt.Sprintf("http://%s/debug/pprof/", config.Instrumentation.PprofListenAddr), nil) + require.NoError(err, "Failed to create pprof request") + resp, err = http.DefaultClient.Do(req) //nolint:gosec // test-only request to local instrumentation endpoint require.NoError(err, "Failed to get Pprof index") defer func() { err := resp.Body.Close() @@ -66,7 +70,7 @@ func TestStartInstrumentationServer(t *testing.T) { require.NoError(err) assert.Contains(string(body), "Types of profiles available", "Pprof index body should contain expected text") - shutdownCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + shutdownCtx, cancel := context.WithTimeout(t.Context(), 1*time.Second) defer cancel() if prometheusSrv != nil { diff --git a/node/light.go b/node/light.go index 7aeb9038b6..8a4dd1335a 100644 --- a/node/light.go +++ b/node/light.go @@ -134,7 +134,7 @@ func (ln *LightNode) Run(parentCtx context.Context) error { ln.Logger.Info().Msg("halting light node and its sub services...") - shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), 2*time.Second) defer cancel() var multiErr error diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index cf7d9672b3..1ed1a7e189 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -648,9 +648,10 @@ func TestStartNodeErrors(t *testing.T) { } // Log level no longer needed with Nop logger - runFunc := func() { + runFunc := func(ctx context.Context) { currentTestLogger := zerolog.Nop() - err := StartNode(currentTestLogger, cmd, executor, sequencer, nodeKey, ds, nodeConfig, testGenesis, node.NodeOptions{}) + cmd.SetContext(ctx) + err := StartNode(currentTestLogger, cmd, executor, sequencer, nodeKey, ds, nodeConfig, testGenesis, node.NodeOptions{}) //nolint:contextcheck // test invokes command entrypoint directly if tc.expectedError != "" { assert.ErrorContains(t, err, tc.expectedError) } else { @@ -663,11 +664,11 @@ func TestStartNodeErrors(t *testing.T) { } if tc.expectPanic { - assert.Panics(t, runFunc) + assert.Panics(t, func() { runFunc(baseCtx) }) } else { - assert.NotPanics(t, runFunc) + assert.NotPanics(t, func() { runFunc(baseCtx) }) checkLogger := zerolog.Nop() - err := StartNode(checkLogger, cmd, executor, sequencer, nodeKey, ds, nodeConfig, testGenesis, node.NodeOptions{}) + err := StartNode(checkLogger, cmd, executor, sequencer, nodeKey, ds, nodeConfig, testGenesis, node.NodeOptions{}) //nolint:contextcheck // test invokes command entrypoint directly if tc.expectedError != "" { assert.ErrorContains(t, err, tc.expectedError) } @@ -700,7 +701,7 @@ func newRunNodeCmd( Use: "start", Aliases: []string{"node", "run"}, Short: "Run the rollkit node", - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, args []string) error { //nolint:contextcheck // cobra RunE signature is fixed return StartNode(zerolog.Nop(), cmd, executor, sequencer, nodeKey, datastore, nodeConfig, testGenesis, node.NodeOptions{}) }, } diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index 0f8f9850e7..e3ac6f1fab 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -286,7 +286,6 @@ func TestClientInfoMethods(t *testing.T) { tempDir := t.TempDir() ClientInitFiles(t, tempDir) conf := config.DefaultConfig() - conf.RootDir = tempDir mn := mocknet.New() defer mn.Close() diff --git a/pkg/rpc/server/da_visualization_non_aggregator_test.go b/pkg/rpc/server/da_visualization_non_aggregator_test.go index 2094684893..40b5bb8f6f 100644 --- a/pkg/rpc/server/da_visualization_non_aggregator_test.go +++ b/pkg/rpc/server/da_visualization_non_aggregator_test.go @@ -33,7 +33,7 @@ func TestNonAggregatorHandleDASubmissions(t *testing.T) { server := NewDAVisualizationServer(da, logger, false) // Create test request - req, err := http.NewRequest("GET", "/da/submissions", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/submissions", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -63,7 +63,7 @@ func TestNonAggregatorHandleDAStats(t *testing.T) { server := NewDAVisualizationServer(da, logger, false) // Create test request - req, err := http.NewRequest("GET", "/da/stats", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/stats", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -89,7 +89,7 @@ func TestNonAggregatorHandleDAHealth(t *testing.T) { server := NewDAVisualizationServer(da, logger, false) // Create test request - req, err := http.NewRequest("GET", "/da/health", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/health", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -115,7 +115,7 @@ func TestNonAggregatorHandleDAVisualizationHTML(t *testing.T) { // Create a non-aggregator server server := NewDAVisualizationServer(da, logger, false) - req, err := http.NewRequest("GET", "/da", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -144,7 +144,7 @@ func TestAggregatorWithNoSubmissionsHTML(t *testing.T) { // Create an aggregator server but don't add any submissions server := NewDAVisualizationServer(da, logger, true) - req, err := http.NewRequest("GET", "/da", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() diff --git a/pkg/rpc/server/da_visualization_test.go b/pkg/rpc/server/da_visualization_test.go index 9be3ba4c88..b9521d3418 100644 --- a/pkg/rpc/server/da_visualization_test.go +++ b/pkg/rpc/server/da_visualization_test.go @@ -133,7 +133,7 @@ func TestHandleDASubmissions(t *testing.T) { }, } server.RecordSubmission(result, 0.5, 1, []byte{}) - req, err := http.NewRequest("GET", "/da/submissions", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/submissions", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -163,7 +163,7 @@ func TestHandleDABlobDetailsMissingID(t *testing.T) { logger := zerolog.New(nil) server := NewDAVisualizationServer(da, logger, true) - req, err := http.NewRequest("GET", "/da/blob", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/blob", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -178,7 +178,7 @@ func TestHandleDABlobDetailsInvalidID(t *testing.T) { logger := zerolog.New(nil) server := NewDAVisualizationServer(da, logger, true) - req, err := http.NewRequest("GET", "/da/blob?id=invalid-hex", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/blob?id=invalid-hex", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -205,7 +205,7 @@ func TestHandleDAVisualizationHTML(t *testing.T) { } server.RecordSubmission(result, 0.5, 1, []byte{}) - req, err := http.NewRequest("GET", "/da", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -265,7 +265,7 @@ func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { RegisterCustomHTTPEndpoints(mux, nil, nil, config.DefaultConfig(), nil, nopLogger, nil) // Test /da endpoint - req, err := http.NewRequest("GET", "/da", nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -275,7 +275,7 @@ func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { assert.Equal(t, "text/html", rr.Header().Get("Content-Type")) // Test /da/submissions endpoint - req, err = http.NewRequest("GET", "/da/submissions", nil) + req, err = http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/submissions", nil) require.NoError(t, err) rr = httptest.NewRecorder() @@ -285,7 +285,7 @@ func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) // Test /da/blob endpoint (missing ID should return 400) - req, err = http.NewRequest("GET", "/da/blob", nil) + req, err = http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/blob", nil) require.NoError(t, err) rr = httptest.NewRecorder() @@ -306,7 +306,7 @@ func TestRegisterCustomHTTPEndpointsWithoutServer(t *testing.T) { endpoints := []string{"/da", "/da/submissions", "/da/blob"} for _, endpoint := range endpoints { - req, err := http.NewRequest("GET", endpoint, nil) + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, endpoint, nil) require.NoError(t, err) rr := httptest.NewRecorder() diff --git a/pkg/rpc/server/http_test.go b/pkg/rpc/server/http_test.go index dc241a65af..4b6975a6d3 100644 --- a/pkg/rpc/server/http_test.go +++ b/pkg/rpc/server/http_test.go @@ -29,7 +29,9 @@ func TestRegisterCustomHTTPEndpoints(t *testing.T) { testServer := httptest.NewServer(mux) defer testServer.Close() - resp, err := http.Get(testServer.URL + "/health/live") + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, testServer.URL+"/health/live", nil) + assert.NoError(t, err) + resp, err := http.DefaultClient.Do(req) //nolint:gosec // test-only request to httptest server assert.NoError(t, err) defer resp.Body.Close() diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index ddd50b550e..9538b10752 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -28,6 +28,13 @@ import ( pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) +func mustGETRequest(t *testing.T, url string) *http.Request { + t.Helper() + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, url, nil) + require.NoError(t, err) + return req +} + func TestGetBlock(t *testing.T) { // Create a mock store mockStore := mocks.NewMockStore(t) @@ -422,7 +429,7 @@ func TestHealthLiveEndpoint(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/health/live") + resp, err := http.DefaultClient.Do(mustGETRequest(t, server.URL+"/health/live")) //nolint:gosec // test-only request to httptest server require.NoError(t, err) defer resp.Body.Close() @@ -447,7 +454,7 @@ func TestHealthLiveEndpoint(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/health/live") + resp, err := http.DefaultClient.Do(mustGETRequest(t, server.URL+"/health/live")) //nolint:gosec // test-only request to httptest server require.NoError(t, err) defer resp.Body.Close() @@ -472,7 +479,7 @@ func TestHealthLiveEndpoint(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/health/live") + resp, err := http.DefaultClient.Do(mustGETRequest(t, server.URL+"/health/live")) //nolint:gosec // test-only request to httptest server require.NoError(t, err) defer resp.Body.Close() @@ -548,7 +555,7 @@ func TestHealthReadyEndpoint(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/health/ready") + resp, err := http.DefaultClient.Do(mustGETRequest(t, server.URL+"/health/ready")) //nolint:gosec // test-only request to httptest server require.NoError(t, err) defer resp.Body.Close() require.Equal(t, tc.expectedCode, resp.StatusCode) @@ -589,7 +596,7 @@ func TestHealthReadyEndpoint(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/health/ready") + resp, err := http.DefaultClient.Do(mustGETRequest(t, server.URL+"/health/ready")) //nolint:gosec // test-only request to httptest server require.NoError(t, err) defer resp.Body.Close() require.Equal(t, http.StatusOK, resp.StatusCode) @@ -619,7 +626,7 @@ func TestHealthReadyEndpoint(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/health/ready") + resp, err := http.DefaultClient.Do(mustGETRequest(t, server.URL+"/health/ready")) //nolint:gosec // test-only request to httptest server require.NoError(t, err) defer resp.Body.Close() require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) diff --git a/pkg/store/store_adapter_test.go b/pkg/store/store_adapter_test.go index c0427ddbf0..ce1420c7e3 100644 --- a/pkg/store/store_adapter_test.go +++ b/pkg/store/store_adapter_test.go @@ -568,7 +568,7 @@ func TestStoreAdapter_ConcurrentAppendAndRead(t *testing.T) { go func() { defer wg.Done() for i := range itemsPerWriter * numWriters { - _ = adapter.Height() + _ = adapter.Height() //nolint:contextcheck // external interface does not accept context _, _ = adapter.Head(ctx) _ = adapter.HasAt(ctx, uint64(i+1)) } diff --git a/scripts/test.go b/scripts/test.go index 43f921ce26..71fa9eb7d6 100644 --- a/scripts/test.go +++ b/scripts/test.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "os" @@ -29,7 +30,7 @@ func main() { // For this example, we'll run tests in all directories with go.mod. fmt.Printf("--> Running tests in: %s\n", modDir) - cmd := exec.Command("go", "test", "./...", "-cover") + cmd := exec.CommandContext(context.Background(), "go", "test", "./...", "-cover") cmd.Dir = modDir // Set the working directory for the command cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr diff --git a/tools/blob-decoder/main.go b/tools/blob-decoder/main.go index 20ad5b92c9..da2322f1df 100644 --- a/tools/blob-decoder/main.go +++ b/tools/blob-decoder/main.go @@ -81,7 +81,7 @@ func corsMiddleware(next http.Handler) http.Handler { w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type") - if r.Method == "OPTIONS" { + if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) return } @@ -99,7 +99,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } func handleDecode(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { + if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } diff --git a/tools/local-da/rpc.go b/tools/local-da/rpc.go index 6f681d6538..c8a3c97bee 100644 --- a/tools/local-da/rpc.go +++ b/tools/local-da/rpc.go @@ -111,8 +111,8 @@ func (s *blobServer) GetProof(_ context.Context, _ uint64, _ libshare.Namespace, } // Included reports whether a commitment is present at a given height/namespace. -func (s *blobServer) Included(_ context.Context, height uint64, namespace libshare.Namespace, _ *jsonrpc.Proof, commitment jsonrpc.Commitment) (bool, error) { - _, err := s.Get(context.Background(), height, namespace, commitment) +func (s *blobServer) Included(ctx context.Context, height uint64, namespace libshare.Namespace, _ *jsonrpc.Proof, commitment jsonrpc.Commitment) (bool, error) { + _, err := s.Get(ctx, height, namespace, commitment) if err != nil { if errors.Is(err, datypes.ErrBlobNotFound) { return false, nil From d4063fea36fe82bb69f1313e48e8d455febf06bb Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 3 Mar 2026 18:38:18 +0100 Subject: [PATCH 2/8] revert some changes and enable prealloc and predeclared --- .golangci.yml | 7 +++++-- block/internal/submitting/da_submitter.go | 14 +++++++------- block/internal/syncing/syncer_backoff_test.go | 6 +++--- block/internal/syncing/syncer_benchmark_test.go | 2 +- block/internal/syncing/syncer_test.go | 4 ++-- node/full_node_test.go | 4 ++-- pkg/p2p/client.go | 5 +++-- .../da_visualization_non_aggregator_test.go | 10 +++++----- pkg/rpc/server/da_visualization_test.go | 16 ++++++++-------- pkg/rpc/server/http_test.go | 5 ++--- pkg/rpc/server/server_test.go | 2 +- pkg/telemetry/tracing.go | 10 +++++----- scripts/test.go | 5 ++--- 13 files changed, 46 insertions(+), 44 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 6e23feba11..b8a48952ec 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -77,6 +77,11 @@ linters: - common-false-positives - legacy - std-error-handling + rules: + - path: _test\.go + linters: + - prealloc + - noctx paths: - third_party$ - builtin$ @@ -86,8 +91,6 @@ linters: - errcheck - gocritic - nolintlint - - prealloc - - predeclared - testifylint - thelper - tparallel diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 31c2681c91..4720d7788f 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -90,15 +90,15 @@ func (rs *retryState) Next(reason retryReason, pol retryPolicy) { } // clamp constrains a duration between min and max bounds -func clamp(v, min, max time.Duration) time.Duration { - if min > max { - min, max = max, min +func clamp(v, minTime, maxTime time.Duration) time.Duration { + if minTime > maxTime { + minTime, maxTime = maxTime, minTime } - if v < min { - return min + if v < minTime { + return minTime } - if v > max { - return max + if v > maxTime { + return maxTime } return v } diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index c20620c856..f295396938 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -121,7 +121,7 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { } // Run sync loop - syncer.startSyncWorkers(context.Background()) + syncer.startSyncWorkers(t.Context()) <-ctx.Done() syncer.wg.Wait() @@ -223,7 +223,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { go syncer.processLoop() // Run workers - syncer.startSyncWorkers(context.Background()) + syncer.startSyncWorkers(t.Context()) <-ctx.Done() syncer.wg.Wait() @@ -294,7 +294,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { Return(nil, datypes.ErrBlobNotFound).Once() go syncer.processLoop() - syncer.startSyncWorkers(context.Background()) + syncer.startSyncWorkers(t.Context()) <-ctx.Done() syncer.wg.Wait() diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 9cfbe811e0..7bd0a6d1f4 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -44,7 +44,7 @@ func BenchmarkSyncerIO(b *testing.B) { // run both loops go fixt.s.processLoop() - fixt.s.startSyncWorkers(context.Background()) + fixt.s.startSyncWorkers(b.Context()) require.Eventually(b, func() bool { processedHeight, _ := fixt.s.store.Height(b.Context()) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 9a85bfb6e7..e61bc13ba6 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -416,7 +416,7 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, datypes.ErrHeightFromFuture) go syncerInst1.processLoop() - syncerInst1.startSyncWorkers(context.Background()) + syncerInst1.startSyncWorkers(t.Context()) syncerInst1.wg.Wait() requireEmptyChan(t, errorCh) @@ -479,7 +479,7 @@ func TestSyncLoopPersistState(t *testing.T) { // when it starts, it should fetch from the last height it stopped at t.Log("sync workers on instance2 started") - syncerInst2.startSyncWorkers(context.Background()) + syncerInst2.startSyncWorkers(t.Context()) syncerInst2.wg.Wait() t.Log("sync workers exited") diff --git a/node/full_node_test.go b/node/full_node_test.go index f6b977b6c5..23c2626f40 100644 --- a/node/full_node_test.go +++ b/node/full_node_test.go @@ -40,7 +40,7 @@ func TestStartInstrumentationServer(t *testing.T) { time.Sleep(100 * time.Millisecond) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, fmt.Sprintf("http://%s/metrics", config.Instrumentation.PrometheusListenAddr), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/metrics", config.Instrumentation.PrometheusListenAddr), nil) require.NoError(err, "Failed to create Prometheus metrics request") resp, err := http.DefaultClient.Do(req) //nolint:gosec // test-only request to local instrumentation endpoint require.NoError(err, "Failed to get Prometheus metrics") @@ -55,7 +55,7 @@ func TestStartInstrumentationServer(t *testing.T) { require.NoError(err) assert.Contains(string(body), "# HELP", "Prometheus metrics body should contain HELP lines") - req, err = http.NewRequestWithContext(t.Context(), http.MethodGet, fmt.Sprintf("http://%s/debug/pprof/", config.Instrumentation.PprofListenAddr), nil) + req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/debug/pprof/", config.Instrumentation.PprofListenAddr), nil) require.NoError(err, "Failed to create pprof request") resp, err = http.DefaultClient.Do(req) //nolint:gosec // test-only request to local instrumentation endpoint require.NoError(err, "Failed to get Pprof index") diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 4288f749f9..e8fc6eabe1 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -415,9 +415,10 @@ func (c *Client) GetPeers() ([]peer.AddrInfo, error) { } func (c *Client) GetNetworkInfo() (NetworkInfo, error) { - var addrs []string + hostAddrs := c.host.Addrs() + addrs := make([]string, 0, len(hostAddrs)) peerIDSuffix := "/p2p/" + c.host.ID().String() - for _, a := range c.host.Addrs() { + for _, a := range hostAddrs { addr := a.String() // Only append peer ID if not already present if !strings.HasSuffix(addr, peerIDSuffix) { diff --git a/pkg/rpc/server/da_visualization_non_aggregator_test.go b/pkg/rpc/server/da_visualization_non_aggregator_test.go index 40b5bb8f6f..0d157deb6a 100644 --- a/pkg/rpc/server/da_visualization_non_aggregator_test.go +++ b/pkg/rpc/server/da_visualization_non_aggregator_test.go @@ -33,7 +33,7 @@ func TestNonAggregatorHandleDASubmissions(t *testing.T) { server := NewDAVisualizationServer(da, logger, false) // Create test request - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/submissions", nil) + req, err := http.NewRequest(http.MethodGet, "/da/submissions", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -63,7 +63,7 @@ func TestNonAggregatorHandleDAStats(t *testing.T) { server := NewDAVisualizationServer(da, logger, false) // Create test request - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/stats", nil) + req, err := http.NewRequest(http.MethodGet, "/da/stats", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -89,7 +89,7 @@ func TestNonAggregatorHandleDAHealth(t *testing.T) { server := NewDAVisualizationServer(da, logger, false) // Create test request - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/health", nil) + req, err := http.NewRequest(http.MethodGet, "/da/health", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -115,7 +115,7 @@ func TestNonAggregatorHandleDAVisualizationHTML(t *testing.T) { // Create a non-aggregator server server := NewDAVisualizationServer(da, logger, false) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) + req, err := http.NewRequest(http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -144,7 +144,7 @@ func TestAggregatorWithNoSubmissionsHTML(t *testing.T) { // Create an aggregator server but don't add any submissions server := NewDAVisualizationServer(da, logger, true) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) + req, err := http.NewRequest(http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() diff --git a/pkg/rpc/server/da_visualization_test.go b/pkg/rpc/server/da_visualization_test.go index b9521d3418..e2bb6c0ea4 100644 --- a/pkg/rpc/server/da_visualization_test.go +++ b/pkg/rpc/server/da_visualization_test.go @@ -133,7 +133,7 @@ func TestHandleDASubmissions(t *testing.T) { }, } server.RecordSubmission(result, 0.5, 1, []byte{}) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/submissions", nil) + req, err := http.NewRequest(http.MethodGet, "/da/submissions", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -163,7 +163,7 @@ func TestHandleDABlobDetailsMissingID(t *testing.T) { logger := zerolog.New(nil) server := NewDAVisualizationServer(da, logger, true) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/blob", nil) + req, err := http.NewRequest(http.MethodGet, "/da/blob", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -178,7 +178,7 @@ func TestHandleDABlobDetailsInvalidID(t *testing.T) { logger := zerolog.New(nil) server := NewDAVisualizationServer(da, logger, true) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/blob?id=invalid-hex", nil) + req, err := http.NewRequest(http.MethodGet, "/da/blob?id=invalid-hex", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -205,7 +205,7 @@ func TestHandleDAVisualizationHTML(t *testing.T) { } server.RecordSubmission(result, 0.5, 1, []byte{}) - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) + req, err := http.NewRequest(http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -265,7 +265,7 @@ func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { RegisterCustomHTTPEndpoints(mux, nil, nil, config.DefaultConfig(), nil, nopLogger, nil) // Test /da endpoint - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "/da", nil) + req, err := http.NewRequest(http.MethodGet, "/da", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -275,7 +275,7 @@ func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { assert.Equal(t, "text/html", rr.Header().Get("Content-Type")) // Test /da/submissions endpoint - req, err = http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/submissions", nil) + req, err = http.NewRequest(http.MethodGet, "/da/submissions", nil) require.NoError(t, err) rr = httptest.NewRecorder() @@ -285,7 +285,7 @@ func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) // Test /da/blob endpoint (missing ID should return 400) - req, err = http.NewRequestWithContext(t.Context(), http.MethodGet, "/da/blob", nil) + req, err = http.NewRequest(http.MethodGet, "/da/blob", nil) require.NoError(t, err) rr = httptest.NewRecorder() @@ -306,7 +306,7 @@ func TestRegisterCustomHTTPEndpointsWithoutServer(t *testing.T) { endpoints := []string{"/da", "/da/submissions", "/da/blob"} for _, endpoint := range endpoints { - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, endpoint, nil) + req, err := http.NewRequest(http.MethodGet, endpoint, nil) require.NoError(t, err) rr := httptest.NewRecorder() diff --git a/pkg/rpc/server/http_test.go b/pkg/rpc/server/http_test.go index 4b6975a6d3..4fa2e92e17 100644 --- a/pkg/rpc/server/http_test.go +++ b/pkg/rpc/server/http_test.go @@ -29,7 +29,7 @@ func TestRegisterCustomHTTPEndpoints(t *testing.T) { testServer := httptest.NewServer(mux) defer testServer.Close() - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, testServer.URL+"/health/live", nil) + req, err := http.NewRequest(http.MethodGet, testServer.URL+"/health/live", nil) assert.NoError(t, err) resp, err := http.DefaultClient.Do(req) //nolint:gosec // test-only request to httptest server assert.NoError(t, err) @@ -46,7 +46,6 @@ func TestRegisterCustomHTTPEndpoints(t *testing.T) { } func TestHealthReady_aggregatorBlockDelay(t *testing.T) { - ctx := t.Context() logger := zerolog.Nop() type spec struct { @@ -121,7 +120,7 @@ func TestHealthReady_aggregatorBlockDelay(t *testing.T) { ts := httptest.NewServer(mux) t.Cleanup(ts.Close) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, ts.URL+"/health/ready", nil) + req, err := http.NewRequest(http.MethodGet, ts.URL+"/health/ready", nil) require.NoError(t, err) resp, err := http.DefaultClient.Do(req) //nolint:gosec // ok to use default client in tests require.NoError(t, err) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 9538b10752..73d03059be 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -30,7 +30,7 @@ import ( func mustGETRequest(t *testing.T, url string) *http.Request { t.Helper() - req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, url, nil) + req, err := http.NewRequest(http.MethodGet, url, nil) require.NoError(t, err) return req } diff --git a/pkg/telemetry/tracing.go b/pkg/telemetry/tracing.go index 06e3f04ad6..520c84f520 100644 --- a/pkg/telemetry/tracing.go +++ b/pkg/telemetry/tracing.go @@ -66,12 +66,12 @@ func InitTracing(ctx context.Context, cfg *config.InstrumentationConfig, logger return tp.Shutdown, nil } -func clamp(x, min, max float64) float64 { - if x < min { - return min +func clamp(x, minFloat, maxFloat float64) float64 { + if x < minFloat { + return minFloat } - if x > max { - return max + if x > maxFloat { + return maxFloat } return x } diff --git a/scripts/test.go b/scripts/test.go index 71fa9eb7d6..29f35e6811 100644 --- a/scripts/test.go +++ b/scripts/test.go @@ -1,7 +1,6 @@ package main import ( - "context" "fmt" "log" "os" @@ -30,8 +29,8 @@ func main() { // For this example, we'll run tests in all directories with go.mod. fmt.Printf("--> Running tests in: %s\n", modDir) - cmd := exec.CommandContext(context.Background(), "go", "test", "./...", "-cover") - cmd.Dir = modDir // Set the working directory for the command + cmd := exec.Command("go", "test", "./...", "-cover") //nolint:noctx // intentionally runs module-local test command + cmd.Dir = modDir // Set the working directory for the command cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr From 44005947bbb7da4ca3dfafef3aaa0bc7a2bcf1c0 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 3 Mar 2026 19:40:13 +0100 Subject: [PATCH 3/8] ci --- .github/workflows/lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 506aefbd8f..216e62fd4a 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -28,7 +28,7 @@ jobs: go.sum - uses: golangci/golangci-lint-action@v9.2.0 with: - version: v2.10.1 + version: latest args: --timeout 10m ./... github-token: ${{ secrets.github_token }} if: env.GIT_DIFF From ed63711a9abaab863c3da876a85c36ef8c1e02df Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 3 Mar 2026 22:33:09 +0100 Subject: [PATCH 4/8] fix lint --- block/internal/executing/executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index cf9e5f1f1d..cc40adade2 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -637,7 +637,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // For based sequencer, advance safe/finalized since it comes from DA. if e.config.Node.BasedSequencer { - if err := e.exec.SetFinal(e.ctx, newHeight); err != nil { + if err := e.exec.SetFinal(ctx, newHeight); err != nil { e.sendCriticalError(fmt.Errorf("failed to set final height in based sequencer mode: %w", err)) return fmt.Errorf("failed to set final height in based sequencer mode: %w", err) } From b4bd5ea5e74ec8bb9c21b978b01d51d658c86bad Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 4 Mar 2026 09:45:28 +0100 Subject: [PATCH 5/8] fix comments --- .golangci.yml | 6 ------ block/internal/syncing/syncer_backoff_test.go | 6 +++--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index b8a48952ec..4df3a98fd6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -12,14 +12,11 @@ linters: - asciicheck - bidichk - bodyclose - - containedctx - contextcheck - copyloopvar - durationcheck - errname - - errcheck - errorlint - - gocritic - govet - ineffassign - makezero @@ -27,7 +24,6 @@ linters: - misspell - nilerr - noctx - - nolintlint - prealloc - predeclared - reassign @@ -35,9 +31,7 @@ linters: - rowserrcheck - sqlclosecheck - staticcheck - - testifylint - unconvert - - unparam - unused - usestdlibvars - wastedassign diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index f295396938..a5eb17cf44 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -121,7 +121,7 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { } // Run sync loop - syncer.startSyncWorkers(t.Context()) + syncer.startSyncWorkers(ctx) <-ctx.Done() syncer.wg.Wait() @@ -223,7 +223,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { go syncer.processLoop() // Run workers - syncer.startSyncWorkers(t.Context()) + syncer.startSyncWorkers(ctx) <-ctx.Done() syncer.wg.Wait() @@ -294,7 +294,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { Return(nil, datypes.ErrBlobNotFound).Once() go syncer.processLoop() - syncer.startSyncWorkers(t.Context()) + syncer.startSyncWorkers(ctx) <-ctx.Done() syncer.wg.Wait() From fb25598f24cb9e6d541636b8542aec61705fec12 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 4 Mar 2026 11:43:33 +0100 Subject: [PATCH 6/8] fix --- block/internal/executing/executor.go | 1 - block/internal/syncing/syncer.go | 64 ++-- block/internal/syncing/syncer_backoff_test.go | 4 +- .../internal/syncing/syncer_benchmark_test.go | 4 +- block/internal/syncing/syncer_test.go | 8 +- node/failover.go | 8 +- node/full.go | 6 +- node/light.go | 6 +- spec.md | 362 ++++++++++++++++++ 9 files changed, 412 insertions(+), 51 deletions(-) create mode 100644 spec.md diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index cc40adade2..c139210066 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -381,7 +381,6 @@ func (e *Executor) executionLoop() { } else { delay = time.Until(currentState.LastBlockTime.Add(e.config.Node.BlockTime.Duration)) } - if delay > 0 { e.logger.Info().Dur("delay", delay).Msg("waiting to start block production") select { diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 04a6cf21ca..848ae9825a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -164,7 +164,8 @@ func (s *Syncer) SetBlockSyncer(bs BlockSyncer) { // Start begins the syncing component func (s *Syncer) Start(ctx context.Context) error { - s.ctx, s.cancel = context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) + s.ctx, s.cancel = ctx, cancel if err := s.initializeState(); err != nil { return fmt.Errorf("failed to initialize syncer state: %w", err) @@ -195,7 +196,7 @@ func (s *Syncer) Start(ctx context.Context) error { } // Start main processing loop - s.wg.Go(s.processLoop) + s.wg.Go(func() { s.processLoop(ctx) }) // Start dedicated workers for DA, and pending processing s.startSyncWorkers(ctx) @@ -342,38 +343,37 @@ func (s *Syncer) initializeState() error { } // processLoop is the main coordination loop for processing events -func (s *Syncer) processLoop() { +func (s *Syncer) processLoop(ctx context.Context) { s.logger.Info().Msg("starting process loop") defer s.logger.Info().Msg("process loop stopped") for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return case heightEvent, ok := <-s.heightInCh: if ok { - s.processHeightEvent(s.ctx, &heightEvent) + s.processHeightEvent(ctx, &heightEvent) } } } } func (s *Syncer) startSyncWorkers(ctx context.Context) { - _ = ctx s.wg.Add(3) - go s.daWorkerLoop() - go s.pendingWorkerLoop() + go s.daWorkerLoop(ctx) + go s.pendingWorkerLoop(ctx) go s.p2pWorkerLoop(ctx) } -func (s *Syncer) daWorkerLoop() { +func (s *Syncer) daWorkerLoop(ctx context.Context) { defer s.wg.Done() s.logger.Info().Msg("starting DA worker") defer s.logger.Info().Msg("DA worker stopped") for { - err := s.fetchDAUntilCaughtUp() + err := s.fetchDAUntilCaughtUp(ctx) var backoff time.Duration if err == nil { @@ -389,7 +389,7 @@ func (s *Syncer) daWorkerLoop() { } select { - case <-s.ctx.Done(): + case <-ctx.Done(): return case <-time.After(backoff): } @@ -402,11 +402,11 @@ func (s *Syncer) HasReachedDAHead() bool { return s.daHeadReached.Load() } -func (s *Syncer) fetchDAUntilCaughtUp() error { +func (s *Syncer) fetchDAUntilCaughtUp(ctx context.Context) error { for { select { - case <-s.ctx.Done(): - return s.ctx.Err() + case <-ctx.Done(): + return ctx.Err() default: } @@ -424,7 +424,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { daHeight = max(s.daRetrieverHeight.Load(), s.cache.DaHeight()) } - events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight) + events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight) if err != nil { switch { case errors.Is(err, datypes.ErrBlobNotFound): @@ -446,7 +446,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { // Process DA events for _, event := range events { - if err := s.pipeEvent(s.ctx, event); err != nil { + if err := s.pipeEvent(ctx, event); err != nil { return err } } @@ -467,7 +467,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { } } -func (s *Syncer) pendingWorkerLoop() { +func (s *Syncer) pendingWorkerLoop(ctx context.Context) { defer s.wg.Done() s.logger.Info().Msg("starting pending worker") @@ -478,10 +478,10 @@ func (s *Syncer) pendingWorkerLoop() { for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return case <-ticker.C: - s.processPendingEvents() + s.processPendingEvents(ctx) } } } @@ -503,7 +503,7 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { currentHeight, err := s.store.Height(ctx) if err != nil { logger.Error().Err(err).Msg("failed to get current height for P2P worker") - if !s.sleepOrDone(50 * time.Millisecond) { + if !s.sleepOrDone(ctx, 50*time.Millisecond) { return } continue @@ -525,13 +525,13 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height") } - if !s.sleepOrDone(50 * time.Millisecond) { + if !s.sleepOrDone(ctx, 50*time.Millisecond) { return } continue } - if err := s.waitForStoreHeight(targetHeight); err != nil { + if err := s.waitForStoreHeight(ctx, targetHeight); err != nil { if errors.Is(err, context.Canceled) { return } @@ -1078,8 +1078,8 @@ func (s *Syncer) sendCriticalError(err error) { // processPendingEvents fetches and processes pending events from cache // optimistically fetches the next events from cache until no matching heights are found -func (s *Syncer) processPendingEvents() { - currentHeight, err := s.store.Height(s.ctx) +func (s *Syncer) processPendingEvents(ctx context.Context) { + currentHeight, err := s.store.Height(ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get current height for pending events") return @@ -1104,7 +1104,7 @@ func (s *Syncer) processPendingEvents() { case s.heightInCh <- heightEvent: // Event was successfully sent and already removed by GetNextPendingEvent s.logger.Debug().Uint64("height", nextHeight).Msg("sent pending event to processing") - case <-s.ctx.Done(): + case <-ctx.Done(): s.cache.SetPendingEvent(nextHeight, event) return default: @@ -1116,9 +1116,9 @@ func (s *Syncer) processPendingEvents() { } } -func (s *Syncer) waitForStoreHeight(target uint64) error { +func (s *Syncer) waitForStoreHeight(ctx context.Context, target uint64) error { for { - currentHeight, err := s.store.Height(s.ctx) + currentHeight, err := s.store.Height(ctx) if err != nil { return err } @@ -1127,20 +1127,20 @@ func (s *Syncer) waitForStoreHeight(target uint64) error { return nil } - if !s.sleepOrDone(10 * time.Millisecond) { - if s.ctx.Err() != nil { - return s.ctx.Err() + if !s.sleepOrDone(ctx, 10*time.Millisecond) { + if ctx.Err() != nil { + return ctx.Err() } } } } -func (s *Syncer) sleepOrDone(duration time.Duration) bool { +func (s *Syncer) sleepOrDone(ctx context.Context, duration time.Duration) bool { timer := time.NewTimer(duration) defer timer.Stop() select { - case <-s.ctx.Done(): + case <-ctx.Done(): return false case <-timer.C: return true diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index a5eb17cf44..c2bed9385d 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -220,7 +220,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { Return(nil, datypes.ErrBlobNotFound).Once() // Start process loop to handle events - go syncer.processLoop() + go syncer.processLoop(ctx) // Run workers syncer.startSyncWorkers(ctx) @@ -293,7 +293,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { }). Return(nil, datypes.ErrBlobNotFound).Once() - go syncer.processLoop() + go syncer.processLoop(ctx) syncer.startSyncWorkers(ctx) <-ctx.Done() syncer.wg.Wait() diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 7bd0a6d1f4..6ca482c05b 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -43,8 +43,8 @@ func BenchmarkSyncerIO(b *testing.B) { fixt := newBenchFixture(b, spec.heights, spec.shuffledTx, spec.daDelay, spec.execDelay, true) // run both loops - go fixt.s.processLoop() - fixt.s.startSyncWorkers(b.Context()) + go fixt.s.processLoop(fixt.s.ctx) + fixt.s.startSyncWorkers(fixt.s.ctx) require.Eventually(b, func() bool { processedHeight, _ := fixt.s.store.Height(b.Context()) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index ee831c0010..cb15c108e6 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -309,7 +309,7 @@ func TestSyncer_processPendingEvents(t *testing.T) { cm.SetPendingEvent(1, evt1) cm.SetPendingEvent(2, evt2) - s.processPendingEvents() + s.processPendingEvents(s.ctx) // should have forwarded height 2 and removed both select { @@ -416,8 +416,8 @@ func TestSyncLoopPersistState(t *testing.T) { }). Return(nil, datypes.ErrHeightFromFuture) - go syncerInst1.processLoop() - syncerInst1.startSyncWorkers(t.Context()) + go syncerInst1.processLoop(ctx) + syncerInst1.startSyncWorkers(ctx) syncerInst1.wg.Wait() requireEmptyChan(t, errorCh) @@ -480,7 +480,7 @@ func TestSyncLoopPersistState(t *testing.T) { // when it starts, it should fetch from the last height it stopped at t.Log("sync workers on instance2 started") - syncerInst2.startSyncWorkers(t.Context()) + syncerInst2.startSyncWorkers(ctx) syncerInst2.wg.Wait() t.Log("sync workers exited") diff --git a/node/failover.go b/node/failover.go index a7f575a833..f6e0c8d9fb 100644 --- a/node/failover.go +++ b/node/failover.go @@ -180,9 +180,9 @@ func setupFailoverState( } func (f *failoverState) Run(pCtx context.Context) (multiErr error) { - stopService := func(stoppable func(context.Context) error, name string) { + stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally // parent context is cancelled already, so we need to create a new one - shutdownCtx, done := context.WithTimeout(context.WithoutCancel(pCtx), 3*time.Second) + shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second) //nolint:contextcheck // intentional: need fresh context for graceful shutdown after cancellation defer done() if err := stoppable(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) { @@ -233,8 +233,8 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { } wg.Go(func() error { - defer func() { - shutdownCtx, done := context.WithTimeout(context.WithoutCancel(ctx), 3*time.Second) + defer func() { //nolint:contextcheck // shutdown uses context.Background intentionally + shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second) defer done() _ = f.rpcServer.Shutdown(shutdownCtx) }() diff --git a/node/full.go b/node/full.go index df4258262b..1ae9a62962 100644 --- a/node/full.go +++ b/node/full.go @@ -307,7 +307,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { n.Logger.Info().Msg("halting full node and its sub services...") // Use a timeout context to ensure shutdown doesn't hang - shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 9*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 9*time.Second) //nolint:contextcheck // intentional: need fresh context for graceful shutdown after cancellation defer cancel() var shutdownMultiErr error // Variable to accumulate multiple errors @@ -323,7 +323,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Shutdown Prometheus Server if n.prometheusSrv != nil { - err := n.prometheusSrv.Shutdown(shutdownCtx) + err := n.prometheusSrv.Shutdown(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background // http.ErrServerClosed is expected on graceful shutdown if err != nil && !errors.Is(err, http.ErrServerClosed) { shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down Prometheus server: %w", err)) @@ -334,7 +334,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Shutdown Pprof Server if n.pprofSrv != nil { - err := n.pprofSrv.Shutdown(shutdownCtx) + err := n.pprofSrv.Shutdown(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background if err != nil && !errors.Is(err, http.ErrServerClosed) { shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down pprof server: %w", err)) } else { diff --git a/node/light.go b/node/light.go index 8a4dd1335a..dd1c3d61d1 100644 --- a/node/light.go +++ b/node/light.go @@ -134,13 +134,13 @@ func (ln *LightNode) Run(parentCtx context.Context) error { ln.Logger.Info().Msg("halting light node and its sub services...") - shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), 2*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) //nolint:contextcheck // intentional: need fresh context for graceful shutdown after cancellation defer cancel() var multiErr error // Stop Header Sync Service - err = ln.hSyncService.Stop(shutdownCtx) + err = ln.hSyncService.Stop(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { multiErr = errors.Join(multiErr, fmt.Errorf("stopping header sync service: %w", err)) @@ -151,7 +151,7 @@ func (ln *LightNode) Run(parentCtx context.Context) error { // Shutdown RPC Server if ln.rpcServer != nil { - err = ln.rpcServer.Shutdown(shutdownCtx) + err = ln.rpcServer.Shutdown(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background if err != nil && !errors.Is(err, http.ErrServerClosed) { multiErr = errors.Join(multiErr, fmt.Errorf("shutting down RPC server: %w", err)) } else { diff --git a/spec.md b/spec.md new file mode 100644 index 0000000000..4bcf26de64 --- /dev/null +++ b/spec.md @@ -0,0 +1,362 @@ +# EV-Reth Txpool Push Subscription for EV-Node + +## Summary + +Introduce a push-based txpool subscription in ev-reth that streams ordered, +mempool-selected transactions to ev-node, removing the 1s polling bottleneck. +The stream emits snapshots of the ordered tx list using the same selection +logic as `txpoolExt_getTxs`, with backpressure and a heartbeat to guarantee +<= 1s worst-case lag. + +## Goals + +- Deliver transactions to ev-node as soon as they appear in ev-reth. +- Preserve mempool ordering identical to `txpoolExt_getTxs`. +- Keep worst-case lag at or below 1s (current behavior). +- Avoid DoS amplification by applying backpressure and bounded work. +- Maintain compatibility with existing `txpoolExt_getTxs`. + +## Non-Goals + +- Guarantee lossless delivery across disconnects (recover via resync). +- Replace sequencing logic or batch queue behavior in ev-node. +- Change transaction ordering semantics or mempool policies. + +## Current Behavior (Problem) + +- ev-node reaper polls `Executor.GetTxs` on a fixed interval. +- EVM executor uses `txpoolExt_getTxs` (HTTP) to pull an ordered list. +- With 100ms blocks, a 1s poll means txs are only observed once per second. +- Lowering the interval increases RPC load and DoS surface. + +## Proposed Architecture + +### High-Level + +- Add a new JSON-RPC subscription method in ev-reth: + `txpoolExt_subscribeBestTxs` +- ev-node connects via WebSocket and subscribes. +- Each update is a snapshot of the best-ordered txs, using the same selection + logic as `txpoolExt_getTxs`. +- ev-node uses the snapshot stream as the primary source of txs, with polling + fallback when the stream is unavailable. + +### Why Snapshot Streaming (Not "New Tx" Events) + +Ordering must match the mempool ordering (`best_transactions()`). +Streaming "new txs" in arrival order does not preserve that ordering, and +would force ev-node to re-implement mempool ordering. Snapshot updates avoid +semantic drift and keep ordering consistent with `txpoolExt_getTxs`. + +## API Specification + +### Method + +`txpoolExt_subscribeBestTxs` (WebSocket-only subscription) + +### Params (optional object) + +```json +{ + "max_bytes": 1939865, + "max_gas": 30000000, + "min_emit_interval_ms": 50, + "max_emit_interval_ms": 1000, + "include_metadata": true +} +``` + +| Parameter | Default | Description | +|------------------------|-------------------------|----------------------------------------------------| +| `max_bytes` | ev-reth config value | Per-update byte cap | +| `max_gas` | current block gas limit | Per-update gas cap | +| `min_emit_interval_ms` | 50 | Debounce window for rapid pool updates | +| `max_emit_interval_ms` | 1000 | Heartbeat max to guarantee <= 1s lag | +| `include_metadata` | true | Adds size/gas counters and monotonic `snapshot_id` | + +Parameters are fixed for the subscription lifetime. To change parameters, +disconnect and reconnect with new values. + +### Payload (when `include_metadata = true`) + +```json +{ + "snapshot_id": 42, + "timestamp_ms": 1705123456789, + "total_bytes": 102400, + "total_gas": 15000000, + "txs": ["0x...", "0x...", "..."] +} +``` + +| Field | Type | Description | +|----------------|-------|---------------------------------------------------------------| +| `snapshot_id` | u64 | Monotonic counter, resets on ev-reth restart | +| `timestamp_ms` | u64 | Wall clock time (ev-reth system time) when snapshot was built | +| `total_bytes` | u64 | Sum of tx bytes in this snapshot | +| `total_gas` | u64 | Sum of tx gas limits in this snapshot | +| `txs` | array | RLP-encoded transaction blobs in mempool priority order | + +### Ordering Guarantees + +- `txs` order equals the ordering returned by `txpoolExt_getTxs` at the time + the snapshot is generated. +- A snapshot represents the best ordered selection from the pool at that moment. +- **Ordering is locked on first sight**: once ev-node submits a tx to its batch + queue, the position is fixed regardless of subsequent mempool reorderings. + +## ev-reth Implementation Notes + +### Snapshot Generation + +- Use `TransactionPool::new_pending_pool_transactions_listener()` as the + trigger source for updates. +- On event, generate a snapshot by running the same selection logic currently + used in `txpoolExt_getTxs` (best transactions, max bytes, max gas). +- Iterate `best_transactions()` until `max_bytes` or `max_gas` limits are hit. + No hard iteration cap; rely on size/gas limits for bounding. +- Transactions exceeding the snapshot limits are silently excluded. They may + be picked up by polling fallback or future snapshots. + +### Debouncing + +- Use time-only debouncing: emit after `min_emit_interval_ms` regardless of + whether updates continue arriving. +- Coalesce multiple pool updates within the debounce window into a single + snapshot to cap CPU and bandwidth. + +### Empty Pool Handling + +- Skip emission when the transaction pool is empty. +- Heartbeat still fires at `max_emit_interval_ms` and emits an empty snapshot + (`txs: []`) when the pool is empty to prove liveness. + +### Backpressure + +- Keep at most one pending snapshot per subscriber. +- If sink is slow, drop intermediate updates and send the latest snapshot. +- If sink is stalled beyond **5 seconds**, close the subscription immediately + (abrupt close, no graceful close frame). + +### Heartbeat + +- If no updates occur within `max_emit_interval_ms`, emit the latest snapshot + (or empty snapshot if pool is empty) to guarantee <= 1s lag. + +### Subscriber Limits + +- Enforce an explicit maximum number of concurrent subscribers. +- Default: **5 subscribers**. +- Configurable via ev-reth config. +- Reject new subscriptions with an error when limit is reached. + +### Sync Requirement + +- Reject subscription requests with an error if ev-reth is still syncing to + chain tip. +- Subscription is only available when ev-reth is fully synced. + +### Resource Safety + +- No unbounded queues per subscriber. +- Snapshots are bounded by `max_bytes` and `max_gas`. +- Per-subscriber state is O(1) (snapshot_id, last_emit, dirty flag). + +## ev-node Integration + +### Config + +| Config Key | Type | Default | Description | +|--------------------------|--------|--------------------|----------------------------------------------------| +| `evm.ws-url` | string | none | WebSocket URL for ev-reth (required for streaming) | +| `evm.txpool_subscribe` | bool | true if ws-url set | Enable txpool subscription | +| `evm.txpool_buffer_size` | int | 3 | Local snapshot buffer size (3-5 recommended) | + +Single `ws-url` only; high availability is handled at infrastructure level +(load balancer, DNS failover). + +### Reaper Behavior + +- If streaming is available, consume snapshots and submit new txs. +- If stream is unavailable: + - Fall back to current polling behavior (`GetTxs`). + - Retry stream connection with exponential backoff. + +### Connection Lifecycle + +1. On startup, attempt WebSocket connection to `evm.ws-url`. +2. Subscribe to `txpoolExt_subscribeBestTxs` with configured params. +3. If subscription is rejected (ev-reth syncing, limit reached), fall back to + polling and retry after backoff. +4. On disconnect, switch to polling immediately and begin reconnection attempts. + +### Reconnection Backoff + +- **Base interval**: 100ms +- **Multiplier**: 2x on each failure +- **Cap**: 30 seconds +- **Retry**: Unbounded (retry forever while polling provides fallback) + +### Snapshot ID Regression Handling + +When ev-node detects `snapshot_id` regression (indicating ev-reth restart or +failover): + +1. Clear the local snapshot buffer (discard all buffered snapshots). +2. Clear the seen-tx cache. +3. Continue processing from the new snapshot. + +### Local Buffer + +- Maintain a bounded buffer of **3-5 snapshots** (configurable). +- Always consume snapshots from WebSocket into buffer. +- If buffer is full, drop oldest snapshot. +- Log at WARN level and increment metric when dropping snapshots. + +### Fallback Transition + +- On stream disconnect: immediately switch to polling. +- On stream reconnect: immediately switch to streaming (trust dedup for overlap). +- No overlap window or drain period required. + +### Deduplication + +- Maintain seen-tx cache (hash-based). +- For each snapshot, filter txs already seen and submit only new txs. +- **Eviction policy**: Finality-driven. Clear entries when the batch containing + them is confirmed on DA layer. +- On `snapshot_id` regression, clear the entire seen-tx cache. + +### Validation + +- Trust ev-reth implicitly; no RLP validation on receipt. +- Malformed data will fail at decode time during actual use. + +### Logging + +- Log each received snapshot at DEBUG level: + - `snapshot_id` + - `tx_count` + - `receipt_lag_ms` +- Log stream connect/disconnect at INFO level. +- Log snapshot drops at WARN level. + +### Semantics + +- Ordered txs are provided by ev-reth; ev-node does not reorder. +- Snapshot processing is idempotent due to seen-tx cache. + +## Failure Modes and Recovery + +| Failure | ev-reth Behavior | ev-node Behavior | +|-------------------|-----------------------------------------------------------------------|-----------------------------------------------------------------------| +| Stream disconnect | N/A | Switch to polling, begin reconnect backoff | +| ev-reth overload | Snapshot rate reduced via debounce; slow subscribers dropped after 5s | Polling fallback handles availability | +| Missed updates | N/A | Next snapshot is a full best-txs view; ev-node recovers automatically | +| ev-reth restart | snapshot_id resets to 0 | Detect regression, clear cache and buffer, continue | +| ev-reth syncing | Reject subscription | Fall back to polling, retry subscription | + +## Security and DoS Considerations + +- Subscription is WS-only; exposure should follow existing RPC security policy. +- Authentication: network-level only (firewall, VPC). No application-level auth. +- Rate limits enforced via `min_emit_interval_ms` and backpressure. +- Subscriber cap prevents resource exhaustion. +- No unbounded memory growth. + +## Observability + +### ev-reth Metrics + +| Metric | Type | Description | +|----------------------------------|-----------|---------------------------------------| +| `txpoolext_subscribers` | gauge | Current number of active subscribers | +| `txpoolext_snapshot_emits_total` | counter | Total snapshots emitted | +| `txpoolext_snapshot_drop_total` | counter | Snapshots dropped due to backpressure | +| `txpoolext_snapshot_build_ms` | histogram | Time to build a snapshot | + +### ev-node Metrics + +| Metric | Type | Description | +|--------------------------------------|-----------|--------------------------------------------| +| `reaper_stream_connected` | gauge | 1 if stream connected, 0 otherwise | +| `reaper_stream_reconnects_total` | counter | Total reconnection attempts | +| `reaper_stream_snapshot_lag_ms` | histogram | Receipt lag (timestamp_ms vs receive time) | +| `reaper_stream_fallback_polls_total` | counter | Polls made while in fallback mode | +| `reaper_stream_buffer_drops_total` | counter | Snapshots dropped due to full buffer | + +## Compatibility + +- `txpoolExt_getTxs` remains unchanged. +- HTTP-only clients unaffected. +- WS subscription is additive and optional. +- No protocol versioning in initial release; add when breaking changes are needed. + +## Testing Plan + +### Unit Tests (ev-reth) + +- Snapshot ordering matches `getTxs`. +- `max_bytes`/`max_gas` enforced. +- Heartbeat interval honored (emits at max_emit_interval_ms). +- Heartbeat emits empty snapshot when pool is empty. +- Backpressure drops intermediate snapshots. +- Subscriber cap enforced. +- Subscription rejected when syncing. + +### Unit Tests (ev-node) + +- Snapshot buffer bounded correctly. +- Oldest snapshot dropped when buffer full. +- seen-tx cache cleared on snapshot_id regression. +- Buffer cleared on snapshot_id regression. +- Reconnect backoff schedule correct (100ms base, 2x, 30s cap). +- Immediate fallback on disconnect. +- Immediate switch on reconnect. + +### Integration Tests + +- ev-node receives updates <= 1s. +- Stream disconnect triggers polling fallback. +- Ordering preserved end-to-end. +- seen-tx cache eviction on DA finality. +- Recovery from ev-reth restart (snapshot_id regression). + +## Rollout Plan + +1. Add ev-reth subscription API and metrics. +2. Add ev-node WS subscription client with fallback polling. +3. Enable subscription by default when `evm.ws-url` is set. +4. Keep existing polling configuration for opt-out and as fallback. + +## Summary of Key Decisions + +| Decision | Choice | +|-----------------------|--------------------------------------| +| Ordering semantics | Locked on first sight | +| snapshot_id lifecycle | Ephemeral, resets on restart | +| Cache eviction | Finality-driven | +| Fallback transition | Immediate switch, trust dedup | +| Stall timeout | 5s, abrupt close | +| Debounce strategy | Time-only | +| Payload format | Blobs only, ev-node decodes | +| Oversized tx handling | Silent exclusion | +| Authentication | Network-level only | +| Client buffering | 3-5 snapshots, drop oldest | +| Multi-reth support | Single ws-url, infra-level HA | +| Empty pool emission | Skip (except heartbeat) | +| Heartbeat on empty | Emits empty snapshot | +| Reconnect backoff | 100ms base, 30s cap, unbounded retry | +| Timestamp source | Wall clock | +| Drop visibility | Log + metric | +| Validation | Trust ev-reth implicitly | +| Lag measurement | Receipt lag only | +| Param updates | Reconnect required | +| Pre-sync behavior | Reject subscription | +| Snapshot logging | DEBUG level | +| Snapshot build cap | Scan until limits hit | +| Subscriber limit | Explicit cap, default 5 | +| Regression handling | Clear cache and buffer | +| Default intervals | 50ms / 1000ms | +| Health exposure | Metrics only | +| Protocol versioning | None initially | From 917e3c92e98273fd2dc3a8493831f5dc607468bd Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 4 Mar 2026 11:46:16 +0100 Subject: [PATCH 7/8] remove spec --- spec.md | 362 -------------------------------------------------------- 1 file changed, 362 deletions(-) delete mode 100644 spec.md diff --git a/spec.md b/spec.md deleted file mode 100644 index 4bcf26de64..0000000000 --- a/spec.md +++ /dev/null @@ -1,362 +0,0 @@ -# EV-Reth Txpool Push Subscription for EV-Node - -## Summary - -Introduce a push-based txpool subscription in ev-reth that streams ordered, -mempool-selected transactions to ev-node, removing the 1s polling bottleneck. -The stream emits snapshots of the ordered tx list using the same selection -logic as `txpoolExt_getTxs`, with backpressure and a heartbeat to guarantee -<= 1s worst-case lag. - -## Goals - -- Deliver transactions to ev-node as soon as they appear in ev-reth. -- Preserve mempool ordering identical to `txpoolExt_getTxs`. -- Keep worst-case lag at or below 1s (current behavior). -- Avoid DoS amplification by applying backpressure and bounded work. -- Maintain compatibility with existing `txpoolExt_getTxs`. - -## Non-Goals - -- Guarantee lossless delivery across disconnects (recover via resync). -- Replace sequencing logic or batch queue behavior in ev-node. -- Change transaction ordering semantics or mempool policies. - -## Current Behavior (Problem) - -- ev-node reaper polls `Executor.GetTxs` on a fixed interval. -- EVM executor uses `txpoolExt_getTxs` (HTTP) to pull an ordered list. -- With 100ms blocks, a 1s poll means txs are only observed once per second. -- Lowering the interval increases RPC load and DoS surface. - -## Proposed Architecture - -### High-Level - -- Add a new JSON-RPC subscription method in ev-reth: - `txpoolExt_subscribeBestTxs` -- ev-node connects via WebSocket and subscribes. -- Each update is a snapshot of the best-ordered txs, using the same selection - logic as `txpoolExt_getTxs`. -- ev-node uses the snapshot stream as the primary source of txs, with polling - fallback when the stream is unavailable. - -### Why Snapshot Streaming (Not "New Tx" Events) - -Ordering must match the mempool ordering (`best_transactions()`). -Streaming "new txs" in arrival order does not preserve that ordering, and -would force ev-node to re-implement mempool ordering. Snapshot updates avoid -semantic drift and keep ordering consistent with `txpoolExt_getTxs`. - -## API Specification - -### Method - -`txpoolExt_subscribeBestTxs` (WebSocket-only subscription) - -### Params (optional object) - -```json -{ - "max_bytes": 1939865, - "max_gas": 30000000, - "min_emit_interval_ms": 50, - "max_emit_interval_ms": 1000, - "include_metadata": true -} -``` - -| Parameter | Default | Description | -|------------------------|-------------------------|----------------------------------------------------| -| `max_bytes` | ev-reth config value | Per-update byte cap | -| `max_gas` | current block gas limit | Per-update gas cap | -| `min_emit_interval_ms` | 50 | Debounce window for rapid pool updates | -| `max_emit_interval_ms` | 1000 | Heartbeat max to guarantee <= 1s lag | -| `include_metadata` | true | Adds size/gas counters and monotonic `snapshot_id` | - -Parameters are fixed for the subscription lifetime. To change parameters, -disconnect and reconnect with new values. - -### Payload (when `include_metadata = true`) - -```json -{ - "snapshot_id": 42, - "timestamp_ms": 1705123456789, - "total_bytes": 102400, - "total_gas": 15000000, - "txs": ["0x...", "0x...", "..."] -} -``` - -| Field | Type | Description | -|----------------|-------|---------------------------------------------------------------| -| `snapshot_id` | u64 | Monotonic counter, resets on ev-reth restart | -| `timestamp_ms` | u64 | Wall clock time (ev-reth system time) when snapshot was built | -| `total_bytes` | u64 | Sum of tx bytes in this snapshot | -| `total_gas` | u64 | Sum of tx gas limits in this snapshot | -| `txs` | array | RLP-encoded transaction blobs in mempool priority order | - -### Ordering Guarantees - -- `txs` order equals the ordering returned by `txpoolExt_getTxs` at the time - the snapshot is generated. -- A snapshot represents the best ordered selection from the pool at that moment. -- **Ordering is locked on first sight**: once ev-node submits a tx to its batch - queue, the position is fixed regardless of subsequent mempool reorderings. - -## ev-reth Implementation Notes - -### Snapshot Generation - -- Use `TransactionPool::new_pending_pool_transactions_listener()` as the - trigger source for updates. -- On event, generate a snapshot by running the same selection logic currently - used in `txpoolExt_getTxs` (best transactions, max bytes, max gas). -- Iterate `best_transactions()` until `max_bytes` or `max_gas` limits are hit. - No hard iteration cap; rely on size/gas limits for bounding. -- Transactions exceeding the snapshot limits are silently excluded. They may - be picked up by polling fallback or future snapshots. - -### Debouncing - -- Use time-only debouncing: emit after `min_emit_interval_ms` regardless of - whether updates continue arriving. -- Coalesce multiple pool updates within the debounce window into a single - snapshot to cap CPU and bandwidth. - -### Empty Pool Handling - -- Skip emission when the transaction pool is empty. -- Heartbeat still fires at `max_emit_interval_ms` and emits an empty snapshot - (`txs: []`) when the pool is empty to prove liveness. - -### Backpressure - -- Keep at most one pending snapshot per subscriber. -- If sink is slow, drop intermediate updates and send the latest snapshot. -- If sink is stalled beyond **5 seconds**, close the subscription immediately - (abrupt close, no graceful close frame). - -### Heartbeat - -- If no updates occur within `max_emit_interval_ms`, emit the latest snapshot - (or empty snapshot if pool is empty) to guarantee <= 1s lag. - -### Subscriber Limits - -- Enforce an explicit maximum number of concurrent subscribers. -- Default: **5 subscribers**. -- Configurable via ev-reth config. -- Reject new subscriptions with an error when limit is reached. - -### Sync Requirement - -- Reject subscription requests with an error if ev-reth is still syncing to - chain tip. -- Subscription is only available when ev-reth is fully synced. - -### Resource Safety - -- No unbounded queues per subscriber. -- Snapshots are bounded by `max_bytes` and `max_gas`. -- Per-subscriber state is O(1) (snapshot_id, last_emit, dirty flag). - -## ev-node Integration - -### Config - -| Config Key | Type | Default | Description | -|--------------------------|--------|--------------------|----------------------------------------------------| -| `evm.ws-url` | string | none | WebSocket URL for ev-reth (required for streaming) | -| `evm.txpool_subscribe` | bool | true if ws-url set | Enable txpool subscription | -| `evm.txpool_buffer_size` | int | 3 | Local snapshot buffer size (3-5 recommended) | - -Single `ws-url` only; high availability is handled at infrastructure level -(load balancer, DNS failover). - -### Reaper Behavior - -- If streaming is available, consume snapshots and submit new txs. -- If stream is unavailable: - - Fall back to current polling behavior (`GetTxs`). - - Retry stream connection with exponential backoff. - -### Connection Lifecycle - -1. On startup, attempt WebSocket connection to `evm.ws-url`. -2. Subscribe to `txpoolExt_subscribeBestTxs` with configured params. -3. If subscription is rejected (ev-reth syncing, limit reached), fall back to - polling and retry after backoff. -4. On disconnect, switch to polling immediately and begin reconnection attempts. - -### Reconnection Backoff - -- **Base interval**: 100ms -- **Multiplier**: 2x on each failure -- **Cap**: 30 seconds -- **Retry**: Unbounded (retry forever while polling provides fallback) - -### Snapshot ID Regression Handling - -When ev-node detects `snapshot_id` regression (indicating ev-reth restart or -failover): - -1. Clear the local snapshot buffer (discard all buffered snapshots). -2. Clear the seen-tx cache. -3. Continue processing from the new snapshot. - -### Local Buffer - -- Maintain a bounded buffer of **3-5 snapshots** (configurable). -- Always consume snapshots from WebSocket into buffer. -- If buffer is full, drop oldest snapshot. -- Log at WARN level and increment metric when dropping snapshots. - -### Fallback Transition - -- On stream disconnect: immediately switch to polling. -- On stream reconnect: immediately switch to streaming (trust dedup for overlap). -- No overlap window or drain period required. - -### Deduplication - -- Maintain seen-tx cache (hash-based). -- For each snapshot, filter txs already seen and submit only new txs. -- **Eviction policy**: Finality-driven. Clear entries when the batch containing - them is confirmed on DA layer. -- On `snapshot_id` regression, clear the entire seen-tx cache. - -### Validation - -- Trust ev-reth implicitly; no RLP validation on receipt. -- Malformed data will fail at decode time during actual use. - -### Logging - -- Log each received snapshot at DEBUG level: - - `snapshot_id` - - `tx_count` - - `receipt_lag_ms` -- Log stream connect/disconnect at INFO level. -- Log snapshot drops at WARN level. - -### Semantics - -- Ordered txs are provided by ev-reth; ev-node does not reorder. -- Snapshot processing is idempotent due to seen-tx cache. - -## Failure Modes and Recovery - -| Failure | ev-reth Behavior | ev-node Behavior | -|-------------------|-----------------------------------------------------------------------|-----------------------------------------------------------------------| -| Stream disconnect | N/A | Switch to polling, begin reconnect backoff | -| ev-reth overload | Snapshot rate reduced via debounce; slow subscribers dropped after 5s | Polling fallback handles availability | -| Missed updates | N/A | Next snapshot is a full best-txs view; ev-node recovers automatically | -| ev-reth restart | snapshot_id resets to 0 | Detect regression, clear cache and buffer, continue | -| ev-reth syncing | Reject subscription | Fall back to polling, retry subscription | - -## Security and DoS Considerations - -- Subscription is WS-only; exposure should follow existing RPC security policy. -- Authentication: network-level only (firewall, VPC). No application-level auth. -- Rate limits enforced via `min_emit_interval_ms` and backpressure. -- Subscriber cap prevents resource exhaustion. -- No unbounded memory growth. - -## Observability - -### ev-reth Metrics - -| Metric | Type | Description | -|----------------------------------|-----------|---------------------------------------| -| `txpoolext_subscribers` | gauge | Current number of active subscribers | -| `txpoolext_snapshot_emits_total` | counter | Total snapshots emitted | -| `txpoolext_snapshot_drop_total` | counter | Snapshots dropped due to backpressure | -| `txpoolext_snapshot_build_ms` | histogram | Time to build a snapshot | - -### ev-node Metrics - -| Metric | Type | Description | -|--------------------------------------|-----------|--------------------------------------------| -| `reaper_stream_connected` | gauge | 1 if stream connected, 0 otherwise | -| `reaper_stream_reconnects_total` | counter | Total reconnection attempts | -| `reaper_stream_snapshot_lag_ms` | histogram | Receipt lag (timestamp_ms vs receive time) | -| `reaper_stream_fallback_polls_total` | counter | Polls made while in fallback mode | -| `reaper_stream_buffer_drops_total` | counter | Snapshots dropped due to full buffer | - -## Compatibility - -- `txpoolExt_getTxs` remains unchanged. -- HTTP-only clients unaffected. -- WS subscription is additive and optional. -- No protocol versioning in initial release; add when breaking changes are needed. - -## Testing Plan - -### Unit Tests (ev-reth) - -- Snapshot ordering matches `getTxs`. -- `max_bytes`/`max_gas` enforced. -- Heartbeat interval honored (emits at max_emit_interval_ms). -- Heartbeat emits empty snapshot when pool is empty. -- Backpressure drops intermediate snapshots. -- Subscriber cap enforced. -- Subscription rejected when syncing. - -### Unit Tests (ev-node) - -- Snapshot buffer bounded correctly. -- Oldest snapshot dropped when buffer full. -- seen-tx cache cleared on snapshot_id regression. -- Buffer cleared on snapshot_id regression. -- Reconnect backoff schedule correct (100ms base, 2x, 30s cap). -- Immediate fallback on disconnect. -- Immediate switch on reconnect. - -### Integration Tests - -- ev-node receives updates <= 1s. -- Stream disconnect triggers polling fallback. -- Ordering preserved end-to-end. -- seen-tx cache eviction on DA finality. -- Recovery from ev-reth restart (snapshot_id regression). - -## Rollout Plan - -1. Add ev-reth subscription API and metrics. -2. Add ev-node WS subscription client with fallback polling. -3. Enable subscription by default when `evm.ws-url` is set. -4. Keep existing polling configuration for opt-out and as fallback. - -## Summary of Key Decisions - -| Decision | Choice | -|-----------------------|--------------------------------------| -| Ordering semantics | Locked on first sight | -| snapshot_id lifecycle | Ephemeral, resets on restart | -| Cache eviction | Finality-driven | -| Fallback transition | Immediate switch, trust dedup | -| Stall timeout | 5s, abrupt close | -| Debounce strategy | Time-only | -| Payload format | Blobs only, ev-node decodes | -| Oversized tx handling | Silent exclusion | -| Authentication | Network-level only | -| Client buffering | 3-5 snapshots, drop oldest | -| Multi-reth support | Single ws-url, infra-level HA | -| Empty pool emission | Skip (except heartbeat) | -| Heartbeat on empty | Emits empty snapshot | -| Reconnect backoff | 100ms base, 30s cap, unbounded retry | -| Timestamp source | Wall clock | -| Drop visibility | Log + metric | -| Validation | Trust ev-reth implicitly | -| Lag measurement | Receipt lag only | -| Param updates | Reconnect required | -| Pre-sync behavior | Reject subscription | -| Snapshot logging | DEBUG level | -| Snapshot build cap | Scan until limits hit | -| Subscriber limit | Explicit cap, default 5 | -| Regression handling | Clear cache and buffer | -| Default intervals | 50ms / 1000ms | -| Health exposure | Metrics only | -| Protocol versioning | None initially | From bc3584e1e1a94ac1cf854b047aca0db35393da5e Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 4 Mar 2026 13:06:49 +0100 Subject: [PATCH 8/8] test stability --- test/e2e/failover_e2e_test.go | 105 +++++++++++++++++++++++++++------- 1 file changed, 84 insertions(+), 21 deletions(-) diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 1f92723a6c..36120fc523 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -219,20 +219,15 @@ func TestLeaseFailoverE2E(t *testing.T) { return err == nil }, time.Second, 100*time.Millisecond) - lastDABlockNewLeader = queryLastDAHeight(t, env.SequencerJWT, env.Endpoints.GetDAAddress()) - genesisHeight := state.InitialHeight verifyNoDoubleSigning(t, clusterNodes, genesisHeight, state.LastBlockHeight) - // wait for the next DA block to ensure all blocks are propagated - require.Eventually(t, func() bool { - before := lastDABlockNewLeader - lastDABlockNewLeader = queryLastDAHeight(t, env.SequencerJWT, env.Endpoints.GetDAAddress()) - return before < lastDABlockNewLeader - }, 2*must(time.ParseDuration(DefaultDABlockTime)), 100*time.Millisecond) - + // wait for the DA submitter to catch up — poll until all blocks are on DA t.Log("+++ Verifying no DA gaps...") - verifyDABlocks(t, 1, lastDABlockNewLeader, env.SequencerJWT, env.Endpoints.GetDAAddress(), genesisHeight, state.LastBlockHeight) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + lastDA := queryLastDAHeight(t, env.SequencerJWT, env.Endpoints.GetDAAddress()) + verifyDABlocksCollect(collect, 1, lastDA, env.SequencerJWT, env.Endpoints.GetDAAddress(), genesisHeight, state.LastBlockHeight) + }, 3*must(time.ParseDuration(DefaultDABlockTime)), 500*time.Millisecond) // Cleanup processes clusterNodes.killAll() @@ -508,22 +503,16 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { return err == nil }, time.Second, 100*time.Millisecond) - lastDABlock := queryLastDAHeight(t, env.SequencerJWT, env.Endpoints.GetDAAddress()) - genesisHeight := state.InitialHeight verifyNoDoubleSigning(t, clusterNodes, genesisHeight, state.LastBlockHeight) t.Log("+++ No double-signing detected ✓") - // Wait for the next DA block to ensure all blocks are propagated - require.Eventually(t, func() bool { - before := lastDABlock - lastDABlock = queryLastDAHeight(t, env.SequencerJWT, env.Endpoints.GetDAAddress()) - return before < lastDABlock - }, 2*must(time.ParseDuration(DefaultDABlockTime)), 100*time.Millisecond) - - // Verify no DA gaps + // Wait for the DA submitter to catch up — poll until all blocks are on DA t.Log("+++ Verifying no DA gaps...") - verifyDABlocks(t, 1, lastDABlock, env.SequencerJWT, env.Endpoints.GetDAAddress(), genesisHeight, state.LastBlockHeight) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + lastDA := queryLastDAHeight(t, env.SequencerJWT, env.Endpoints.GetDAAddress()) + verifyDABlocksCollect(collect, 1, lastDA, env.SequencerJWT, env.Endpoints.GetDAAddress(), genesisHeight, state.LastBlockHeight) + }, 3*must(time.ParseDuration(DefaultDABlockTime)), 500*time.Millisecond) t.Log("+++ No DA gaps detected ✓") // Cleanup processes @@ -611,6 +600,80 @@ func verifyDABlocks(t *testing.T, daStartHeight, lastDABlock uint64, jwtSecret s } } +// verifyDABlocksCollect is like verifyDABlocks but uses assert.CollectT so it can be retried +// inside require.EventuallyWithT. +func verifyDABlocksCollect(collect *assert.CollectT, daStartHeight, lastDABlock uint64, jwtSecret string, daAddress string, genesisHeight, lastEVBlock uint64) { + ctx := context.Background() + blobClient, err := blobrpc.NewClient(ctx, daAddress, jwtSecret, "") + if !assert.NoError(collect, err) { + return + } + defer blobClient.Close() + + ns, err := libshare.NewNamespaceFromBytes(coreda.NamespaceFromString(DefaultDANamespace).Bytes()) + if !assert.NoError(collect, err) { + return + } + evHeightsToEvBlockParts := make(map[uint64]int) + deduplicationCache := make(map[string]uint64) + + for daHeight := daStartHeight; daHeight <= lastDABlock; daHeight++ { + blobs, err := blobClient.Blob.GetAll(ctx, daHeight, []libshare.Namespace{ns}) + if err != nil { + if strings.Contains(err.Error(), "blob: not found") { + continue + } + assert.NoError(collect, err, "height %d/%d", daHeight, lastDABlock) + return + } + if len(blobs) == 0 { + continue + } + + for _, blob := range blobs { + if evHeight, hash, blobType := extractBlockHeightRaw(blob.Data()); evHeight != 0 { + _ = blobType + if height, ok := deduplicationCache[hash.String()]; ok { + assert.Equal(collect, evHeight, height) + continue + } + assert.GreaterOrEqual(collect, evHeight, genesisHeight) + deduplicationCache[hash.String()] = evHeight + evHeightsToEvBlockParts[evHeight]++ + } + } + } + + for h := genesisHeight; h <= lastEVBlock; h++ { + assert.NotEmpty(collect, evHeightsToEvBlockParts[h], "missing block on DA for height %d/%d", h, lastEVBlock) + assert.Less(collect, evHeightsToEvBlockParts[h], 3, "duplicate block on DA for height %d/%d", h, lastEVBlock) + } +} + +// extractBlockHeightRaw is like extractBlockHeight but doesn't require *testing.T. +func extractBlockHeightRaw(blob []byte) (uint64, types.Hash, string) { + if len(blob) == 0 { + return 0, nil, "" + } + var headerPb pb.SignedHeader + if err := proto.Unmarshal(blob, &headerPb); err == nil { + var signedHeader types.SignedHeader + if err := signedHeader.FromProto(&headerPb); err == nil { + if err := signedHeader.Header.ValidateBasic(); err == nil { + return signedHeader.Height(), signedHeader.Hash(), "header" + } + } + } + + var signedData types.SignedData + if err := signedData.UnmarshalBinary(blob); err == nil { + if signedData.Metadata != nil { + return signedData.Height(), signedData.Hash(), "data" + } + } + return 0, nil, "" +} + // extractBlockHeight attempts to decode a blob as SignedHeader or SignedData and extract the block height func extractBlockHeight(t *testing.T, blob []byte) (uint64, types.Hash, string) { t.Helper()