8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add DA Hints for P2P transactions, allowing a catching-up node to stay in sync with both DA and P2P. ([#2891](https://github.com/evstack/ev-node/pull/2891))

### Changes

- Improve `cache.NumPendingData` so it no longer counts empty data; the last submitted data height is automatically advanced past empty blocks to reflect this. ([#3046](https://github.com/evstack/ev-node/pull/3046))

## v1.0.0-rc.2

### Changes
33 changes: 29 additions & 4 deletions block/internal/cache/pending_data.go
@@ -53,10 +53,6 @@ func NewPendingData(store store.Store, logger zerolog.Logger) (*PendingData, err
return &PendingData{base: base}, nil
}

func (pd *PendingData) init() error {
return pd.base.init()
}

// GetPendingData returns a sorted slice of pending Data along with their marshalled bytes.
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) {
dataList, err := pd.base.getPending(ctx)
@@ -81,12 +77,41 @@ func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]b
}

func (pd *PendingData) NumPendingData() uint64 {
pd.advancePastEmptyData(context.Background())
Review comment from a contributor (severity: medium):
The advancePastEmptyData function is called with context.Background(). While NumPendingData itself doesn't accept a context, advancePastEmptyData performs store operations that could be long-running or require cancellation. Using context.Background() means these operations cannot be cancelled and will run indefinitely if the underlying store operations hang. Consider whether NumPendingData should accept a context, or whether a context with a timeout/cancellation should be derived for advancePastEmptyData, to keep NumPendingData responsive when it is called from a cancellable loop.
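
A minimal sketch of the second option, deriving a bounded context for the store walk; the `time` import and the 2-second bound are illustrative assumptions, not part of this PR:

```go
// Sketch: bound the store walk with a derived timeout instead of a bare
// context.Background(). The 2-second value is an illustrative assumption.
func (pd *PendingData) NumPendingData() uint64 {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	pd.advancePastEmptyData(ctx)
	return pd.base.numPending()
}
```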

return pd.base.numPending()
}

func (pd *PendingData) SetLastSubmittedDataHeight(ctx context.Context, newLastSubmittedDataHeight uint64) {
pd.base.setLastSubmittedHeight(ctx, newLastSubmittedDataHeight)
}

// advancePastEmptyData advances lastSubmittedDataHeight past any consecutive empty data blocks.
// This ensures that NumPendingData doesn't count empty data that won't be published to DA.
func (pd *PendingData) advancePastEmptyData(ctx context.Context) {
storeHeight, err := pd.base.store.Height(ctx)
if err != nil {
return
}

currentHeight := pd.base.getLastSubmittedHeight()

for height := currentHeight + 1; height <= storeHeight; height++ {
data, err := fetchData(ctx, pd.base.store, height)
if err != nil {
// Can't fetch data (might be in-flight or error), stop advancing
return
}

if len(data.Txs) > 0 {
// Found non-empty data, stop advancing
return
}

// Empty data, advance past it
pd.base.setLastSubmittedHeight(ctx, height)
}
}

func (pd *PendingData) GetLastSubmittedDataHeight() uint64 {
return pd.base.getLastSubmittedHeight()
}
133 changes: 127 additions & 6 deletions block/internal/cache/pending_data_test.go
@@ -17,11 +17,11 @@ func TestPendingData_BasicFlow(t *testing.T) {
ctx := context.Background()
store := memStore(t)

// three blocks; PendingData should return all data (no filtering here)
// three blocks with transactions
chainID := "pd-basic"
h1, d1 := types.GetRandomBlock(1, 0, chainID)
h1, d1 := types.GetRandomBlock(1, 1, chainID)
h2, d2 := types.GetRandomBlock(2, 1, chainID)
h3, d3 := types.GetRandomBlock(3, 2, chainID)
h3, d3 := types.GetRandomBlock(3, 1, chainID)

for i, p := range []struct {
h *types.SignedHeader
@@ -37,7 +37,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)

// initially all 3 data items are pending, incl. empty
// initially all 3 data items are pending
require.Equal(t, uint64(3), pendingData.NumPendingData())
pendingDataList, _, err := pendingData.GetPendingData(ctx)
require.NoError(t, err)
@@ -59,6 +59,125 @@ func TestPendingData_BasicFlow(t *testing.T) {
require.Equal(t, uint64(2), pendingDataList[0].Height())
}

func TestPendingData_AdvancesPastEmptyData(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)

// Create blocks: non-empty, empty, empty, non-empty
chainID := "pd-empty"
h1, d1 := types.GetRandomBlock(1, 1, chainID) // 1 tx
h2, d2 := types.GetRandomBlock(2, 0, chainID) // empty
h3, d3 := types.GetRandomBlock(3, 0, chainID) // empty
h4, d4 := types.GetRandomBlock(4, 1, chainID) // 1 tx

for i, p := range []struct {
h *types.SignedHeader
d *types.Data
}{{h1, d1}, {h2, d2}, {h3, d3}, {h4, d4}} {
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
require.NoError(t, batch.SetHeight(uint64(i+1)))
require.NoError(t, batch.Commit())
}

pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)

// Initially 4 pending - height 1 is non-empty so no advancing happens yet
require.Equal(t, uint64(4), pendingData.NumPendingData())
require.Equal(t, uint64(0), pendingData.GetLastSubmittedDataHeight())

// Submit height 1 (non-empty)
pendingData.SetLastSubmittedDataHeight(ctx, 1)
require.Equal(t, uint64(1), pendingData.GetLastSubmittedDataHeight())

// NumPendingData advances past empty blocks 2 and 3, leaving only height 4
require.Equal(t, uint64(1), pendingData.NumPendingData())

// Should have advanced to height 3 (past empty blocks 2 and 3)
require.Equal(t, uint64(3), pendingData.GetLastSubmittedDataHeight())

pendingDataList, _, err := pendingData.GetPendingData(ctx)
require.NoError(t, err)
require.Len(t, pendingDataList, 1)
require.Equal(t, uint64(4), pendingDataList[0].Height())
}

func TestPendingData_AdvancesPastAllEmptyToEnd(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)

// Create blocks: non-empty, empty, empty (all remaining are empty)
chainID := "pd-all-empty"
h1, d1 := types.GetRandomBlock(1, 1, chainID) // 1 tx
h2, d2 := types.GetRandomBlock(2, 0, chainID) // empty
h3, d3 := types.GetRandomBlock(3, 0, chainID) // empty

for i, p := range []struct {
h *types.SignedHeader
d *types.Data
}{{h1, d1}, {h2, d2}, {h3, d3}} {
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
require.NoError(t, batch.SetHeight(uint64(i+1)))
require.NoError(t, batch.Commit())
}

pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)

// Submit height 1
pendingData.SetLastSubmittedDataHeight(ctx, 1)
require.Equal(t, uint64(1), pendingData.GetLastSubmittedDataHeight())

// NumPendingData advances past empty blocks to end of store
require.Equal(t, uint64(0), pendingData.NumPendingData())

// Should have advanced to height 3
require.Equal(t, uint64(3), pendingData.GetLastSubmittedDataHeight())
}

func TestPendingData_AdvancesPastEmptyAtStart(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)

// Create blocks: empty, empty, non-empty
chainID := "pd-empty-start"
h1, d1 := types.GetRandomBlock(1, 0, chainID) // empty
h2, d2 := types.GetRandomBlock(2, 0, chainID) // empty
h3, d3 := types.GetRandomBlock(3, 1, chainID) // 1 tx

for i, p := range []struct {
h *types.SignedHeader
d *types.Data
}{{h1, d1}, {h2, d2}, {h3, d3}} {
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
require.NoError(t, batch.SetHeight(uint64(i+1)))
require.NoError(t, batch.Commit())
}

pendingData, err := NewPendingData(store, zerolog.Nop())
require.NoError(t, err)

// NumPendingData should advance past empty blocks 1 and 2, leaving only height 3
require.Equal(t, uint64(1), pendingData.NumPendingData())

// Should have advanced to height 2 (past empty blocks 1 and 2)
require.Equal(t, uint64(2), pendingData.GetLastSubmittedDataHeight())

pendingDataList, _, err := pendingData.GetPendingData(ctx)
require.NoError(t, err)
require.Len(t, pendingDataList, 1)
require.Equal(t, uint64(3), pendingDataList[0].Height())
}

func TestPendingData_InitFromMetadata(t *testing.T) {
t.Parallel()
ctx := context.Background()
@@ -71,9 +190,11 @@ func TestPendingData_InitFromMetadata(t *testing.T) {
binary.LittleEndian.PutUint64(bz, 2)
require.NoError(t, store.SetMetadata(ctx, LastSubmittedDataHeightKey, bz))

// store height is 3
// store height is 3, with a non-empty block at height 3
h3, d3 := types.GetRandomBlock(3, 1, "test-chain")
batch, err := store.NewBatch(ctx)
require.NoError(t, err)
require.NoError(t, batch.SaveBlockData(h3, d3, &types.Signature{}))
require.NoError(t, batch.SetHeight(3))
require.NoError(t, batch.Commit())

@@ -99,5 +220,5 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
// fetching pending should propagate the not-found error from store
pending, _, err := pendingData.GetPendingData(ctx)
require.Error(t, err)
require.Empty(t, pending)
require.Nil(t, pending)
}
4 changes: 0 additions & 4 deletions block/internal/cache/pending_headers.go
@@ -80,10 +80,6 @@ func (ph *PendingHeaders) SetLastSubmittedHeaderHeight(ctx context.Context, newL
ph.base.setLastSubmittedHeight(ctx, newLastSubmittedHeaderHeight)
}

func (ph *PendingHeaders) init() error {
return ph.base.init()
}

func (ph *PendingHeaders) GetLastSubmittedHeaderHeight() uint64 {
return ph.base.getLastSubmittedHeight()
}
1 change: 0 additions & 1 deletion block/internal/submitting/submitter.go
@@ -177,7 +177,6 @@ func (s *Submitter) daSubmissionLoop() {
case <-ticker.C:
// Check if we should submit headers based on batching strategy
headersNb := s.cache.NumPendingHeaders()

if headersNb > 0 {
lastSubmitNanos := s.lastHeaderSubmit.Load()
timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))