diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b2c33827..e0c086087 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add DA Hints for P2P transactions. This allows a catching up node to be on sync with both DA and P2P. ([#2891](https://github.com/evstack/ev-node/pull/2891)) + +### Changes + +- Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046)) + ## v1.0.0-rc.2 ### Changes diff --git a/block/internal/cache/pending_data.go b/block/internal/cache/pending_data.go index 0cc324eca..b31334a41 100644 --- a/block/internal/cache/pending_data.go +++ b/block/internal/cache/pending_data.go @@ -53,10 +53,6 @@ func NewPendingData(store store.Store, logger zerolog.Logger) (*PendingData, err return &PendingData{base: base}, nil } -func (pd *PendingData) init() error { - return pd.base.init() -} - // GetPendingData returns a sorted slice of pending Data along with their marshalled bytes. func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) { dataList, err := pd.base.getPending(ctx) @@ -81,12 +77,41 @@ func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]b } func (pd *PendingData) NumPendingData() uint64 { + pd.advancePastEmptyData(context.Background()) return pd.base.numPending() } func (pd *PendingData) SetLastSubmittedDataHeight(ctx context.Context, newLastSubmittedDataHeight uint64) { pd.base.setLastSubmittedHeight(ctx, newLastSubmittedDataHeight) } + +// advancePastEmptyData advances lastSubmittedDataHeight past any consecutive empty data blocks. +// This ensures that NumPendingData doesn't count empty data that won't be published to DA. +func (pd *PendingData) advancePastEmptyData(ctx context.Context) { + storeHeight, err := pd.base.store.Height(ctx) + if err != nil { + return + } + + currentHeight := pd.base.getLastSubmittedHeight() + + for height := currentHeight + 1; height <= storeHeight; height++ { + data, err := fetchData(ctx, pd.base.store, height) + if err != nil { + // Can't fetch data (might be in-flight or error), stop advancing + return + } + + if len(data.Txs) > 0 { + // Found non-empty data, stop advancing + return + } + + // Empty data, advance past it + pd.base.setLastSubmittedHeight(ctx, height) + } +} + func (pd *PendingData) GetLastSubmittedDataHeight() uint64 { return pd.base.getLastSubmittedHeight() } diff --git a/block/internal/cache/pending_data_test.go b/block/internal/cache/pending_data_test.go index 75679a24f..10dc87382 100644 --- a/block/internal/cache/pending_data_test.go +++ b/block/internal/cache/pending_data_test.go @@ -17,11 +17,11 @@ func TestPendingData_BasicFlow(t *testing.T) { ctx := context.Background() store := memStore(t) - // three blocks; PendingData should return all data (no filtering here) + // three blocks with transactions chainID := "pd-basic" - h1, d1 := types.GetRandomBlock(1, 0, chainID) + h1, d1 := types.GetRandomBlock(1, 1, chainID) h2, d2 := types.GetRandomBlock(2, 1, chainID) - h3, d3 := types.GetRandomBlock(3, 2, chainID) + h3, d3 := types.GetRandomBlock(3, 1, chainID) for i, p := range []struct { h *types.SignedHeader @@ -37,7 +37,7 @@ func TestPendingData_BasicFlow(t *testing.T) { pendingData, err := NewPendingData(store, zerolog.Nop()) require.NoError(t, err) - // initially all 3 data items are pending, incl. empty + // initially all 3 data items are pending require.Equal(t, uint64(3), pendingData.NumPendingData()) pendingDataList, _, err := pendingData.GetPendingData(ctx) require.NoError(t, err) @@ -59,6 +59,125 @@ func TestPendingData_BasicFlow(t *testing.T) { require.Equal(t, uint64(2), pendingDataList[0].Height()) } +func TestPendingData_AdvancesPastEmptyData(t *testing.T) { + t.Parallel() + ctx := context.Background() + store := memStore(t) + + // Create blocks: non-empty, empty, empty, non-empty + chainID := "pd-empty" + h1, d1 := types.GetRandomBlock(1, 1, chainID) // 1 tx + h2, d2 := types.GetRandomBlock(2, 0, chainID) // empty + h3, d3 := types.GetRandomBlock(3, 0, chainID) // empty + h4, d4 := types.GetRandomBlock(4, 1, chainID) // 1 tx + + for i, p := range []struct { + h *types.SignedHeader + d *types.Data + }{{h1, d1}, {h2, d2}, {h3, d3}, {h4, d4}} { + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{})) + require.NoError(t, batch.SetHeight(uint64(i+1))) + require.NoError(t, batch.Commit()) + } + + pendingData, err := NewPendingData(store, zerolog.Nop()) + require.NoError(t, err) + + // Initially 4 pending - height 1 is non-empty so no advancing happens yet + require.Equal(t, uint64(4), pendingData.NumPendingData()) + require.Equal(t, uint64(0), pendingData.GetLastSubmittedDataHeight()) + + // Submit height 1 (non-empty) + pendingData.SetLastSubmittedDataHeight(ctx, 1) + require.Equal(t, uint64(1), pendingData.GetLastSubmittedDataHeight()) + + // NumPendingData advances past empty blocks 2 and 3, leaving only height 4 + require.Equal(t, uint64(1), pendingData.NumPendingData()) + + // Should have advanced to height 3 (past empty blocks 2 and 3) + require.Equal(t, uint64(3), pendingData.GetLastSubmittedDataHeight()) + + pendingDataList, _, err := pendingData.GetPendingData(ctx) + require.NoError(t, err) + require.Len(t, pendingDataList, 1) + require.Equal(t, uint64(4), pendingDataList[0].Height()) +} + +func TestPendingData_AdvancesPastAllEmptyToEnd(t *testing.T) { + t.Parallel() + ctx := context.Background() + store := memStore(t) + + // Create blocks: non-empty, empty, empty (all remaining are empty) + chainID := "pd-all-empty" + h1, d1 := types.GetRandomBlock(1, 1, chainID) // 1 tx + h2, d2 := types.GetRandomBlock(2, 0, chainID) // empty + h3, d3 := types.GetRandomBlock(3, 0, chainID) // empty + + for i, p := range []struct { + h *types.SignedHeader + d *types.Data + }{{h1, d1}, {h2, d2}, {h3, d3}} { + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{})) + require.NoError(t, batch.SetHeight(uint64(i+1))) + require.NoError(t, batch.Commit()) + } + + pendingData, err := NewPendingData(store, zerolog.Nop()) + require.NoError(t, err) + + // Submit height 1 + pendingData.SetLastSubmittedDataHeight(ctx, 1) + require.Equal(t, uint64(1), pendingData.GetLastSubmittedDataHeight()) + + // NumPendingData advances past empty blocks to end of store + require.Equal(t, uint64(0), pendingData.NumPendingData()) + + // Should have advanced to height 3 + require.Equal(t, uint64(3), pendingData.GetLastSubmittedDataHeight()) +} + +func TestPendingData_AdvancesPastEmptyAtStart(t *testing.T) { + t.Parallel() + ctx := context.Background() + store := memStore(t) + + // Create blocks: empty, empty, non-empty + chainID := "pd-empty-start" + h1, d1 := types.GetRandomBlock(1, 0, chainID) // empty + h2, d2 := types.GetRandomBlock(2, 0, chainID) // empty + h3, d3 := types.GetRandomBlock(3, 1, chainID) // 1 tx + + for i, p := range []struct { + h *types.SignedHeader + d *types.Data + }{{h1, d1}, {h2, d2}, {h3, d3}} { + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{})) + require.NoError(t, batch.SetHeight(uint64(i+1))) + require.NoError(t, batch.Commit()) + } + + pendingData, err := NewPendingData(store, zerolog.Nop()) + require.NoError(t, err) + + // NumPendingData should advance past empty blocks 1 and 2, leaving only height 3 + require.Equal(t, uint64(1), pendingData.NumPendingData()) + + // Should have advanced to height 2 (past empty blocks 1 and 2) + require.Equal(t, uint64(2), pendingData.GetLastSubmittedDataHeight()) + + pendingDataList, _, err := pendingData.GetPendingData(ctx) + require.NoError(t, err) + require.Len(t, pendingDataList, 1) + require.Equal(t, uint64(3), pendingDataList[0].Height()) +} + func TestPendingData_InitFromMetadata(t *testing.T) { t.Parallel() ctx := context.Background() @@ -71,9 +190,11 @@ func TestPendingData_InitFromMetadata(t *testing.T) { binary.LittleEndian.PutUint64(bz, 2) require.NoError(t, store.SetMetadata(ctx, LastSubmittedDataHeightKey, bz)) - // store height is 3 + // store height is 3, with a non-empty block at height 3 + h3, d3 := types.GetRandomBlock(3, 1, "test-chain") batch, err := store.NewBatch(ctx) require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h3, d3, &types.Signature{})) require.NoError(t, batch.SetHeight(3)) require.NoError(t, batch.Commit()) @@ -99,5 +220,5 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) { // fetching pending should propagate the not-found error from store pending, _, err := pendingData.GetPendingData(ctx) require.Error(t, err) - require.Empty(t, pending) + require.Nil(t, pending) } diff --git a/block/internal/cache/pending_headers.go b/block/internal/cache/pending_headers.go index 2f455ee53..d12f9627a 100644 --- a/block/internal/cache/pending_headers.go +++ b/block/internal/cache/pending_headers.go @@ -80,10 +80,6 @@ func (ph *PendingHeaders) SetLastSubmittedHeaderHeight(ctx context.Context, newL ph.base.setLastSubmittedHeight(ctx, newLastSubmittedHeaderHeight) } -func (ph *PendingHeaders) init() error { - return ph.base.init() -} - func (ph *PendingHeaders) GetLastSubmittedHeaderHeight() uint64 { return ph.base.getLastSubmittedHeight() } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index cac8ebd1c..17f6ce9bc 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -177,7 +177,6 @@ func (s *Submitter) daSubmissionLoop() { case <-ticker.C: // Check if we should submit headers based on batching strategy headersNb := s.cache.NumPendingHeaders() - if headersNb > 0 { lastSubmitNanos := s.lastHeaderSubmit.Load() timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))