diff --git a/CHANGELOG.md b/CHANGELOG.md index 75d38063e1..a159322578 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## v1.0.0 + ### Added - Add disaster recovery for sequencer diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 45b2c8c006..2cc85d27d3 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -3,13 +3,10 @@ package cache import ( "context" "encoding/binary" - "fmt" - "strings" "sync" "sync/atomic" lru "github.com/hashicorp/golang-lru/v2" - "github.com/rs/zerolog/log" "github.com/evstack/ev-node/pkg/store" ) @@ -25,8 +22,16 @@ const ( DefaultDAIncludedCacheSize = 200_000 ) -// Cache is a generic cache that maintains items that are seen and hard confirmed. -// Uses bounded thread-safe LRU caches to prevent unbounded memory growth. +// snapshotEntry is one record in the persisted snapshot. +// Encoded as 16 bytes: [blockHeight uint64 LE][daHeight uint64 LE]. +type snapshotEntry struct { + blockHeight uint64 + daHeight uint64 +} + +const snapshotEntrySize = 16 // bytes per snapshotEntry + +// Cache tracks seen blocks and DA inclusion status using bounded LRU caches. type Cache[T any] struct { // itemsByHeight stores items keyed by uint64 height. // Mutex needed for atomic get-and-remove in getNextItem. @@ -36,31 +41,35 @@ type Cache[T any] struct { // hashes tracks whether a given hash has been seen hashes *lru.Cache[string, bool] - // daIncluded tracks the DA inclusion height for a given hash + // daIncluded maps hash → daHeight. Hash may be a real content hash or a + // height placeholder (see HeightPlaceholderKey) immediately after restore. daIncluded *lru.Cache[string, uint64] - // hashByHeight tracks the hash associated with each height for pruning. - // Mutex needed for atomic operations in deleteAllForHeight. 
+ // hashByHeight maps blockHeight → hash, used for pruning and height-based + // lookups. Protected by hashByHeightMu only in deleteAllForHeight where a + // read-then-remove must be atomic. hashByHeight *lru.Cache[uint64, string] hashByHeightMu sync.Mutex // maxDAHeight tracks the maximum DA height seen maxDAHeight *atomic.Uint64 - // store is used for persisting DA inclusion data (optional, can be nil for ephemeral caches) - store store.Store + store store.Store // nil = ephemeral, no persistence // storeKeyPrefix is the prefix used for store keys storeKeyPrefix string } -// NewCache returns a new Cache struct with default sizes. -// If store and keyPrefix are provided, DA inclusion data will be persisted to the store for populating the cache on restarts. +func (c *Cache[T]) snapshotKey() string { + return c.storeKeyPrefix + "__snap" +} + +// NewCache creates a Cache. When store and keyPrefix are set, mutations +// persist a snapshot so RestoreFromStore can recover in-flight state. func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { // LRU cache creation only fails if size <= 0, which won't happen with our defaults itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize) hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize) daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize) - // hashByHeight must be at least as large as hashes cache to ensure proper pruning. hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize) return &Cache[T]{ @@ -74,31 +83,7 @@ func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { } } -// storeKey returns the store key for a given hash. -func (c *Cache[T]) storeKey(hash string) string { - return c.storeKeyPrefix + hash -} - -// encodeDAInclusion encodes daHeight and blockHeight into a 16-byte value. 
-func encodeDAInclusion(daHeight, blockHeight uint64) []byte { - value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight - binary.LittleEndian.PutUint64(value[0:8], daHeight) - binary.LittleEndian.PutUint64(value[8:16], blockHeight) - return value -} - -// decodeDAInclusion decodes a 16-byte value into daHeight and blockHeight. -func decodeDAInclusion(value []byte) (daHeight, blockHeight uint64, ok bool) { - if len(value) != 16 { - return 0, 0, false - } - daHeight = binary.LittleEndian.Uint64(value[0:8]) - blockHeight = binary.LittleEndian.Uint64(value[8:16]) - return daHeight, blockHeight, true -} - // getItem returns an item from the cache by height. -// Returns nil if not found or type mismatch. func (c *Cache[T]) getItem(height uint64) *T { item, ok := c.itemsByHeight.Get(height) if !ok { @@ -107,13 +92,12 @@ func (c *Cache[T]) getItem(height uint64) *T { return item } -// setItem sets an item in the cache by height +// setItem sets an item in the cache by height. func (c *Cache[T]) setItem(height uint64, item *T) { c.itemsByHeight.Add(height, item) } -// getNextItem returns the item at the specified height and removes it from cache if found. -// Returns nil if not found. +// getNextItem returns and removes the item at height, or nil if absent. func (c *Cache[T]) getNextItem(height uint64) *T { c.itemsByHeightMu.Lock() defer c.itemsByHeightMu.Unlock() @@ -126,60 +110,60 @@ func (c *Cache[T]) getNextItem(height uint64) *T { return item } -// isSeen returns true if the hash has been seen +// isSeen returns true if the hash has been seen. func (c *Cache[T]) isSeen(hash string) bool { seen, ok := c.hashes.Get(hash) - if !ok { - return false - } - return seen + return ok && seen } -// setSeen sets the hash as seen and tracks its height for pruning +// setSeen sets the hash as seen and tracks its height for pruning. 
func (c *Cache[T]) setSeen(hash string, height uint64) { c.hashes.Add(hash, true) c.hashByHeight.Add(height, hash) } -// getDAIncluded returns the DA height if the hash has been DA-included, otherwise it returns 0. +// getDAIncluded returns the DA height if the hash has been DA-included. func (c *Cache[T]) getDAIncluded(hash string) (uint64, bool) { - daHeight, ok := c.daIncluded.Get(hash) + return c.daIncluded.Get(hash) +} + +// getDAIncludedByHeight resolves DA height via the height→hash index. +// Works for both real hashes (steady state) and snapshot placeholders +// (post-restart, before the DA retriever re-fires the real hash). +func (c *Cache[T]) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { + hash, ok := c.hashByHeight.Get(blockHeight) if !ok { return 0, false } - return daHeight, true + return c.getDAIncluded(hash) } -// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning. +// setDAIncluded records DA inclusion and persists the snapshot. +// If a previous entry already exists at blockHeight (e.g. a placeholder from +// RestoreFromStore), it is evicted from daIncluded to avoid orphan leaks. func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { + if prev, ok := c.hashByHeight.Get(blockHeight); ok && prev != hash { + c.daIncluded.Remove(prev) + } c.daIncluded.Add(hash, daHeight) c.hashByHeight.Add(blockHeight, hash) - - // Persist to store if configured (for SIGKILL protection) - if c.store != nil { - _ = c.store.SetMetadata(context.Background(), c.storeKey(hash), encodeDAInclusion(daHeight, blockHeight)) - } - - // Update max DA height if necessary c.setMaxDAHeight(daHeight) + c.persistSnapshot(context.Background()) } -// removeDAIncluded removes the DA-included status of the hash from cache and store. +// removeDAIncluded removes the DA-included status of the hash from the cache +// and rewrites the window snapshot. 
func (c *Cache[T]) removeDAIncluded(hash string) { c.daIncluded.Remove(hash) - if c.store != nil { - _ = c.store.DeleteMetadata(context.Background(), c.storeKey(hash)) - } + c.persistSnapshot(context.Background()) } // daHeight returns the maximum DA height from all DA-included items. -// Returns 0 if no items are DA-included. func (c *Cache[T]) daHeight() uint64 { return c.maxDAHeight.Load() } -// setMaxDAHeight sets the maximum DA height if the provided value is greater -// than the current value. +// setMaxDAHeight sets the maximum DA height if the provided value is greater. func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { for range 1_000 { current := c.maxDAHeight.Load() @@ -197,7 +181,8 @@ func (c *Cache[T]) removeSeen(hash string) { c.hashes.Remove(hash) } -// deleteAllForHeight removes all items and their associated data from the cache and store at the given height. +// deleteAllForHeight removes all items and their associated data from the +// cache at the given height and rewrites the window snapshot. func (c *Cache[T]) deleteAllForHeight(height uint64) { c.itemsByHeight.Remove(height) @@ -210,95 +195,118 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) { if ok { c.hashes.Remove(hash) - c.removeDAIncluded(hash) + // Remove from daIncluded without triggering a snapshot write — we will + // write the snapshot once below after the removal is applied. + c.daIncluded.Remove(hash) } + + c.persistSnapshot(context.Background()) } -// RestoreFromStore loads DA inclusion data from the store into the in-memory cache. -// This should be called during initialization to restore persisted state. -// It directly queries store metadata keys with the cache's prefix, avoiding iteration through all blocks. -func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { +// persistSnapshot writes all current in-flight [blockHeight, daHeight] pairs +// to the store under a single key. Called on every mutation; payload is tiny +// (typically <10 entries × 16 bytes). 
+func (c *Cache[T]) persistSnapshot(ctx context.Context) { if c.store == nil || c.storeKeyPrefix == "" { - return nil // No store configured or no prefix, nothing to restore - } - - // Query all metadata entries with our prefix directly - entries, err := c.store.GetMetadataByPrefix(ctx, c.storeKeyPrefix) - if err != nil { - return fmt.Errorf("failed to query metadata by prefix %q: %w", c.storeKeyPrefix, err) + return } - for _, entry := range entries { - // Extract the hash from the key by removing the prefix - hash := strings.TrimPrefix(entry.Key, c.storeKeyPrefix) - if hash == entry.Key || hash == "" { - // Prefix not found or empty hash - skip invalid entry + heights := c.hashByHeight.Keys() + entries := make([]snapshotEntry, 0, len(heights)) + for _, h := range heights { + hash, ok := c.hashByHeight.Peek(h) + if !ok { continue } - - daHeight, blockHeight, ok := decodeDAInclusion(entry.Value) + daH, ok := c.daIncluded.Peek(hash) if !ok { - log.Warn(). - Str("key", entry.Key). - Int("value_len", len(entry.Value)). - Msg("skipping invalid DA inclusion entry during cache restore") continue } + entries = append(entries, snapshotEntry{blockHeight: h, daHeight: daH}) + } - c.daIncluded.Add(hash, daHeight) - c.hashByHeight.Add(blockHeight, hash) + _ = c.store.SetMetadata(ctx, c.snapshotKey(), encodeSnapshot(entries)) +} - // Update max DA height - c.setMaxDAHeight(daHeight) +// encodeSnapshot serialises a slice of snapshotEntry values into a byte slice. +func encodeSnapshot(entries []snapshotEntry) []byte { + buf := make([]byte, len(entries)*snapshotEntrySize) + for i, e := range entries { + off := i * snapshotEntrySize + binary.LittleEndian.PutUint64(buf[off:], e.blockHeight) + binary.LittleEndian.PutUint64(buf[off+8:], e.daHeight) } - - return nil + return buf } -// SaveToStore persists all current DA inclusion entries to the store. -// This can be called before shutdown to ensure all data is persisted. 
-func (c *Cache[T]) SaveToStore(ctx context.Context) error { - if c.store == nil { - return nil // No store configured +// decodeSnapshot deserialises a byte slice produced by encodeSnapshot. +// Returns nil if the slice is empty or not a multiple of snapshotEntrySize. +func decodeSnapshot(buf []byte) []snapshotEntry { + if len(buf) == 0 || len(buf)%snapshotEntrySize != 0 { + return nil } + entries := make([]snapshotEntry, len(buf)/snapshotEntrySize) + for i := range entries { + off := i * snapshotEntrySize + entries[i].blockHeight = binary.LittleEndian.Uint64(buf[off:]) + entries[i].daHeight = binary.LittleEndian.Uint64(buf[off+8:]) + } + return entries +} - keys := c.daIncluded.Keys() - for _, hash := range keys { - daHeight, ok := c.daIncluded.Peek(hash) - if !ok { - continue - } +// RestoreFromStore loads the in-flight snapshot with a single store read. +// Each entry is installed as a height placeholder; real hashes replace them +// once the DA retriever re-fires SetHeaderDAIncluded after startup. +// Missing snapshot key is treated as a no-op (fresh node or pre-snapshot version). 
+func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { + if c.store == nil || c.storeKeyPrefix == "" { + return nil + } - // We need to find the block height for this hash - // Since we track hash by height, we need to iterate - var blockHeight uint64 - heightKeys := c.hashByHeight.Keys() - for _, h := range heightKeys { - if storedHash, ok := c.hashByHeight.Peek(h); ok && storedHash == hash { - blockHeight = h - break - } - } + buf, err := c.store.GetMetadata(ctx, c.snapshotKey()) + if err != nil { + return nil //nolint:nilerr // key absent = nothing to restore + } - if err := c.store.SetMetadata(ctx, c.storeKey(hash), encodeDAInclusion(daHeight, blockHeight)); err != nil { - return fmt.Errorf("failed to save DA inclusion for hash %s: %w", hash, err) - } + for _, e := range decodeSnapshot(buf) { + placeholder := HeightPlaceholderKey(c.storeKeyPrefix, e.blockHeight) + c.daIncluded.Add(placeholder, e.daHeight) + c.hashByHeight.Add(e.blockHeight, placeholder) + c.setMaxDAHeight(e.daHeight) } return nil } -// ClearFromStore removes all DA inclusion entries from the store for this cache. -func (c *Cache[T]) ClearFromStore(ctx context.Context, hashes []string) error { +// HeightPlaceholderKey returns a store key for a height-indexed DA inclusion +// entry used when the real content hash is unavailable (e.g. after restore). +// Format: prefix + "__h/" + 16 zero-padded lowercase hex digits of height — cannot collide with real 64-char hashes. +func HeightPlaceholderKey(prefix string, height uint64) string { + const hexDigits = "0123456789abcdef" + buf := make([]byte, len(prefix)+4+16) + n := copy(buf, prefix) + n += copy(buf[n:], "__h/") + for i := 15; i >= 0; i-- { + buf[n+i] = hexDigits[height&0xf] + height >>= 4 + } + return string(buf) +} + +// SaveToStore flushes the current snapshot to the store. 
+func (c *Cache[T]) SaveToStore(ctx context.Context) error { if c.store == nil { return nil } + c.persistSnapshot(ctx) + return nil +} - for _, hash := range hashes { - if err := c.store.DeleteMetadata(ctx, c.storeKey(hash)); err != nil { - return fmt.Errorf("failed to delete DA inclusion for hash %s: %w", hash, err) - } +// ClearFromStore deletes the snapshot key from the store. +func (c *Cache[T]) ClearFromStore(ctx context.Context) error { + if c.store == nil { + return nil } - + _ = c.store.DeleteMetadata(ctx, c.snapshotKey()) return nil } diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index 209455e455..cf320620fc 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -7,269 +7,447 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/evstack/ev-node/pkg/store" + pkgstore "github.com/evstack/ev-node/pkg/store" ) type testItem struct{ V int } -// memStore creates an in-memory store for testing -func testMemStore(t *testing.T) store.Store { - ds, err := store.NewTestInMemoryKVStore() +// testMemStore creates an in-memory store for testing. +func testMemStore(t *testing.T) pkgstore.Store { + t.Helper() + ds, err := pkgstore.NewTestInMemoryKVStore() require.NoError(t, err) - return store.New(ds) + return pkgstore.New(ds) } +// writeSnapshot directly encodes and writes a snapshot into the store under +// the cache's snapshot key (storeKeyPrefix + "__snap"). This simulates the +// state that persistSnapshot would have written during a previous run, so that +// RestoreFromStore can recover from it. 
+func writeSnapshot(t *testing.T, st pkgstore.Store, storeKeyPrefix string, entries []snapshotEntry) { + t.Helper() + buf := encodeSnapshot(entries) + require.NoError(t, st.SetMetadata(context.Background(), storeKeyPrefix+"__snap", buf)) +} + +// --------------------------------------------------------------------------- +// MaxDAHeight +// --------------------------------------------------------------------------- + // TestCache_MaxDAHeight verifies that daHeight tracks the maximum DA height +// across successive setDAIncluded calls. func TestCache_MaxDAHeight(t *testing.T) { c := NewCache[testItem](nil, "") - // Initially should be 0 - if got := c.daHeight(); got != 0 { - t.Errorf("initial daHeight = %d, want 0", got) - } + assert.Equal(t, uint64(0), c.daHeight(), "initial daHeight should be 0") - // Set items with increasing DA heights c.setDAIncluded("hash1", 100, 1) - if got := c.daHeight(); got != 100 { - t.Errorf("after setDAIncluded(100): daHeight = %d, want 100", got) - } + assert.Equal(t, uint64(100), c.daHeight(), "after setDAIncluded(100)") - c.setDAIncluded("hash2", 50, 2) // Lower height shouldn't change max - if got := c.daHeight(); got != 100 { - t.Errorf("after setDAIncluded(50): daHeight = %d, want 100", got) - } + c.setDAIncluded("hash2", 50, 2) // lower, should not change max + assert.Equal(t, uint64(100), c.daHeight(), "after setDAIncluded(50)") c.setDAIncluded("hash3", 200, 3) - if got := c.daHeight(); got != 200 { - t.Errorf("after setDAIncluded(200): daHeight = %d, want 200", got) - } + assert.Equal(t, uint64(200), c.daHeight(), "after setDAIncluded(200)") } -// TestCache_MaxDAHeight_WithStore verifies that daHeight is restored from store -func TestCache_MaxDAHeight_WithStore(t *testing.T) { +// --------------------------------------------------------------------------- +// RestoreFromStore — O(1) snapshot-based recovery +// --------------------------------------------------------------------------- + +// 
TestCache_RestoreFromStore_EmptyChain verifies that RestoreFromStore is a +// no-op on a brand-new node (no snapshot key in the store). +func TestCache_RestoreFromStore_EmptyChain(t *testing.T) { + st := testMemStore(t) + + c := NewCache[testItem](st, "hdr/") + require.NoError(t, c.RestoreFromStore(context.Background())) + + assert.Equal(t, 0, c.daIncluded.Len(), "no entries expected on empty chain") + assert.Equal(t, uint64(0), c.daHeight()) +} + +// TestCache_RestoreFromStore_FullyFinalized verifies that when the persisted +// snapshot contains no entries (all blocks finalized, window empty) nothing is +// loaded but maxDAHeight is still zero (no in-flight state). +func TestCache_RestoreFromStore_FullyFinalized(t *testing.T) { st := testMemStore(t) ctx := context.Background() - c1 := NewCache[testItem](st, "test/da-included/") + // Simulate a previous run that had all blocks finalized: the snapshot is + // empty (persistSnapshot writes an empty buf when daIncluded is empty). + writeSnapshot(t, st, "hdr/", nil) - // Set DA included entries - c1.setDAIncluded("hash1", 100, 1) - c1.setDAIncluded("hash2", 200, 2) - c1.setDAIncluded("hash3", 150, 3) + c := NewCache[testItem](st, "hdr/") + require.NoError(t, c.RestoreFromStore(ctx)) - if got := c1.daHeight(); got != 200 { - t.Errorf("after setDAIncluded: daHeight = %d, want 200", got) - } + assert.Equal(t, 0, c.daIncluded.Len(), "no in-flight entries expected") + assert.Equal(t, uint64(0), c.daHeight(), "no in-flight entries means daHeight is 0") +} - err := c1.SaveToStore(ctx) - require.NoError(t, err) +// TestCache_RestoreFromStore_InFlightWindow verifies that the in-flight entries +// encoded in the snapshot are fully recovered on restore. 
+func TestCache_RestoreFromStore_InFlightWindow(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - // Create new cache and restore from store - c2 := NewCache[testItem](st, "test/da-included/") + // Simulate two in-flight entries written by a previous run: heights 4 and 5. + writeSnapshot(t, st, "hdr/", []snapshotEntry{ + {blockHeight: 4, daHeight: 13}, + {blockHeight: 5, daHeight: 14}, + }) + + c := NewCache[testItem](st, "hdr/") + require.NoError(t, c.RestoreFromStore(ctx)) + + assert.Equal(t, 2, c.daIncluded.Len(), "exactly the in-flight snapshot entries should be loaded") + assert.Equal(t, uint64(14), c.daHeight(), "maxDAHeight should reflect the highest in-flight DA height") + + // Verify the placeholder keys are addressable by height via hashByHeight. + hash4, ok := c.hashByHeight.Get(4) + require.True(t, ok, "hashByHeight[4] should exist") + daH4, ok := c.daIncluded.Get(hash4) + require.True(t, ok) + assert.Equal(t, uint64(13), daH4) + + hash5, ok := c.hashByHeight.Get(5) + require.True(t, ok, "hashByHeight[5] should exist") + daH5, ok := c.daIncluded.Get(hash5) + require.True(t, ok) + assert.Equal(t, uint64(14), daH5) +} - err = c2.RestoreFromStore(ctx) - require.NoError(t, err) +// TestCache_RestoreFromStore_SingleEntry verifies a snapshot with one in-flight +// entry is correctly decoded. 
+func TestCache_RestoreFromStore_SingleEntry(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - if got := c2.daHeight(); got != 200 { - t.Errorf("after restore: daHeight = %d, want 200", got) - } + writeSnapshot(t, st, "hdr/", []snapshotEntry{ + {blockHeight: 3, daHeight: 20}, + }) - // Verify individual entries were restored - daHeight, ok := c2.getDAIncluded("hash1") - assert.True(t, ok) - assert.Equal(t, uint64(100), daHeight) + c := NewCache[testItem](st, "hdr/") + require.NoError(t, c.RestoreFromStore(ctx)) - daHeight, ok = c2.getDAIncluded("hash2") - assert.True(t, ok) - assert.Equal(t, uint64(200), daHeight) + assert.Equal(t, 1, c.daIncluded.Len(), "one entry should be in-flight") + assert.Equal(t, uint64(20), c.daHeight()) - daHeight, ok = c2.getDAIncluded("hash3") - assert.True(t, ok) - assert.Equal(t, uint64(150), daHeight) + _, ok := c.hashByHeight.Get(4) + assert.False(t, ok, "height 4 was not in snapshot") + _, ok = c.hashByHeight.Get(5) + assert.False(t, ok, "height 5 was not in snapshot") } -// TestCache_WithStorePersistence tests that DA inclusion is persisted to store -func TestCache_WithStorePersistence(t *testing.T) { +// TestCache_RestoreFromStore_NilStore verifies that RestoreFromStore is a +// no-op when the cache has no backing store. +func TestCache_RestoreFromStore_NilStore(t *testing.T) { + c := NewCache[testItem](nil, "") + require.NoError(t, c.RestoreFromStore(context.Background())) + assert.Equal(t, 0, c.daIncluded.Len()) +} + +// TestCache_RestoreFromStore_PlaceholderOverwrittenByRealHash verifies that +// when a real content-hash entry is written after restore it overwrites the +// height-indexed placeholder, leaving exactly one entry per height. +func TestCache_RestoreFromStore_PlaceholderOverwrittenByRealHash(t *testing.T) { st := testMemStore(t) ctx := context.Background() - c1 := NewCache[testItem](st, "test/") + // Snapshot contains one in-flight entry for height 3. 
+ writeSnapshot(t, st, "hdr/", []snapshotEntry{ + {blockHeight: 3, daHeight: 99}, + }) - // Set DA inclusion - c1.setDAIncluded("hash1", 100, 1) - c1.setDAIncluded("hash2", 200, 2) + c := NewCache[testItem](st, "hdr/") + require.NoError(t, c.RestoreFromStore(ctx)) - err := c1.SaveToStore(ctx) - require.NoError(t, err) + assert.Equal(t, 1, c.daIncluded.Len(), "one placeholder for height 3") - // Create new cache with same store and restore - c2 := NewCache[testItem](st, "test/") + // Simulate the DA submitter writing the real hash entry. + c.setDAIncluded("realHash_height3", 99, 3) - err = c2.RestoreFromStore(ctx) - require.NoError(t, err) + // hashByHeight[3] now points to the new real hash. + newHash, ok := c.hashByHeight.Get(3) + require.True(t, ok) + assert.Equal(t, "realHash_height3", newHash) - // hash1 and hash2 should be restored, hash3 should not exist - daHeight, ok := c2.getDAIncluded("hash1") - assert.True(t, ok) - assert.Equal(t, uint64(100), daHeight) + // The real entry must be queryable by its content hash. + daH, ok := c.getDAIncluded("realHash_height3") + require.True(t, ok) + assert.Equal(t, uint64(99), daH) +} - daHeight, ok = c2.getDAIncluded("hash2") - assert.True(t, ok) - assert.Equal(t, uint64(200), daHeight) +// TestCache_RestoreFromStore_RoundTrip verifies that setDAIncluded persists a +// snapshot that a freshly-constructed cache can fully recover. +func TestCache_RestoreFromStore_RoundTrip(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - _, ok = c2.getDAIncluded("hash3") - assert.False(t, ok) + // First cache instance: write some in-flight entries. + c1 := NewCache[testItem](st, "rt/") + c1.setDAIncluded("hashA", 10, 1) + c1.setDAIncluded("hashB", 20, 2) + c1.setDAIncluded("hashC", 30, 3) + // Remove one entry to confirm deletions are also snapshotted. + c1.removeDAIncluded("hashB") + + // Second cache instance on same store: should recover {hashA→10, hashC→30}. 
+ c2 := NewCache[testItem](st, "rt/") + require.NoError(t, c2.RestoreFromStore(ctx)) + + assert.Equal(t, 2, c2.daIncluded.Len(), "only non-deleted entries should be restored") + assert.Equal(t, uint64(30), c2.daHeight()) + + // Placeholder keys are created for heights 1 and 3 (height 2 was removed). + _, ok := c2.hashByHeight.Get(1) + assert.True(t, ok, "height 1 placeholder should exist") + _, ok = c2.hashByHeight.Get(2) + assert.False(t, ok, "height 2 was removed, should not exist") + _, ok = c2.hashByHeight.Get(3) + assert.True(t, ok, "height 3 placeholder should exist") } -// TestCache_LargeDataset covers edge cases with height index management at scale. -func TestCache_LargeDataset(t *testing.T) { - c := NewCache[testItem](nil, "") - const N = 20000 - // Insert in descending order to exercise insert positions - for i := N - 1; i >= 0; i-- { - v := &testItem{V: i} - c.setItem(uint64(i), v) - } - // Delete a range in the middle - for i := 5000; i < 10000; i += 2 { - c.getNextItem(uint64(i)) - } -} +// --------------------------------------------------------------------------- +// Basic operations (no store required) +// --------------------------------------------------------------------------- -// TestCache_BasicOperations tests basic cache operations func TestCache_BasicOperations(t *testing.T) { c := NewCache[testItem](nil, "") - // Test setItem/getItem - item := &testItem{V: 42} - c.setItem(1, item) + // setItem / getItem + c.setItem(1, &testItem{V: 42}) got := c.getItem(1) - assert.NotNil(t, got) + require.NotNil(t, got) assert.Equal(t, 42, got.V) + assert.Nil(t, c.getItem(999)) - // Test getItem for non-existent key - got = c.getItem(999) - assert.Nil(t, got) - - // Test setSeen/isSeen + // setSeen / isSeen / removeSeen assert.False(t, c.isSeen("hash1")) c.setSeen("hash1", 1) assert.True(t, c.isSeen("hash1")) - - // Test removeSeen c.removeSeen("hash1") assert.False(t, c.isSeen("hash1")) - // Test setDAIncluded/getDAIncluded + // setDAIncluded / 
getDAIncluded / removeDAIncluded _, ok := c.getDAIncluded("hash2") assert.False(t, ok) - c.setDAIncluded("hash2", 100, 2) daHeight, ok := c.getDAIncluded("hash2") assert.True(t, ok) assert.Equal(t, uint64(100), daHeight) - - // Test removeDAIncluded c.removeDAIncluded("hash2") _, ok = c.getDAIncluded("hash2") assert.False(t, ok) } -// TestCache_GetNextItem tests the atomic get-and-remove operation func TestCache_GetNextItem(t *testing.T) { c := NewCache[testItem](nil, "") - // Set multiple items c.setItem(1, &testItem{V: 1}) c.setItem(2, &testItem{V: 2}) c.setItem(3, &testItem{V: 3}) - // Get and remove item at height 2 got := c.getNextItem(2) - assert.NotNil(t, got) + require.NotNil(t, got) assert.Equal(t, 2, got.V) - // Item should be removed - got = c.getNextItem(2) - assert.Nil(t, got) - - // Other items should still exist - got = c.getItem(1) - assert.NotNil(t, got) - assert.Equal(t, 1, got.V) + // removed + assert.Nil(t, c.getNextItem(2)) - got = c.getItem(3) - assert.NotNil(t, got) - assert.Equal(t, 3, got.V) + // others intact + assert.NotNil(t, c.getItem(1)) + assert.NotNil(t, c.getItem(3)) } -// TestCache_DeleteAllForHeight tests deleting all data for a specific height func TestCache_DeleteAllForHeight(t *testing.T) { c := NewCache[testItem](nil, "") - // Set items at different heights c.setItem(1, &testItem{V: 1}) c.setItem(2, &testItem{V: 2}) c.setSeen("hash1", 1) c.setSeen("hash2", 2) - // Delete height 1 c.deleteAllForHeight(1) - // Height 1 data should be gone assert.Nil(t, c.getItem(1)) assert.False(t, c.isSeen("hash1")) - // Height 2 data should still exist assert.NotNil(t, c.getItem(2)) assert.True(t, c.isSeen("hash2")) } -// TestCache_WithNilStore tests creating cache with nil store func TestCache_WithNilStore(t *testing.T) { - // Cache without store should work fine c := NewCache[testItem](nil, "") require.NotNil(t, c) - // Basic operations should work c.setItem(1, &testItem{V: 1}) got := c.getItem(1) - assert.NotNil(t, got) + require.NotNil(t, 
got) assert.Equal(t, 1, got.V) - // DA inclusion should work (just not persisted) c.setDAIncluded("hash1", 100, 1) daHeight, ok := c.getDAIncluded("hash1") assert.True(t, ok) assert.Equal(t, uint64(100), daHeight) } -// TestCache_SaveToStore tests the SaveToStore method +// --------------------------------------------------------------------------- +// SaveToStore / ClearFromStore +// --------------------------------------------------------------------------- + func TestCache_SaveToStore(t *testing.T) { st := testMemStore(t) ctx := context.Background() c := NewCache[testItem](st, "save-test/") - - // Set some DA included entries c.setDAIncluded("hash1", 100, 1) c.setDAIncluded("hash2", 200, 2) - // Save to store (should be a no-op since we persist on setDAIncluded) - err := c.SaveToStore(ctx) + require.NoError(t, c.SaveToStore(ctx)) + + // SaveToStore rewrites the single snapshot key (storeKeyPrefix + "__snap"). + // Two entries × 16 bytes each = 32 bytes total. + raw, err := st.GetMetadata(ctx, "save-test/__snap") require.NoError(t, err) + assert.Len(t, raw, 2*snapshotEntrySize, "snapshot should contain 2 entries of 16 bytes each") - // Verify data is in store by creating new cache and restoring - c2 := NewCache[testItem](st, "save-test/") + // The individual per-hash keys are NOT written by the snapshot design. 
+ _, err = st.GetMetadata(ctx, "save-test/hash1") + assert.Error(t, err, "per-hash keys should not exist in the snapshot design") +} - err = c2.RestoreFromStore(ctx) - require.NoError(t, err) +func TestCache_ClearFromStore(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - daHeight, ok := c2.getDAIncluded("hash1") - assert.True(t, ok) - assert.Equal(t, uint64(100), daHeight) + c := NewCache[testItem](st, "clear-test/") + c.setDAIncluded("hash1", 100, 1) + c.setDAIncluded("hash2", 200, 2) - daHeight, ok = c2.getDAIncluded("hash2") - assert.True(t, ok) - assert.Equal(t, uint64(200), daHeight) + require.NoError(t, c.ClearFromStore(ctx)) + + _, err := st.GetMetadata(ctx, "clear-test/__snap") + assert.Error(t, err, "snapshot key should have been removed from store") +} + +// --------------------------------------------------------------------------- +// Large-dataset smoke test +// --------------------------------------------------------------------------- + +func TestCache_LargeDataset(t *testing.T) { + c := NewCache[testItem](nil, "") + const N = 20_000 + for i := N - 1; i >= 0; i-- { + c.setItem(uint64(i), &testItem{V: i}) + } + for i := 5000; i < 10000; i += 2 { + c.getNextItem(uint64(i)) + } +} + +// --------------------------------------------------------------------------- +// HeightPlaceholderKey +// --------------------------------------------------------------------------- + +// TestHeightPlaceholderKey verifies the placeholder key format and uniqueness. +func TestHeightPlaceholderKey(t *testing.T) { + k0 := HeightPlaceholderKey("pfx/", 0) + k1 := HeightPlaceholderKey("pfx/", 1) + kMax := HeightPlaceholderKey("pfx/", ^uint64(0)) + + assert.NotEqual(t, k0, k1) + assert.NotEqual(t, k1, kMax) + + // Must start with the provided prefix. + assert.Contains(t, k0, "pfx/") + assert.Contains(t, k1, "pfx/") + assert.Contains(t, kMax, "pfx/") + + // Different prefixes must not collide. 
+ assert.NotEqual(t, HeightPlaceholderKey("a/", 1), HeightPlaceholderKey("b/", 1)) +} + +// TestCache_NoPlaceholderLeakAfterRefire verifies that when the DA retriever +// re-fires setDAIncluded with the real content hash after a restart, the +// snapshot placeholder that RestoreFromStore installed is evicted from +// daIncluded. Without the eviction in setDAIncluded, every restart cycle +// would leak one orphaned placeholder key per in-flight block. +func TestCache_NoPlaceholderLeakAfterRefire(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() + + // Step 1: initial run — write a real hash for height 3. + c1 := NewCache[testItem](st, "pfx/") + c1.setDAIncluded("realHash3", 99, 3) + // snapshot now contains [{blockHeight:3, daHeight:99}] + + // Step 2: restart — placeholder installed for height 3. + c2 := NewCache[testItem](st, "pfx/") + require.NoError(t, c2.RestoreFromStore(ctx)) + + placeholder := HeightPlaceholderKey("pfx/", 3) + _, placeholderPresent := c2.daIncluded.Get(placeholder) + require.True(t, placeholderPresent, "placeholder must be present immediately after restore") + assert.Equal(t, 1, c2.daIncluded.Len(), "only one entry expected before re-fire") + + // Step 3: DA retriever re-fires with the real hash. + c2.setDAIncluded("realHash3", 99, 3) + + // The real hash must be present. + daH, ok := c2.getDAIncluded("realHash3") + require.True(t, ok, "real hash must be present after re-fire") + assert.Equal(t, uint64(99), daH) + + // The placeholder must be gone — no orphan leak. + _, placeholderPresent = c2.daIncluded.Get(placeholder) + assert.False(t, placeholderPresent, "placeholder must be evicted after real hash is written") + + // Total entries must still be exactly one. + assert.Equal(t, 1, c2.daIncluded.Len(), "exactly one daIncluded entry after re-fire — no orphan") +} + +// TestCache_RestartIdempotent verifies that multiple successive restarts all +// yield a correctly functioning cache — i.e. 
the snapshot written after a +// re-fire is identical in semantics to the original, so a second (or third) +// restart still loads the right DA height via the placeholder fallback and the +// snapshot never grows stale or accumulates phantom entries. +func TestCache_RestartIdempotent(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() + + const realHash = "realHashH5" + const blockH = uint64(5) + const daH = uint64(42) + + // ── Run 1: normal operation, height 5 in-flight ────────────────────────── + c1 := NewCache[testItem](st, "pfx/") + c1.setDAIncluded(realHash, daH, blockH) + // snapshot: [{5, 42}] + + for restart := 1; restart <= 3; restart++ { + // ── Restart N: restore from snapshot + cR := NewCache[testItem](st, "pfx/") + require.NoError(t, cR.RestoreFromStore(ctx), "restart %d: RestoreFromStore", restart) + + assert.Equal(t, 1, cR.daIncluded.Len(), "restart %d: one placeholder entry", restart) + assert.Equal(t, daH, cR.daHeight(), "restart %d: daHeight correct", restart) + + // Fallback lookup by height must work. + gotDAH, ok := cR.getDAIncludedByHeight(blockH) + require.True(t, ok, "restart %d: height-based lookup must succeed", restart) + assert.Equal(t, daH, gotDAH, "restart %d: height-based DA height correct", restart) + + // ── DA retriever re-fires with the real hash + cR.setDAIncluded(realHash, daH, blockH) + + // After re-fire: real hash present, no orphan, snapshot updated. + _, realPresent := cR.daIncluded.Get(realHash) + assert.True(t, realPresent, "restart %d: real hash present after re-fire", restart) + assert.Equal(t, 1, cR.daIncluded.Len(), "restart %d: no orphan after re-fire", restart) + + // The snapshot rewritten by re-fire must still encode the right data + // so the next restart can load it correctly. + // (persistSnapshot fires inside setDAIncluded, so the store is up to date.) 
+ } } diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 26b52491ac..bc53eeca8c 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -16,16 +16,18 @@ import ( ) const ( - // Store key prefixes for different cache types - headerDAIncludedPrefix = "cache/header-da-included/" - dataDAIncludedPrefix = "cache/data-da-included/" + // HeaderDAIncludedPrefix is the store key prefix for header DA inclusion tracking. + HeaderDAIncludedPrefix = "cache/header-da-included/" - // DefaultTxCacheRetention is the default time to keep transaction hashes in cache + // DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking. + DataDAIncludedPrefix = "cache/data-da-included/" + + // DefaultTxCacheRetention is the default time to keep transaction hashes in cache. DefaultTxCacheRetention = 24 * time.Hour ) // CacheManager provides thread-safe cache operations for tracking seen blocks -// and DA inclusion status during block execution and syncing. +// and DA inclusion status. type CacheManager interface { DaHeight() uint64 @@ -33,6 +35,10 @@ type CacheManager interface { IsHeaderSeen(hash string) bool SetHeaderSeen(hash string, blockHeight uint64) GetHeaderDAIncluded(hash string) (uint64, bool) + // GetHeaderDAIncludedByHeight looks up DA height via the height→hash index. + // Works for both real hashes (steady state) and snapshot placeholders + // (post-restart, before the DA retriever re-fires the real hash). 
+ GetHeaderDAIncludedByHeight(blockHeight uint64) (uint64, bool) SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64) RemoveHeaderDAIncluded(hash string) @@ -40,6 +46,7 @@ type CacheManager interface { IsDataSeen(hash string) bool SetDataSeen(hash string, blockHeight uint64) GetDataDAIncluded(hash string) (uint64, bool) + GetDataDAIncludedByHeight(blockHeight uint64) (uint64, bool) SetDataDAIncluded(hash string, daHeight uint64, blockHeight uint64) RemoveDataDAIncluded(hash string) @@ -60,7 +67,7 @@ type CacheManager interface { DeleteHeight(blockHeight uint64) } -// PendingManager provides operations for managing pending headers and data +// PendingManager provides operations for managing pending headers and data. type PendingManager interface { GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) @@ -72,7 +79,7 @@ type PendingManager interface { NumPendingData() uint64 } -// Manager provides centralized cache management for both executing and syncing components +// Manager combines CacheManager and PendingManager. type Manager interface { CacheManager PendingManager @@ -80,12 +87,11 @@ type Manager interface { var _ Manager = (*implementation)(nil) -// implementation provides the concrete implementation of cache Manager type implementation struct { headerCache *Cache[types.SignedHeader] dataCache *Cache[types.Data] txCache *Cache[struct{}] - txTimestamps *sync.Map // map[string]time.Time - tracks when each tx was seen + txTimestamps *sync.Map // map[string]time.Time pendingEventsCache *Cache[common.DAHeightEvent] pendingHeaders *PendingHeaders pendingData *PendingData @@ -94,16 +100,13 @@ type implementation struct { logger zerolog.Logger } -// NewManager creates a new cache manager instance +// NewManager creates a new Manager, restoring or clearing persisted state as configured. 
func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { - // Initialize caches with store-based persistence for DA inclusion data - headerCache := NewCache[types.SignedHeader](st, headerDAIncludedPrefix) - dataCache := NewCache[types.Data](st, dataDAIncludedPrefix) - // TX cache and pending events cache don't need store persistence + headerCache := NewCache[types.SignedHeader](st, HeaderDAIncludedPrefix) + dataCache := NewCache[types.Data](st, DataDAIncludedPrefix) txCache := NewCache[struct{}](nil, "") pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") - // Initialize pending managers pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { return nil, fmt.Errorf("failed to create pending headers: %w", err) @@ -128,7 +131,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag } if cfg.ClearCache { - // Clear the cache from disk if err := impl.ClearFromStore(); err != nil { logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") } @@ -155,6 +157,10 @@ func (m *implementation) GetHeaderDAIncluded(hash string) (uint64, bool) { return m.headerCache.getDAIncluded(hash) } +func (m *implementation) GetHeaderDAIncludedByHeight(blockHeight uint64) (uint64, bool) { + return m.headerCache.getDAIncludedByHeight(blockHeight) +} + func (m *implementation) SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64) { m.headerCache.setDAIncluded(hash, daHeight, blockHeight) } @@ -163,12 +169,11 @@ func (m *implementation) RemoveHeaderDAIncluded(hash string) { m.headerCache.removeDAIncluded(hash) } -// DaHeight fetches the heights da height contained in the processed cache. +// DaHeight returns the highest DA height seen across header and data caches. 
func (m *implementation) DaHeight() uint64 { return max(m.headerCache.daHeight(), m.dataCache.daHeight()) } -// Data operations func (m *implementation) IsDataSeen(hash string) bool { return m.dataCache.isSeen(hash) } @@ -181,6 +186,10 @@ func (m *implementation) GetDataDAIncluded(hash string) (uint64, bool) { return m.dataCache.getDAIncluded(hash) } +func (m *implementation) GetDataDAIncludedByHeight(blockHeight uint64) (uint64, bool) { + return m.dataCache.getDAIncludedByHeight(blockHeight) +} + func (m *implementation) SetDataDAIncluded(hash string, daHeight uint64, blockHeight uint64) { m.dataCache.setDAIncluded(hash, daHeight, blockHeight) } @@ -189,7 +198,6 @@ func (m *implementation) RemoveDataDAIncluded(hash string) { m.dataCache.removeDAIncluded(hash) } -// Transaction operations func (m *implementation) IsTxSeen(hash string) bool { return m.txCache.isSeen(hash) } @@ -201,9 +209,8 @@ func (m *implementation) SetTxSeen(hash string) { m.txTimestamps.Store(hash, time.Now()) } -// CleanupOldTxs removes transaction hashes older than the specified duration. -// Returns the number of transactions removed. -// This prevents unbounded growth of the transaction cache. +// CleanupOldTxs removes transaction hashes older than olderThan and returns +// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0. 
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { if olderThan <= 0 { olderThan = DefaultTxCacheRetention @@ -217,14 +224,11 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { if !ok { return true } - timestamp, ok := value.(time.Time) if !ok { return true } - if timestamp.Before(cutoff) { - // Remove from both caches m.txCache.removeSeen(hash) m.txTimestamps.Delete(hash) removed++ @@ -273,7 +277,7 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat marshalledSignedData := make([][]byte, 0, len(dataList)) for i, data := range dataList { if len(data.Txs) == 0 { - continue // Skip empty data + continue } // Note: Actual signing needs to be done by the executing component // as it has access to the signer. This method returns unsigned data @@ -323,16 +327,13 @@ func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEven return m.pendingEventsCache.getNextItem(height) } -// SaveToStore persists the DA inclusion cache to the store. -// DA inclusion data is persisted on every SetHeaderDAIncluded/SetDataDAIncluded call, -// so this method ensures any remaining data is flushed. +// SaveToStore flushes the DA inclusion snapshot to the store. +// Mutations already persist on every call, so this is mainly a final flush. func (m *implementation) SaveToStore() error { ctx := context.Background() - if err := m.headerCache.SaveToStore(ctx); err != nil { return fmt.Errorf("failed to save header cache to store: %w", err) } - if err := m.dataCache.SaveToStore(ctx); err != nil { return fmt.Errorf("failed to save data cache to store: %w", err) } @@ -341,17 +342,15 @@ func (m *implementation) SaveToStore() error { return nil } -// RestoreFromStore restores the DA inclusion cache from the store. -// This uses prefix-based queries to directly load persisted DA inclusion data, -// avoiding expensive iteration through all blocks. 
+// RestoreFromStore loads the in-flight snapshot (O(1)) then seeds maxDAHeight +// from the finalized-tip HeightToDAHeight metadata so DaHeight() is correct +// even when the snapshot is empty (all blocks finalized). func (m *implementation) RestoreFromStore() error { ctx := context.Background() - // Restore DA inclusion data from store if err := m.headerCache.RestoreFromStore(ctx); err != nil { return fmt.Errorf("failed to restore header cache from store: %w", err) } - if err := m.dataCache.RestoreFromStore(ctx); err != nil { return fmt.Errorf("failed to restore data cache from store: %w", err) } @@ -368,24 +367,20 @@ func (m *implementation) RestoreFromStore() error { return nil } -// ClearFromStore clears in-memory caches and deletes DA inclusion entries from the store. +// ClearFromStore wipes the snapshot keys and rebuilds empty in-memory caches, +// then seeds maxDAHeight from persisted metadata so DaHeight() stays correct. func (m *implementation) ClearFromStore() error { ctx := context.Background() - // Get hashes from current in-memory caches and delete from store - headerHashes := m.headerCache.daIncluded.Keys() - if err := m.headerCache.ClearFromStore(ctx, headerHashes); err != nil { + if err := m.headerCache.ClearFromStore(ctx); err != nil { return fmt.Errorf("failed to clear header cache from store: %w", err) } - - dataHashes := m.dataCache.daIncluded.Keys() - if err := m.dataCache.ClearFromStore(ctx, dataHashes); err != nil { + if err := m.dataCache.ClearFromStore(ctx); err != nil { return fmt.Errorf("failed to clear data cache from store: %w", err) } - // Clear in-memory caches by creating new ones - m.headerCache = NewCache[types.SignedHeader](m.store, headerDAIncludedPrefix) - m.dataCache = NewCache[types.Data](m.store, dataDAIncludedPrefix) + m.headerCache = NewCache[types.SignedHeader](m.store, HeaderDAIncludedPrefix) + m.dataCache = NewCache[types.Data](m.store, DataDAIncludedPrefix) m.txCache = NewCache[struct{}](nil, "") m.pendingEventsCache 
= NewCache[common.DAHeightEvent](nil, "") @@ -395,30 +390,23 @@ func (m *implementation) ClearFromStore() error { return nil } -// initDAHeightFromStore initializes the maxDAHeight in both header and data caches -// from the HeightToDAHeight store metadata (final da inclusion tracking). +// initDAHeightFromStore seeds maxDAHeight from the HeightToDAHeight metadata +// written by the submitter for the last finalized block. This ensures +// DaHeight() is non-zero on restart even when the in-flight snapshot is empty. func (m *implementation) initDAHeightFromStore(ctx context.Context) { - // Get the DA included height from store (last processed block height) - daIncludedHeightBytes, err := m.store.GetMetadata(ctx, store.DAIncludedHeightKey) - if err != nil || len(daIncludedHeightBytes) != 8 { + daIncludedBytes, err := m.store.GetMetadata(ctx, store.DAIncludedHeightKey) + if err != nil || len(daIncludedBytes) != 8 { return } - daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes) + daIncludedHeight := binary.LittleEndian.Uint64(daIncludedBytes) if daIncludedHeight == 0 { return } - // Get header DA height for the last included height - headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight) - if headerBytes, err := m.store.GetMetadata(ctx, headerKey); err == nil && len(headerBytes) == 8 { - headerDAHeight := binary.LittleEndian.Uint64(headerBytes) - m.headerCache.setMaxDAHeight(headerDAHeight) + if b, err := m.store.GetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(daIncludedHeight)); err == nil && len(b) == 8 { + m.headerCache.setMaxDAHeight(binary.LittleEndian.Uint64(b)) } - - // Get data DA height for the last included height - dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight) - if dataBytes, err := m.store.GetMetadata(ctx, dataKey); err == nil && len(dataBytes) == 8 { - dataDAHeight := binary.LittleEndian.Uint64(dataBytes) - m.dataCache.setMaxDAHeight(dataDAHeight) + if b, err := m.store.GetMetadata(ctx, 
store.GetHeightToDAHeightDataKey(daIncludedHeight)); err == nil && len(b) == 8 { + m.dataCache.setMaxDAHeight(binary.LittleEndian.Uint64(b)) } } diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 6963c9eb08..fdfde87a18 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -12,7 +12,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/store" + pkgstore "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) @@ -24,10 +24,10 @@ func tempConfig(t *testing.T) config.Config { } // helper to make an in-memory store -func memStore(t *testing.T) store.Store { - ds, err := store.NewTestInMemoryKVStore() +func memStore(t *testing.T) pkgstore.Store { + ds, err := pkgstore.NewTestInMemoryKVStore() require.NoError(t, err) - return store.New(ds) + return pkgstore.New(ds) } func TestManager_HeaderDataOperations(t *testing.T) { @@ -92,10 +92,10 @@ func TestManager_SaveAndRestoreFromStore(t *testing.T) { st := memStore(t) ctx := context.Background() - // First, we need to save some block data to the store so RestoreFromStore can find the hashes h1, d1 := types.GetRandomBlock(1, 1, "test-chain") h2, d2 := types.GetRandomBlock(2, 1, "test-chain") + // Write blocks to the store so store.Height() returns 2. batch1, err := st.NewBatch(ctx) require.NoError(t, err) require.NoError(t, batch1.SaveBlockData(h1, d1, &types.Signature{})) @@ -111,30 +111,53 @@ func TestManager_SaveAndRestoreFromStore(t *testing.T) { m1, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // Set DA inclusion for the blocks + // Simulate the submitter: write per-height DA mappings (normally done by + // setNodeHeightToDAHeight) and mark height 1 as finalized. + // Heights 1 and 2 are both submitted to DA; height 1 is finalized, height 2 is in-flight. 
+ writeHeightDAMeta := func(height, daH uint64) { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, daH) + require.NoError(t, st.SetMetadata(ctx, pkgstore.GetHeightToDAHeightHeaderKey(height), b)) + // b already holds daH; the header and data keys store the same DA height. + require.NoError(t, st.SetMetadata(ctx, pkgstore.GetHeightToDAHeightDataKey(height), b)) + } + writeHeightDAMeta(1, 100) + writeHeightDAMeta(2, 101) + + // Persist DAIncludedHeight = 1 (height 2 is still in-flight). + daIncludedBz := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncludedBz, 1) + require.NoError(t, st.SetMetadata(ctx, pkgstore.DAIncludedHeightKey, daIncludedBz)) + + // Also write real content-hash entries for both heights so that + // SetHeaderDAIncluded / SetDataDAIncluded paths are tested. m1.SetHeaderDAIncluded(h1.Hash().String(), 100, 1) m1.SetDataDAIncluded(d1.DACommitment().String(), 100, 1) m1.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) m1.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) - // Persist to store - err = m1.SaveToStore() - require.NoError(t, err) + require.NoError(t, m1.SaveToStore()) // Create a fresh manager on same store and verify restore m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // Check DA inclusion was restored - daHeight, ok := m2.GetHeaderDAIncluded(h1.Hash().String()) - assert.True(t, ok) - assert.Equal(t, uint64(100), daHeight) + // Height 1 is finalized (DAIncludedHeight = 1): IsHeightDAIncluded returns true + // via the height comparison, so GetHeaderDAIncluded is never consulted for it. + // The cache entry is not restored — this is correct and intentional. - daHeight, ok = m2.GetDataDAIncluded(d1.DACommitment().String()) - assert.True(t, ok) - assert.Equal(t, uint64(100), daHeight) + // Height 2 is in-flight: the snapshot restore loads a placeholder entry keyed by + // height. The real content-hash entry is populated when the submitter re-processes + // the block after restart.
Until then, DaHeight() must reflect the in-flight DA height. + assert.Equal(t, uint64(101), m2.DaHeight(), + "DaHeight should reflect the highest in-flight DA height after restore") - daHeight, ok = m2.GetHeaderDAIncluded(h2.Hash().String()) + // After the submitter re-fires SetHeaderDAIncluded for height 2, the real hash + // entry must be queryable. + m2.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) + m2.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) + + daHeight, ok := m2.GetHeaderDAIncluded(h2.Hash().String()) assert.True(t, ok) assert.Equal(t, uint64(101), daHeight) @@ -431,14 +454,23 @@ func TestManager_DAInclusionPersistence(t *testing.T) { // Verify DA height is tracked assert.Equal(t, uint64(101), m1.DaHeight()) - err = m1.SaveToStore() - require.NoError(t, err) + require.NoError(t, m1.SaveToStore()) - // Create new manager - DA inclusion should be restored + // SaveToStore writes a compact snapshot key for each cache. + // A freshly created manager on the same store must recover the snapshot via + // a single GetMetadata call (O(1) restore). m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // DA inclusion should be restored from store + // The snapshot encodes the in-flight entries that were set above, so + // maxDAHeight must be restored to 101 (the higher of 100 and 101). + assert.Equal(t, uint64(101), m2.DaHeight(), + "maxDAHeight should be restored from the snapshot") + + // Simulate the submitter re-firing the real entries after restart. 
+ m2.SetHeaderDAIncluded(headerHash, 100, 1) + m2.SetDataDAIncluded(dataHash, 101, 1) + daHeight, ok := m2.GetHeaderDAIncluded(headerHash) assert.True(t, ok) assert.Equal(t, uint64(100), daHeight) @@ -446,9 +478,6 @@ func TestManager_DAInclusionPersistence(t *testing.T) { daHeight, ok = m2.GetDataDAIncluded(dataHash) assert.True(t, ok) assert.Equal(t, uint64(101), daHeight) - - // Max DA height should also be restored - assert.Equal(t, uint64(101), m2.DaHeight()) } func TestManager_DaHeightAfterCacheClear(t *testing.T) { @@ -466,27 +495,34 @@ func TestManager_DaHeightAfterCacheClear(t *testing.T) { require.NoError(t, batch.SetHeight(1)) require.NoError(t, batch.Commit()) - // Set up the HeightToDAHeight metadata (simulating what submitter does) - headerDAHeightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(headerDAHeightBytes, 150) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerDAHeightBytes)) - - dataDAHeightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(dataDAHeightBytes, 155) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataDAHeightBytes)) - - // Set DAIncludedHeightKey to indicate height 1 was DA included - daIncludedBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(daIncludedBytes, 1) - require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, daIncludedBytes)) - - // Create manager with ClearCache = true + // Write the finalized-tip metadata exactly as setNodeHeightToDAHeight does + // in production. initDAHeightFromStore reads these keys to seed DaHeight() + // after ClearCache (the snapshot is wiped, but these keys survive). 
+ headerDABz := make([]byte, 8) + binary.LittleEndian.PutUint64(headerDABz, 150) + require.NoError(t, st.SetMetadata(ctx, pkgstore.GetHeightToDAHeightHeaderKey(1), headerDABz)) + + dataDABz := make([]byte, 8) + binary.LittleEndian.PutUint64(dataDABz, 155) + require.NoError(t, st.SetMetadata(ctx, pkgstore.GetHeightToDAHeightDataKey(1), dataDABz)) + + daIncBz := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncBz, 1) + require.NoError(t, st.SetMetadata(ctx, pkgstore.DAIncludedHeightKey, daIncBz)) + + // Create manager with ClearCache = true. + // ClearFromStore deletes the snapshot key, but initDAHeightFromStore still + // reads the persisted finalized-tip HeightToDAHeight metadata, so DaHeight() + // is seeded correctly even after the in-memory caches are wiped. cfg.ClearCache = true m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // DaHeight should NOT be 0 - it should be initialized from store metadata - assert.Equal(t, uint64(155), m.DaHeight(), "DaHeight should be initialized from HeightToDAHeight metadata even after cache clear") + // DaHeight must reflect the finalized-tip DA height loaded from store + // metadata, not 0. The syncer uses this to seed daRetrieverHeight so the + // node does not re-scan DA from genesis after an operator-triggered clear. 
+ assert.Equal(t, uint64(155), m.DaHeight(), + "DaHeight should be seeded from finalized-tip metadata even after ClearCache") } func TestManager_DaHeightFromStoreOnRestore(t *testing.T) { @@ -504,25 +540,32 @@ func TestManager_DaHeightFromStoreOnRestore(t *testing.T) { require.NoError(t, batch.SetHeight(1)) require.NoError(t, batch.Commit()) - // Set up HeightToDAHeight metadata but NOT the cache entries - // This simulates a scenario where DA inclusion was processed but cache entries were lost - headerDAHeightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(headerDAHeightBytes, 200) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerDAHeightBytes)) - - dataDAHeightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(dataDAHeightBytes, 205) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataDAHeightBytes)) - - daIncludedBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(daIncludedBytes, 1) - require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, daIncludedBytes)) - - // Create manager without ClearCache - should restore and init from metadata + // Persist the finalized-tip HeightToDAHeight metadata exactly as + // setNodeHeightToDAHeight does in production. These keys are the source + // of truth that initDAHeightFromStore reads — they exist independently of + // the snapshot and survive across restarts and cache clears. 
+ headerDABz := make([]byte, 8) + binary.LittleEndian.PutUint64(headerDABz, 200) + require.NoError(t, st.SetMetadata(ctx, pkgstore.GetHeightToDAHeightHeaderKey(1), headerDABz)) + + dataDABz := make([]byte, 8) + binary.LittleEndian.PutUint64(dataDABz, 205) + require.NoError(t, st.SetMetadata(ctx, pkgstore.GetHeightToDAHeightDataKey(1), dataDABz)) + + daIncBz := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncBz, 1) + require.NoError(t, st.SetMetadata(ctx, pkgstore.DAIncludedHeightKey, daIncBz)) + + // Create a manager — RestoreFromStore first reads the snapshot (O(1) for + // in-flight entries, empty here because height 1 is fully finalized), then + // initDAHeightFromStore seeds maxDAHeight from the finalized-tip metadata. cfg.ClearCache = false m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // DaHeight should be the max from HeightToDAHeight metadata - assert.Equal(t, uint64(205), m.DaHeight(), "DaHeight should be initialized from HeightToDAHeight metadata on restore") + // DaHeight must reflect the highest DA height from the finalized-tip + // metadata, not 0. Without initDAHeightFromStore this would be 0 because + // there are no in-flight snapshot entries. + assert.Equal(t, uint64(205), m.DaHeight(), + "DaHeight should be seeded from finalized-tip HeightToDAHeight metadata on restore") } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 1c5b034c19..a675a6dcb8 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -426,20 +426,20 @@ func (s *Submitter) sendCriticalError(err error) { } } -// setNodeHeightToDAHeight stores the mapping from a ev-node block height to the corresponding -// DA (Data Availability) layer heights where the block's header and data were included. -// This mapping is persisted in the store metadata and is used to track which DA heights -// contain the block components for a given ev-node height. 
-// -// For blocks with empty transactions, both header and data use the same DA height since -// empty transaction data is not actually published to the DA layer. +// setNodeHeightToDAHeight persists the DA heights for a block's header and data. +// For empty-tx blocks, both use the header DA height since no data blob is posted. func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, header *types.SignedHeader, data *types.Data, genesisInclusion bool) error { headerHash, dataHash := header.Hash(), data.DACommitment() headerDaHeightBytes := make([]byte, 8) + // Try real hash first; fall back to height-based lookup for post-restart + // placeholders (before the DA retriever has re-fired the real hash). daHeightForHeader, ok := s.cache.GetHeaderDAIncluded(headerHash.String()) if !ok { - return fmt.Errorf("header hash %s not found in cache", headerHash) + daHeightForHeader, ok = s.cache.GetHeaderDAIncludedByHeight(height) + } + if !ok { + return fmt.Errorf("header hash %s not found in cache for height %d", headerHash, height) } binary.LittleEndian.PutUint64(headerDaHeightBytes, daHeightForHeader) @@ -455,7 +455,10 @@ func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, } else { daHeightForData, ok := s.cache.GetDataDAIncluded(dataHash.String()) if !ok { - return fmt.Errorf("data hash %s not found in cache", dataHash.String()) + daHeightForData, ok = s.cache.GetDataDAIncludedByHeight(height) + } + if !ok { + return fmt.Errorf("data hash %s not found in cache for height %d", dataHash.String(), height) } binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForData) @@ -474,7 +477,6 @@ func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, return err } - // the sequencer will process DA epochs from this height. 
if s.sequencer != nil { s.sequencer.SetDAHeight(genesisDAIncludedHeight) s.logger.Debug().Uint64("genesis_da_height", genesisDAIncludedHeight).Msg("initialized sequencer DA height from persisted genesis DA height") @@ -484,10 +486,9 @@ func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, return nil } -// IsHeightDAIncluded checks if a height is included in DA +// IsHeightDAIncluded reports whether the block at height has been DA-included. func (s *Submitter) IsHeightDAIncluded(height uint64, header *types.SignedHeader, data *types.Data) (bool, error) { - // If height is at or below the DA included height, it was already processed - // and cache entries were cleared. We know it's DA included. + // Already finalized — cache entries were cleared, but we know it's included. if height <= s.GetDAIncludedHeight() { return true, nil } @@ -503,9 +504,18 @@ func (s *Submitter) IsHeightDAIncluded(height uint64, header *types.SignedHeader dataCommitment := data.DACommitment() + // Try real hash first; fall back to height-based lookup for post-restart + // state before the DA retriever has re-fired the real hashes. 
_, headerIncluded := s.cache.GetHeaderDAIncluded(header.Hash().String()) _, dataIncluded := s.cache.GetDataDAIncluded(dataCommitment.String()) + if !headerIncluded { + _, headerIncluded = s.cache.GetHeaderDAIncludedByHeight(height) + } + if !dataIncluded { + _, dataIncluded = s.cache.GetDataDAIncludedByHeight(height) + } + dataIncluded = bytes.Equal(dataCommitment, common.DataHashForEmptyTxs) || dataIncluded return headerIncluded && dataIncluded, nil diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index b41239da0f..1d607320ae 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -557,3 +557,205 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { assert.False(t, h3DAIncluded, "height 3 header should not have DA inclusion status") assert.False(t, d3DAIncluded, "height 3 data should not have DA inclusion status") } + +// TestSubmitter_IsHeightDAIncluded_AfterRestart proves that IsHeightDAIncluded +// returns true for in-flight blocks immediately after a restart, before the DA +// retriever has had a chance to re-fire SetHeaderDAIncluded with the real +// content hash. +// +// Scenario: +// 1. Node runs normally: heights 1–3 are DA-included, height 3 is in-flight +// (submitted to DA but not yet finalized). SetHeaderDAIncluded writes both +// the real-hash entry AND the snapshot key. +// 2. Node restarts: a fresh Manager is constructed on the same store. +// RestoreFromStore reads the snapshot and installs placeholder entries +// keyed by height (not by content hash). +// 3. processDAInclusionLoop calls IsHeightDAIncluded(3, h3, d3) BEFORE the +// DA retriever has re-fired SetHeaderDAIncluded("realHash3", …). +// Without the height-based fallback this would return false and stall. +// 4. With the fallback it finds the placeholder, returns true, and the loop +// can advance. 
+func TestSubmitter_IsHeightDAIncluded_AfterRestart(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // ── Step 1: pre-restart state ───────────────────────────────────────────── + // Build a store with three blocks and a cache that has height 3 in-flight. + ds1 := dssync.MutexWrap(datastore.NewMapDatastore()) + st1 := store.New(ds1) + cm1, err := cache.NewManager(config.DefaultConfig(), st1, zerolog.Nop()) + require.NoError(t, err) + + h1, d1 := newHeaderAndData("chain", 1, true) + h2, d2 := newHeaderAndData("chain", 2, true) + h3, d3 := newHeaderAndData("chain", 3, true) + + sig := types.Signature([]byte("sig")) + for _, blk := range []struct { + h *types.SignedHeader + d *types.Data + hgt uint64 + }{ + {h1, d1, 1}, + {h2, d2, 2}, + {h3, d3, 3}, + } { + batch, err := st1.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(blk.h, blk.d, &sig)) + require.NoError(t, batch.SetHeight(blk.hgt)) + require.NoError(t, batch.Commit()) + } + + // Heights 1 and 2 are fully finalized; height 3 is in-flight. + cm1.SetHeaderDAIncluded(h1.Hash().String(), 10, 1) + cm1.SetDataDAIncluded(d1.DACommitment().String(), 10, 1) + cm1.SetHeaderDAIncluded(h2.Hash().String(), 11, 2) + cm1.SetDataDAIncluded(d2.DACommitment().String(), 11, 2) + cm1.SetHeaderDAIncluded(h3.Hash().String(), 12, 3) // in-flight + cm1.SetDataDAIncluded(d3.DACommitment().String(), 12, 3) + + // Persist the snapshot (already done by setDAIncluded on every mutation, + // but call SaveToStore explicitly to be clear about what survives restart). + require.NoError(t, cm1.SaveToStore()) + + // Persist DAIncludedHeight = 2 (heights 1 & 2 finalized, 3 is in-flight). + daIncBz := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncBz, 2) + require.NoError(t, st1.SetMetadata(ctx, store.DAIncludedHeightKey, daIncBz)) + + // ── Step 2: simulate restart ────────────────────────────────────────────── + // Build a fresh Manager on the SAME underlying datastore. 
This exercises + // the RestoreFromStore → snapshot-decode path. The DA retriever has NOT + // yet re-fired SetHeaderDAIncluded with the real hashes. + cm2, err := cache.NewManager(config.DefaultConfig(), st1, zerolog.Nop()) + require.NoError(t, err) + + daIncludedHeight := &atomic.Uint64{} + daIncludedHeight.Store(2) // matches what was persisted above + + s := &Submitter{ + store: st1, + cache: cm2, + logger: zerolog.Nop(), + daIncludedHeight: daIncludedHeight, + ctx: ctx, + } + + // ── Step 3: check IsHeightDAIncluded BEFORE DA retriever re-fires ───────── + // Height 3 is in-flight: above daIncludedHeight (2) so we can't short-circuit. + // The real hashes are NOT in cm2 yet — only the snapshot placeholders are. + _, realHeaderFound := cm2.GetHeaderDAIncluded(h3.Hash().String()) + assert.False(t, realHeaderFound, "real hash must not be present before DA retriever re-fires") + + included, err := s.IsHeightDAIncluded(3, h3, d3) + require.NoError(t, err) + assert.True(t, included, + "IsHeightDAIncluded must return true for in-flight height using snapshot placeholder, "+ + "before DA retriever re-fires SetHeaderDAIncluded") + + // ── Step 4: after DA retriever re-fires, real hash lookup also works ────── + cm2.SetHeaderDAIncluded(h3.Hash().String(), 12, 3) + cm2.SetDataDAIncluded(d3.DACommitment().String(), 12, 3) + + _, realHeaderFound = cm2.GetHeaderDAIncluded(h3.Hash().String()) + assert.True(t, realHeaderFound, "real hash must be present after DA retriever re-fires") + + included, err = s.IsHeightDAIncluded(3, h3, d3) + require.NoError(t, err) + assert.True(t, included, "IsHeightDAIncluded must still return true after real hash is written") +} + +// TestSubmitter_setNodeHeightToDAHeight_AfterRestart proves that +// setNodeHeightToDAHeight succeeds for an in-flight block immediately after a +// restart, before the DA retriever has re-fired SetHeaderDAIncluded with the +// real content hash. 
+// +// The bug this guards against: +// - Snapshot encodes [blockHeight → daHeight] pairs, not real content hashes. +// - On restart, RestoreFromStore installs placeholder keys (indexed by height). +// - setNodeHeightToDAHeight calls GetHeaderDAIncluded(realHash) which MISSES. +// - Without the height-based fallback it returns an error and processDAInclusionLoop +// logs the error and stalls — the submitter can never write HeightToDAHeight +// metadata and DAIncludedHeight never advances. +// - With GetHeaderDAIncludedByHeight / GetDataDAIncludedByHeight the lookup +// succeeds via the placeholder and the metadata is written correctly. +func TestSubmitter_setNodeHeightToDAHeight_AfterRestart(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // ── Step 1: pre-restart — write snapshot with height 3 in-flight ───────── + ds1 := dssync.MutexWrap(datastore.NewMapDatastore()) + st1 := store.New(ds1) + cm1, err := cache.NewManager(config.DefaultConfig(), st1, zerolog.Nop()) + require.NoError(t, err) + + h3, d3 := newHeaderAndData("chain", 3, true) + + sig := types.Signature([]byte("sig")) + batch, err := st1.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h3, d3, &sig)) + require.NoError(t, batch.SetHeight(3)) + require.NoError(t, batch.Commit()) + + // Mark height 3 as DA-included in the pre-restart cache. + // persistSnapshot fires here and writes the snapshot key to st1. + cm1.SetHeaderDAIncluded(h3.Hash().String(), 12, 3) + cm1.SetDataDAIncluded(d3.DACommitment().String(), 12, 3) + + // Persist DAIncludedHeight = 2 (height 3 is in-flight, not yet finalized). 
+ daIncBz := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncBz, 2) + require.NoError(t, st1.SetMetadata(ctx, store.DAIncludedHeightKey, daIncBz)) + + // ── Step 2: simulate restart — fresh Manager, no DA retriever re-fire ───── + cm2, err := cache.NewManager(config.DefaultConfig(), st1, zerolog.Nop()) + require.NoError(t, err) + + // Confirm the real content hashes are NOT present after restore. + _, realHdrFound := cm2.GetHeaderDAIncluded(h3.Hash().String()) + _, realDataFound := cm2.GetDataDAIncluded(d3.DACommitment().String()) + require.False(t, realHdrFound, "real header hash must not be in cache before DA retriever re-fires") + require.False(t, realDataFound, "real data hash must not be in cache before DA retriever re-fires") + + daIncludedHeight := &atomic.Uint64{} + daIncludedHeight.Store(2) + + s := &Submitter{ + store: st1, + cache: cm2, + logger: zerolog.Nop(), + daIncludedHeight: daIncludedHeight, + ctx: ctx, + } + + // ── Step 3: call setNodeHeightToDAHeight — must succeed via fallback ────── + // Before this fix, GetHeaderDAIncluded(realHash) would miss and the function + // would return an error, stalling processDAInclusionLoop. 
+ err = s.setNodeHeightToDAHeight(ctx, 3, h3, d3, false) + require.NoError(t, err, + "setNodeHeightToDAHeight must succeed via height-based fallback before DA retriever re-fires") + + // ── Step 4: verify the HeightToDAHeight metadata was written correctly ───── + headerDABz, err := st1.GetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(3)) + require.NoError(t, err) + require.Len(t, headerDABz, 8) + assert.Equal(t, uint64(12), binary.LittleEndian.Uint64(headerDABz), + "header HeightToDAHeight metadata must reflect the DA height from the snapshot") + + dataDABz, err := st1.GetMetadata(ctx, store.GetHeightToDAHeightDataKey(3)) + require.NoError(t, err) + require.Len(t, dataDABz, 8) + assert.Equal(t, uint64(12), binary.LittleEndian.Uint64(dataDABz), + "data HeightToDAHeight metadata must reflect the DA height from the snapshot") + + // ── Step 5: after DA retriever re-fires, the real-hash path still works ─── + cm2.SetHeaderDAIncluded(h3.Hash().String(), 12, 3) + cm2.SetDataDAIncluded(d3.DACommitment().String(), 12, 3) + + err = s.setNodeHeightToDAHeight(ctx, 3, h3, d3, false) + require.NoError(t, err, "setNodeHeightToDAHeight must still work once real hashes are populated") +} diff --git a/node/full.go b/node/full.go index 1ae9a62962..01d5e86284 100644 --- a/node/full.go +++ b/node/full.go @@ -104,12 +104,12 @@ func newFullNode( } leaderFactory := func() (raftpkg.Runnable, error) { - logger.Info().Msg("Starting aggregator-MODE") + logger.Info().Msg("Starting aggregator node") nodeConfig.Node.Aggregator = true return newAggregatorMode(nodeConfig, signer, genesis, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } followerFactory := func() (raftpkg.Runnable, error) { - logger.Info().Msg("Starting sync-MODE") + logger.Info().Msg("Starting syncing node") nodeConfig.Node.Aggregator = false return newSyncMode(nodeConfig, genesis, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } diff --git 
a/pkg/store/store.go b/pkg/store/store.go index b042149047..975db4e163 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -6,10 +6,8 @@ import ( "encoding/binary" "errors" "fmt" - "strings" ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" "google.golang.org/protobuf/proto" "github.com/evstack/ev-node/types" @@ -201,40 +199,6 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err return data, nil } -// GetMetadataByPrefix returns all metadata entries whose keys have the given prefix. -// This is more efficient than iterating through known keys when the set of keys is unknown. -func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { - // The full key in the datastore includes the meta prefix - fullPrefix := GetMetaKey(prefix) - - results, err := s.db.Query(ctx, dsq.Query{Prefix: fullPrefix}) - if err != nil { - return nil, fmt.Errorf("failed to query metadata with prefix '%s': %w", prefix, err) - } - defer results.Close() - - var entries []MetadataEntry - for result := range results.Next() { - if result.Error != nil { - return nil, fmt.Errorf("error iterating metadata results: %w", result.Error) - } - - // Extract the original key by removing the meta prefix - // The key from datastore is like "/m/cache/header-da-included/hash" - // We want to return "cache/header-da-included/hash" - metaKeyPrefix := GetMetaKey("") - key := strings.TrimPrefix(result.Key, metaKeyPrefix) - key = strings.TrimPrefix(key, "/") // Remove leading slash for consistency - - entries = append(entries, MetadataEntry{ - Key: key, - Value: result.Value, - }) - } - - return entries, nil -} - // DeleteMetadata removes a metadata key from the store. 
func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error { err := s.db.Delete(ctx, ds.NewKey(GetMetaKey(key))) diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index bd5e952f45..68903e1c7c 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -338,61 +338,6 @@ func TestMetadata(t *testing.T) { require.Nil(v) } -func TestGetMetadataByPrefix(t *testing.T) { - t.Parallel() - require := require.New(t) - - kv, err := NewTestInMemoryKVStore() - require.NoError(err) - s := New(kv) - - // Set metadata with a common prefix - prefix := "cache/test-prefix/" - require.NoError(s.SetMetadata(t.Context(), prefix+"hash1", []byte("value1"))) - require.NoError(s.SetMetadata(t.Context(), prefix+"hash2", []byte("value2"))) - require.NoError(s.SetMetadata(t.Context(), prefix+"hash3", []byte("value3"))) - // Set one with different prefix - require.NoError(s.SetMetadata(t.Context(), "other/prefix/key", []byte("other"))) - - // Query by prefix - entries, err := s.GetMetadataByPrefix(t.Context(), prefix) - require.NoError(err) - require.Len(entries, 3) - - // Verify keys have consistent format (no leading slash) - keyMap := make(map[string][]byte) - for _, entry := range entries { - // Key should start with the prefix, not "/prefix" - require.True(len(entry.Key) > 0, "key should not be empty") - require.NotEqual("/", string(entry.Key[0]), "key should not have leading slash: %s", entry.Key) - require.Contains(entry.Key, prefix, "key should contain prefix") - keyMap[entry.Key] = entry.Value - } - - // Verify all expected entries are present - require.Equal([]byte("value1"), keyMap[prefix+"hash1"]) - require.Equal([]byte("value2"), keyMap[prefix+"hash2"]) - require.Equal([]byte("value3"), keyMap[prefix+"hash3"]) - - // Other prefix should not be included - _, hasOther := keyMap["other/prefix/key"] - require.False(hasOther) -} - -func TestGetMetadataByPrefix_EmptyResult(t *testing.T) { - t.Parallel() - require := require.New(t) - - kv, err := 
NewTestInMemoryKVStore() - require.NoError(err) - s := New(kv) - - // Query non-existent prefix - entries, err := s.GetMetadataByPrefix(t.Context(), "nonexistent/prefix/") - require.NoError(err) - require.Len(entries, 0) -} - func TestGetBlockDataErrors(t *testing.T) { t.Parallel() chainID := "TestGetBlockDataErrors" diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 037507b4e7..259c6cb600 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -174,23 +174,6 @@ func (t *tracedStore) GetMetadata(ctx context.Context, key string) ([]byte, erro return data, nil } -func (t *tracedStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { - ctx, span := t.tracer.Start(ctx, "Store.GetMetadataByPrefix", - trace.WithAttributes(attribute.String("prefix", prefix)), - ) - defer span.End() - - entries, err := t.inner.GetMetadataByPrefix(ctx, prefix) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return entries, err - } - - span.SetAttributes(attribute.Int("entries.count", len(entries))) - return entries, nil -} - func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) error { ctx, span := t.tracer.Start(ctx, "Store.SetMetadata", trace.WithAttributes( diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index 020a31e234..3ae8d8902e 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -17,16 +17,16 @@ import ( ) type tracingMockStore struct { - heightFn func(ctx context.Context) (uint64, error) - getBlockDataFn func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) - getBlockByHashFn func(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) - getSignatureFn func(ctx context.Context, height uint64) (*types.Signature, error) - getSignatureByHash func(ctx context.Context, hash []byte) (*types.Signature, error) - getHeaderFn func(ctx context.Context, height uint64) 
(*types.SignedHeader, error) - getStateFn func(ctx context.Context) (types.State, error) - getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) - getMetadataFn func(ctx context.Context, key string) ([]byte, error) - getMetadataByPrefixFn func(ctx context.Context, prefix string) ([]MetadataEntry, error) + heightFn func(ctx context.Context) (uint64, error) + getBlockDataFn func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) + getBlockByHashFn func(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) + getSignatureFn func(ctx context.Context, height uint64) (*types.Signature, error) + getSignatureByHash func(ctx context.Context, hash []byte) (*types.Signature, error) + getHeaderFn func(ctx context.Context, height uint64) (*types.SignedHeader, error) + getStateFn func(ctx context.Context) (types.State, error) + getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) + getMetadataFn func(ctx context.Context, key string) ([]byte, error) + setMetadataFn func(ctx context.Context, key string, value []byte) error deleteMetadataFn func(ctx context.Context, key string) error rollbackFn func(ctx context.Context, height uint64, aggregator bool) error @@ -98,13 +98,6 @@ func (m *tracingMockStore) GetMetadata(ctx context.Context, key string) ([]byte, return nil, nil } -func (m *tracingMockStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { - if m.getMetadataByPrefixFn != nil { - return m.getMetadataByPrefixFn(ctx, prefix) - } - return nil, nil -} - func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value []byte) error { if m.setMetadataFn != nil { return m.setMetadataFn(ctx, key, value) diff --git a/pkg/store/types.go b/pkg/store/types.go index a461623ba7..90b29a51b7 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -47,22 +47,12 @@ type Store interface { NewBatch(ctx context.Context) (Batch, error) } -// 
MetadataEntry represents a key-value pair from metadata storage. -type MetadataEntry struct { - Key string - Value []byte -} - type Metadata interface { // SetMetadata saves arbitrary value in the store. // // This method enables evolve to safely persist any information. SetMetadata(ctx context.Context, key string, value []byte) error - // GetMetadataByPrefix returns all metadata entries whose keys have the given prefix. - // This is more efficient than iterating through known keys when the set of keys is unknown. - GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) - // DeleteMetadata removes a metadata key from the store. DeleteMetadata(ctx context.Context, key string) error } diff --git a/test/mocks/store.go b/test/mocks/store.go index 2cde50c543..911832ebab 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -485,74 +485,6 @@ func (_c *MockStore_GetMetadata_Call) RunAndReturn(run func(ctx context.Context, return _c } -// GetMetadataByPrefix provides a mock function for the type MockStore -func (_mock *MockStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]store.MetadataEntry, error) { - ret := _mock.Called(ctx, prefix) - - if len(ret) == 0 { - panic("no return value specified for GetMetadataByPrefix") - } - - var r0 []store.MetadataEntry - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string) ([]store.MetadataEntry, error)); ok { - return returnFunc(ctx, prefix) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, string) []store.MetadataEntry); ok { - r0 = returnFunc(ctx, prefix) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]store.MetadataEntry) - } - } - if returnFunc, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = returnFunc(ctx, prefix) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockStore_GetMetadataByPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMetadataByPrefix' 
-type MockStore_GetMetadataByPrefix_Call struct { - *mock.Call -} - -// GetMetadataByPrefix is a helper method to define mock.On call -// - ctx context.Context -// - prefix string -func (_e *MockStore_Expecter) GetMetadataByPrefix(ctx interface{}, prefix interface{}) *MockStore_GetMetadataByPrefix_Call { - return &MockStore_GetMetadataByPrefix_Call{Call: _e.mock.On("GetMetadataByPrefix", ctx, prefix)} -} - -func (_c *MockStore_GetMetadataByPrefix_Call) Run(run func(ctx context.Context, prefix string)) *MockStore_GetMetadataByPrefix_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 string - if args[1] != nil { - arg1 = args[1].(string) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *MockStore_GetMetadataByPrefix_Call) Return(metadataEntrys []store.MetadataEntry, err error) *MockStore_GetMetadataByPrefix_Call { - _c.Call.Return(metadataEntrys, err) - return _c -} - -func (_c *MockStore_GetMetadataByPrefix_Call) RunAndReturn(run func(ctx context.Context, prefix string) ([]store.MetadataEntry, error)) *MockStore_GetMetadataByPrefix_Call { - _c.Call.Return(run) - return _c -} - // GetSignature provides a mock function for the type MockStore func (_mock *MockStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { ret := _mock.Called(ctx, height)