diff --git a/cache/lru/cache.go b/cache/lru/cache.go
index dd83ee0cca65..056be97d23c8 100644
--- a/cache/lru/cache.go
+++ b/cache/lru/cache.go
@@ -20,6 +20,17 @@ type Cache[K comparable, V any] struct {
 	lock     sync.Mutex
 	elements *linked.Hashmap[K, V]
 	size     int
+
+	// onEvict is called with the key and value of an evicted entry, if set.
+	onEvict func(K, V)
+}
+
+// SetOnEvict sets a callback to be called with the key and value of an evicted entry.
+// The callback is called synchronously while holding the cache lock.
+func (c *Cache[K, V]) SetOnEvict(cb func(K, V)) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.onEvict = cb
 }
 
 func NewCache[K comparable, V any](size int) *Cache[K, V] {
@@ -34,8 +45,11 @@ func (c *Cache[K, V]) Put(key K, value V) {
 	defer c.lock.Unlock()
 
 	if c.elements.Len() == c.size {
-		oldestKey, _, _ := c.elements.Oldest()
+		oldestKey, oldestValue, _ := c.elements.Oldest()
 		c.elements.Delete(oldestKey)
+		if c.onEvict != nil {
+			c.onEvict(oldestKey, oldestValue)
+		}
 	}
 	c.elements.Put(key, value)
 }
@@ -55,14 +69,27 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
 func (c *Cache[K, _]) Evict(key K) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-
+	// Look the key up before deleting so the callback only fires for entries
+	// that were actually present (and receives the real value, not V's zero value).
+	value, ok := c.elements.Get(key)
 	c.elements.Delete(key)
+	if ok && c.onEvict != nil {
+		c.onEvict(key, value)
+	}
 }
 
 func (c *Cache[_, _]) Flush() {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
+	// Call onEvict for each element before clearing
+	if c.onEvict != nil {
+		iter := c.elements.NewIterator()
+		for iter.Next() {
+			c.onEvict(iter.Key(), iter.Value())
+		}
+	}
+
 	c.elements.Clear()
 }
diff --git a/cache/lru/cache_test.go b/cache/lru/cache_test.go
index 345ce42f1f76..b3378569df17 100644
--- a/cache/lru/cache_test.go
+++ b/cache/lru/cache_test.go
@@ -6,6 +6,8 @@ package lru
 import (
 	"testing"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/ava-labs/avalanchego/cache/cachetest"
 	"github.com/ava-labs/avalanchego/ids"
 )
@@ -19,3 +21,17 @@ func TestCacheEviction(t *testing.T) {
 	c := NewCache[ids.ID, int64](2)
 	cachetest.Eviction(t, c)
 }
+
+func TestCacheFlushWithOnEvict(t *testing.T) {
+	c := NewCache[ids.ID, int64](2)
+
+	// Track which elements were evicted
+	evicted := make(map[ids.ID]int64)
+	c.SetOnEvict(func(key ids.ID, value int64) {
+		evicted[key] = value
+	})
+
+	cachetest.Eviction(t, c)
+	require.Zero(t, c.Len())
+	require.Len(t, evicted, 3)
+}
diff --git a/go.mod b/go.mod
index df1a61d3a10a..13ce229eefa0 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@ require (
 	github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60
 	github.com/ava-labs/libevm v1.13.14-0.3.0.rc.1
 	github.com/btcsuite/btcd/btcutil v1.1.3
+	github.com/cespare/xxhash/v2 v2.3.0
 	github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
 	github.com/compose-spec/compose-go v1.20.2
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0
@@ -89,7 +90,6 @@ require (
 	github.com/bits-and-blooms/bitset v1.10.0 // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
-	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/cockroachdb/errors v1.9.1 // indirect
 	github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
 	github.com/cockroachdb/redact v1.1.3 // indirect
diff --git a/x/blockdb/README.md b/x/blockdb/README.md
new file mode 100644
index 000000000000..eaa1b89badb2
--- /dev/null
+++ b/x/blockdb/README.md
@@ -0,0 +1,199 @@
+# BlockDB
+
+BlockDB is a specialized database optimized for blockchain blocks.
+ +## Key Functionalities + +- **O(1) Performance**: Both reads and writes complete in constant time +- **Parallel Operations**: Multiple threads can read and write blocks concurrently without blocking +- **Flexible Write Ordering**: Supports out-of-order block writes for bootstrapping +- **Configurable Durability**: Optional `syncToDisk` mode guarantees immediate recoverability +- **Automatic Recovery**: Detects and recovers unindexed blocks after unclean shutdowns + +## Design + +BlockDB uses a single index file and multiple data files. The index file maps block heights to locations in the data files, while data files store the actual block content. Data storage can be split across multiple data files based on the maximum data file size. + +``` +┌─────────────────┐ ┌─────────────────┐ +│ Index File │ │ Data File 1 │ +│ (.idx) │ │ (.dat) │ +├─────────────────┤ ├─────────────────┤ +│ Header │ │ Block 0 │ +│ - Version │ ┌─────>│ - Header │ +│ - Min Height │ │ │ - Data │ +│ - Max Height │ │ ├─────────────────┤ +│ - Data Size │ │ │ Block 1 │ +│ - ... │ │ ┌──>│ - Header │ +├─────────────────┤ │ │ │ - Data │ +│ Entry[0] │ │ │ ├─────────────────┤ +│ - Offset ───────┼──┘ │ │ ... │ +│ - Size │ │ └─────────────────┘ +│ - Header Size │ │ +├─────────────────┤ │ +│ Entry[1] │ │ +│ - Offset ───────┼─────┘ +│ - Size │ +│ - Header Size │ +├─────────────────┤ +│ ... 
│ +└─────────────────┘ +``` + +### File Formats + +#### Index File Structure + +The index file consists of a fixed-size header followed by fixed-size entries: + +``` +Index File Header (64 bytes): +┌────────────────────────────────┬─────────┐ +│ Field │ Size │ +├────────────────────────────────┼─────────┤ +│ Version │ 8 bytes │ +│ Max Data File Size │ 8 bytes │ +│ Min Block Height │ 8 bytes │ +│ Max Contiguous Height │ 8 bytes │ +│ Max Block Height │ 8 bytes │ +│ Next Write Offset │ 8 bytes │ +│ Reserved │ 16 bytes│ +└────────────────────────────────┴─────────┘ + +Index Entry (16 bytes): +┌────────────────────────────────┬─────────┐ +│ Field │ Size │ +├────────────────────────────────┼─────────┤ +│ Data File Offset │ 8 bytes │ +│ Block Data Size │ 4 bytes │ +│ Header Size │ 4 bytes │ +└────────────────────────────────┴─────────┘ +``` + +#### Data File Structure + +Each block in the data file is stored with a block entry header followed by the raw block data: + +``` +Block Entry Header (26 bytes): +┌────────────────────────────────┬─────────┐ +│ Field │ Size │ +├────────────────────────────────┼─────────┤ +│ Height │ 8 bytes │ +│ Size │ 4 bytes │ +│ Checksum │ 8 bytes │ +│ Header Size │ 4 bytes │ +│ Version │ 2 bytes │ +└────────────────────────────────┴─────────┘ +``` + +### Block Overwrites + +BlockDB allows overwriting blocks at existing heights. When a block is overwritten, the new block is appended to the data file and the index entry is updated to point to the new location, leaving the old block data as unreferenced "dead" space. However, since blocks are immutable and rarely overwritten (e.g., during reorgs), this trade-off should have minimal impact in practice. + +### Fixed-Size Index Entries + +Each index entry is exactly 16 bytes on disk, containing the offset, size, and header size. This fixed size enables direct calculation of where each block's index entry is located, providing O(1) lookups. 
For blockchains with high block heights, the index remains efficient; even at height 1 billion, the index file would only be ~16GB.
+
+### Durability and Fsync Behavior
+
+BlockDB provides configurable durability through the `syncToDisk` parameter:
+
+**Data File Behavior:**
+
+- **When `syncToDisk=true`**: The data file is fsync'd after every block write, guaranteeing durability against both process failures and kernel/machine failures.
+- **When `syncToDisk=false`**: Data file writes are buffered, providing durability against process failures but not against kernel or machine failures.
+
+**Index File Behavior:**
+
+- **When `syncToDisk=true`**: The index file is fsync'd every `CheckpointInterval` blocks (when the header is written).
+- **When `syncToDisk=false`**: The index file relies on OS buffering and is not explicitly fsync'd.
+
+### Recovery Mechanism
+
+On startup, BlockDB checks for signs of an unclean shutdown by comparing the data file size on disk with the indexed data size stored in the index file header. If the data files are larger than what the index claims, it indicates that blocks were written but the index wasn't properly updated before shutdown.
+
+**Recovery Process:**
+
+1. Starts scanning from where the index left off (`NextWriteOffset`)
+2. For each unindexed block found:
+   - Validates the block entry header and checksum
+   - Writes the corresponding index entry
+3. Calculates the max contiguous height and max block height
+4. Updates the index header with the updated max contiguous height, max block height, and next write offset
+
+## Usage
+
+### Creating a Database
+
+```go
+import (
+	"errors"
+	"github.com/ava-labs/avalanchego/x/blockdb"
+)
+
+config := blockdb.DefaultConfig().
+ WithDir("/path/to/blockdb") +db, err := blockdb.New(config, logging.NoLog{}) +if err != nil { + fmt.Println("Error creating database:", err) + return +} +defer db.Close() +``` + +### Writing and Reading Blocks + +```go +// Write a block with header size +height := uint64(100) +blockData := []byte("header:block data") +headerSize := uint32(7) // First 7 bytes are the header +err := db.WriteBlock(height, blockData, headerSize) +if err != nil { + fmt.Println("Error writing block:", err) + return +} + +// Read a block +blockData, err := db.ReadBlock(height) +if err != nil { + if errors.Is(err, blockdb.ErrBlockNotFound) { + fmt.Println("Block doesn't exist at this height") + return + } + fmt.Println("Error reading block:", err) + return +} + +// Read block components separately +headerData, err := db.ReadHeader(height) +if err != nil { + if errors.Is(err, blockdb.ErrBlockNotFound) { + fmt.Println("Block doesn't exist at this height") + return + } + fmt.Println("Error reading header:", err) + return +} +bodyData, err := db.ReadBody(height) +if err != nil { + if errors.Is(err, blockdb.ErrBlockNotFound) { + fmt.Println("Block doesn't exist at this height") + return + } + fmt.Println("Error reading body:", err) + return +} +``` + +## TODO + +- Compress data files to reduce storage size +- ~~Split data across multiple files when `MaxDataFileSize` is reached~~ +- Implement a block cache for recently accessed blocks +- Use a buffered pool to avoid allocations on reads and writes +- ~~Add tests for core functionality~~ +- Add metrics +- Add performance benchmarks +- Consider supporting missing data files (currently we error if any data files are missing) diff --git a/x/blockdb/config.go b/x/blockdb/config.go new file mode 100644 index 000000000000..18ac3bd2f6fa --- /dev/null +++ b/x/blockdb/config.go @@ -0,0 +1,128 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import "errors" + +// DefaultMaxDataFileSize is the default maximum size of the data block file in bytes (500GB). +const DefaultMaxDataFileSize = 500 * 1024 * 1024 * 1024 + +// DefaultMaxDataFiles is the default maximum number of data files descriptors cached. +const DefaultMaxDataFiles = 10 + +// DatabaseConfig contains configuration parameters for BlockDB. +type DatabaseConfig struct { + // IndexDir is the directory where the index file is stored. + IndexDir string + + // DataDir is the directory where the data files are stored. + DataDir string + + // MinimumHeight is the lowest block height tracked by the database. + MinimumHeight uint64 + + // MaxDataFileSize sets the maximum size of the data block file in bytes. + MaxDataFileSize uint64 + + // MaxDataFiles is the maximum number of data files descriptors cached. + MaxDataFiles int + + // CheckpointInterval defines how frequently (in blocks) the index file header is updated (default: 1024). + CheckpointInterval uint64 + + // SyncToDisk determines if fsync is called after each write for durability. + SyncToDisk bool + + // Truncate determines if existing data should be truncated when opening the database. + Truncate bool +} + +// DefaultConfig returns the default options for BlockDB. +func DefaultConfig() DatabaseConfig { + return DatabaseConfig{ + IndexDir: "", + DataDir: "", + MinimumHeight: 0, + MaxDataFileSize: DefaultMaxDataFileSize, + MaxDataFiles: DefaultMaxDataFiles, + CheckpointInterval: 1024, + SyncToDisk: true, + Truncate: false, + } +} + +// WithDir sets both IndexDir and DataDir to the given value. +func (c DatabaseConfig) WithDir(directory string) DatabaseConfig { + c.IndexDir = directory + c.DataDir = directory + return c +} + +// WithIndexDir returns a copy of the config with IndexDir set to the given value. 
+func (c DatabaseConfig) WithIndexDir(indexDir string) DatabaseConfig { + c.IndexDir = indexDir + return c +} + +// WithDataDir returns a copy of the config with DataDir set to the given value. +func (c DatabaseConfig) WithDataDir(dataDir string) DatabaseConfig { + c.DataDir = dataDir + return c +} + +// WithSyncToDisk returns a copy of the config with SyncToDisk set to the given value. +func (c DatabaseConfig) WithSyncToDisk(syncToDisk bool) DatabaseConfig { + c.SyncToDisk = syncToDisk + return c +} + +// WithTruncate returns a copy of the config with Truncate set to the given value. +func (c DatabaseConfig) WithTruncate(truncate bool) DatabaseConfig { + c.Truncate = truncate + return c +} + +// WithMinimumHeight returns a copy of the config with MinimumHeight set to the given value. +func (c DatabaseConfig) WithMinimumHeight(minHeight uint64) DatabaseConfig { + c.MinimumHeight = minHeight + return c +} + +// WithMaxDataFileSize returns a copy of the config with MaxDataFileSize set to the given value. +func (c DatabaseConfig) WithMaxDataFileSize(maxSize uint64) DatabaseConfig { + c.MaxDataFileSize = maxSize + return c +} + +// WithMaxDataFiles returns a copy of the config with MaxDataFiles set to the given value. +func (c DatabaseConfig) WithMaxDataFiles(maxFiles int) DatabaseConfig { + c.MaxDataFiles = maxFiles + return c +} + +// WithCheckpointInterval returns a copy of the config with CheckpointInterval set to the given value. +func (c DatabaseConfig) WithCheckpointInterval(interval uint64) DatabaseConfig { + c.CheckpointInterval = interval + return c +} + +// Validate checks if the store options are valid. 
+func (c DatabaseConfig) Validate() error { + if c.IndexDir == "" { + return errors.New("IndexDir must be provided") + } + if c.DataDir == "" { + return errors.New("DataDir must be provided") + } + if c.CheckpointInterval == 0 { + return errors.New("CheckpointInterval cannot be 0") + } + if c.MaxDataFiles <= 0 { + return errors.New("MaxDataFiles must be positive") + } + if c.MaxDataFileSize == 0 { + return errors.New("MaxDataFileSize must be positive") + } + return nil +} diff --git a/x/blockdb/database.go b/x/blockdb/database.go new file mode 100644 index 000000000000..af9f897b5d50 --- /dev/null +++ b/x/blockdb/database.go @@ -0,0 +1,1337 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "encoding" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "os" + "path/filepath" + "sync" + "sync/atomic" + + "github.com/cespare/xxhash/v2" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/cache/lru" + "github.com/ava-labs/avalanchego/utils/logging" + + safemath "github.com/ava-labs/avalanchego/utils/math" +) + +const ( + indexFileName = "blockdb.idx" + dataFileNameFormat = "blockdb_%d.dat" + defaultFilePermissions = 0o666 + + // Since 0 is a valid height, math.MaxUint64 is used to indicate unset height. + // It is not possible for block height to be max uint64 as it would overflow the index entry offset + unsetHeight = math.MaxUint64 + + // IndexFileVersion is the version of the index file format. + IndexFileVersion uint64 = 1 + + // BlockEntryVersion is the version of the block entry. + BlockEntryVersion uint16 = 1 +) + +// BlockHeight defines the type for block heights. +type BlockHeight = uint64 + +// BlockData defines the type for block data. +type BlockData = []byte + +// BlockHeaderSize is the size of the header in the block data. 
+type BlockHeaderSize = uint32 + +var ( + _ encoding.BinaryMarshaler = (*blockEntryHeader)(nil) + _ encoding.BinaryUnmarshaler = (*blockEntryHeader)(nil) + _ encoding.BinaryMarshaler = (*indexEntry)(nil) + _ encoding.BinaryUnmarshaler = (*indexEntry)(nil) + _ encoding.BinaryMarshaler = (*indexFileHeader)(nil) + _ encoding.BinaryUnmarshaler = (*indexFileHeader)(nil) + + sizeOfBlockEntryHeader = uint32(binary.Size(blockEntryHeader{})) + sizeOfIndexEntry = uint64(binary.Size(indexEntry{})) + sizeOfIndexFileHeader = uint64(binary.Size(indexFileHeader{})) +) + +// blockEntryHeader is the header of a block entry in the data file. +// This is not the header portion of the block data itself. +type blockEntryHeader struct { + Height BlockHeight + Size uint32 + Checksum uint64 + HeaderSize BlockHeaderSize + Version uint16 +} + +// MarshalBinary implements the encoding.BinaryMarshaler interface. +func (beh blockEntryHeader) MarshalBinary() ([]byte, error) { + buf := make([]byte, sizeOfBlockEntryHeader) + binary.LittleEndian.PutUint64(buf[0:], beh.Height) + binary.LittleEndian.PutUint32(buf[8:], beh.Size) + binary.LittleEndian.PutUint64(buf[12:], beh.Checksum) + binary.LittleEndian.PutUint32(buf[20:], beh.HeaderSize) + binary.LittleEndian.PutUint16(buf[24:], beh.Version) + return buf, nil +} + +// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface. +func (beh *blockEntryHeader) UnmarshalBinary(data []byte) error { + if len(data) != int(sizeOfBlockEntryHeader) { + return fmt.Errorf("%w: incorrect data length to unmarshal blockEntryHeader: got %d bytes, need exactly %d", ErrCorrupted, len(data), sizeOfBlockEntryHeader) + } + beh.Height = binary.LittleEndian.Uint64(data[0:]) + beh.Size = binary.LittleEndian.Uint32(data[8:]) + beh.Checksum = binary.LittleEndian.Uint64(data[12:]) + beh.HeaderSize = binary.LittleEndian.Uint32(data[20:]) + beh.Version = binary.LittleEndian.Uint16(data[24:]) + return nil +} + +// indexEntry represents an entry in the index file. 
+type indexEntry struct { + // Offset is the byte offset in the data file where the block's header starts. + Offset uint64 + // Size is the length in bytes of the block's data (excluding the blockHeader). + Size uint32 + // HeaderSize is the size in bytes of the block's header portion within the data. + HeaderSize BlockHeaderSize +} + +// IsEmpty returns true if this entry is uninitialized. +// This indicates a slot where no block has been written. +func (e indexEntry) IsEmpty() bool { + return e.Offset == 0 && e.Size == 0 +} + +// MarshalBinary implements encoding.BinaryMarshaler for indexEntry. +func (e indexEntry) MarshalBinary() ([]byte, error) { + buf := make([]byte, sizeOfIndexEntry) + binary.LittleEndian.PutUint64(buf[0:], e.Offset) + binary.LittleEndian.PutUint32(buf[8:], e.Size) + binary.LittleEndian.PutUint32(buf[12:], e.HeaderSize) + return buf, nil +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler for indexEntry. +func (e *indexEntry) UnmarshalBinary(data []byte) error { + if len(data) != int(sizeOfIndexEntry) { + return fmt.Errorf("%w: incorrect data length to unmarshal indexEntry: got %d bytes, need exactly %d", ErrCorrupted, len(data), sizeOfIndexEntry) + } + e.Offset = binary.LittleEndian.Uint64(data[0:]) + e.Size = binary.LittleEndian.Uint32(data[8:]) + e.HeaderSize = binary.LittleEndian.Uint32(data[12:]) + return nil +} + +// indexFileHeader is the header of the index file. +type indexFileHeader struct { + Version uint64 + MaxDataFileSize uint64 + MinHeight BlockHeight + MaxContiguousHeight BlockHeight + MaxHeight BlockHeight + NextWriteOffset uint64 + // reserve 16 bytes for future use + Reserved [16]byte +} + +// MarshalBinary implements encoding.BinaryMarshaler for indexFileHeader. 
+func (h indexFileHeader) MarshalBinary() ([]byte, error) { + buf := make([]byte, sizeOfIndexFileHeader) + binary.LittleEndian.PutUint64(buf[0:], h.Version) + binary.LittleEndian.PutUint64(buf[8:], h.MaxDataFileSize) + binary.LittleEndian.PutUint64(buf[16:], h.MinHeight) + binary.LittleEndian.PutUint64(buf[24:], h.MaxContiguousHeight) + binary.LittleEndian.PutUint64(buf[32:], h.MaxHeight) + binary.LittleEndian.PutUint64(buf[40:], h.NextWriteOffset) + return buf, nil +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler for indexFileHeader. +func (h *indexFileHeader) UnmarshalBinary(data []byte) error { + if len(data) != int(sizeOfIndexFileHeader) { + return fmt.Errorf( + "%w: incorrect data length to unmarshal indexFileHeader: got %d bytes, need exactly %d", + ErrCorrupted, len(data), sizeOfIndexFileHeader, + ) + } + h.Version = binary.LittleEndian.Uint64(data[0:]) + h.MaxDataFileSize = binary.LittleEndian.Uint64(data[8:]) + h.MinHeight = binary.LittleEndian.Uint64(data[16:]) + h.MaxContiguousHeight = binary.LittleEndian.Uint64(data[24:]) + h.MaxHeight = binary.LittleEndian.Uint64(data[32:]) + h.NextWriteOffset = binary.LittleEndian.Uint64(data[40:]) + return nil +} + +type blockHeights struct { + // maxBlockHeight tracks the highest block height that has been written to the db, even if there are gaps in the sequence. + maxBlockHeight BlockHeight + // maxContiguousHeight tracks the highest block height known to be contiguously stored. + maxContiguousHeight BlockHeight +} + +// Database stores blockchain blocks on disk and provides methods to read, and write blocks. +type Database struct { + indexFile *os.File + config DatabaseConfig + header indexFileHeader + log logging.Logger + closed bool + fileCache *lru.Cache[int, *os.File] + + // closeMu prevents the database from being closed while in use and prevents + // use of a closed database. 
+ closeMu sync.RWMutex + + // fileOpenMu prevents race conditions when multiple threads try to open the same data file + fileOpenMu sync.Mutex + + // blockHeights holds the max block height and max contiguous height + blockHeights atomic.Pointer[blockHeights] + // nextDataWriteOffset tracks the next position to write new data in the data file. + nextDataWriteOffset atomic.Uint64 +} + +// New creates a block database. +// Parameters: +// - config: Configuration parameters +// - log: Logger instance for structured logging +func New(config DatabaseConfig, log logging.Logger) (*Database, error) { + if err := config.Validate(); err != nil { + return nil, err + } + + databaseLog := log + if databaseLog == nil { + databaseLog = logging.NoLog{} + } + + s := &Database{ + config: config, + log: databaseLog, + fileCache: lru.NewCache[int, *os.File](config.MaxDataFiles), + } + s.fileCache.SetOnEvict(func(_ int, f *os.File) { + if f != nil { + f.Close() + } + }) + + s.log.Info("Initializing BlockDB", + zap.String("indexDir", config.IndexDir), + zap.String("dataDir", config.DataDir), + zap.Uint64("maxDataFileSize", config.MaxDataFileSize), + zap.Int("maxDataFiles", config.MaxDataFiles), + zap.Bool("truncate", config.Truncate), + ) + + if err := s.openAndInitializeIndex(); err != nil { + s.log.Error("Failed to initialize database: failed to initialize index", zap.Error(err)) + return nil, err + } + + if err := s.initializeDataFiles(); err != nil { + s.log.Error("Failed to initialize database: failed to initialize data files", zap.Error(err)) + s.closeFiles() + return nil, err + } + + if !config.Truncate { + if err := s.recover(); err != nil { + s.log.Error("Failed to initialize database: recovery failed", zap.Error(err)) + s.closeFiles() + return nil, fmt.Errorf("recovery failed: %w", err) + } + } + + heights := s.getBlockHeights() + s.log.Info("BlockDB initialized successfully", + zap.Uint64("nextWriteOffset", s.nextDataWriteOffset.Load()), + zap.Uint64("maxContiguousHeight", 
heights.maxContiguousHeight), + zap.Uint64("maxBlockHeight", heights.maxBlockHeight), + ) + + return s, nil +} + +// MaxContiguousHeight returns the highest block height known to be contiguously stored. +func (s *Database) MaxContiguousHeight() (height BlockHeight, found bool) { + heights := s.getBlockHeights() + if heights.maxContiguousHeight == unsetHeight { + return 0, false + } + return heights.maxContiguousHeight, true +} + +func (s *Database) setBlockHeights(maxBlock, maxContiguous BlockHeight) { + heights := &blockHeights{ + maxBlockHeight: maxBlock, + maxContiguousHeight: maxContiguous, + } + s.blockHeights.Store(heights) +} + +func (s *Database) updateBlockHeightsAtomically(updateFn func(*blockHeights) *blockHeights) { + for { + current := s.getBlockHeights() + updated := updateFn(current) + if s.blockHeights.CompareAndSwap(current, updated) { + break + } + } +} + +// Close flushes pending writes and closes the store files. +func (s *Database) Close() error { + s.closeMu.Lock() + defer s.closeMu.Unlock() + + if s.closed { + return nil + } + s.closed = true + + err := s.persistIndexHeader() + if err != nil { + s.log.Error("Failed to close database: failed to persist index header", zap.Error(err)) + } + + s.closeFiles() + + s.log.Info("Block database closed successfully") + return err +} + +// WriteBlock inserts a block into the store at the given height with the specified header size. 
+func (s *Database) WriteBlock(height BlockHeight, block BlockData, headerSize BlockHeaderSize) error { + s.closeMu.RLock() + defer s.closeMu.RUnlock() + + if s.closed { + s.log.Error("Failed to write block: database is closed", + zap.Uint64("height", height), + ) + return ErrDatabaseClosed + } + + blockSize := len(block) + if blockSize > math.MaxUint32 { + s.log.Error("Failed to write block: block size exceeds max size for uint32", + zap.Uint64("height", height), + zap.Int("blockSize", blockSize), + ) + return fmt.Errorf("%w: block size cannot exceed %d bytes", ErrBlockTooLarge, math.MaxUint32) + } + + blockDataLen := uint32(blockSize) + if blockDataLen == 0 { + s.log.Error("Failed to write block: empty block", zap.Uint64("height", height)) + return ErrBlockEmpty + } + + if headerSize >= blockDataLen { + s.log.Error("Failed to write block: header size exceeds block size", + zap.Uint64("height", height), + zap.Uint32("headerSize", headerSize), + zap.Uint32("blockSize", blockDataLen), + ) + return ErrHeaderSizeTooLarge + } + + indexFileOffset, err := s.indexEntryOffset(height) + if err != nil { + s.log.Error("Failed to write block: failed to calculate index entry offset", + zap.Uint64("height", height), + zap.Error(err), + ) + return fmt.Errorf("failed to get index entry offset for block at height %d: %w", height, err) + } + + sizeWithDataHeader, err := safemath.Add(sizeOfBlockEntryHeader, blockDataLen) + if err != nil { + s.log.Error("Failed to write block: block size calculation overflow", + zap.Uint64("height", height), + zap.Uint32("blockSize", blockDataLen), + zap.Error(err), + ) + return fmt.Errorf("calculating total block size would overflow for block at height %d: %w", height, err) + } + writeDataOffset, err := s.allocateBlockSpace(sizeWithDataHeader) + if err != nil { + s.log.Error("Failed to write block: failed to allocate block space", + zap.Uint64("height", height), + zap.Uint32("totalSize", sizeWithDataHeader), + zap.Error(err), + ) + return err + } + + 
bh := blockEntryHeader{ + Height: height, + Size: blockDataLen, + HeaderSize: headerSize, + Checksum: calculateChecksum(block), + Version: BlockEntryVersion, + } + if err := s.writeBlockAt(writeDataOffset, bh, block); err != nil { + s.log.Error("Failed to write block: error writing block data", + zap.Uint64("height", height), + zap.Uint64("dataOffset", writeDataOffset), + zap.Error(err), + ) + return err + } + + if err := s.writeIndexEntryAt(indexFileOffset, writeDataOffset, blockDataLen, headerSize); err != nil { + s.log.Error("Failed to write block: error writing index entry", + zap.Uint64("height", height), + zap.Uint64("indexOffset", indexFileOffset), + zap.Uint64("dataOffset", writeDataOffset), + zap.Error(err), + ) + return err + } + + if err := s.updateBlockHeights(height); err != nil { + s.log.Error("Failed to write block: error updating block heights", + zap.Uint64("height", height), + zap.Error(err), + ) + return err + } + + s.log.Debug("Block written successfully", + zap.Uint64("height", height), + zap.Uint32("blockSize", blockDataLen), + zap.Uint32("headerSize", headerSize), + zap.Uint64("dataOffset", writeDataOffset), + ) + + return nil +} + +// readBlockIndex reads the index entry for the given height. +// It returns ErrBlockNotFound if the block does not exist. +func (s *Database) readBlockIndex(height BlockHeight) (indexEntry, error) { + var entry indexEntry + if s.closed { + s.log.Error("Failed to read block index: database is closed", + zap.Uint64("height", height), + ) + return entry, ErrDatabaseClosed + } + + // Skip the index entry read if we know the block is past the max height. 
+ heights := s.getBlockHeights() + if heights.maxBlockHeight == unsetHeight { + s.log.Debug("Block not found", + zap.Uint64("height", height), + zap.String("reason", "no blocks written yet"), + ) + return entry, fmt.Errorf("%w: no blocks written yet", ErrBlockNotFound) + } + if height > heights.maxBlockHeight { + s.log.Debug("Block not found", + zap.Uint64("height", height), + zap.Uint64("maxHeight", heights.maxBlockHeight), + zap.String("reason", "height beyond max"), + ) + return entry, fmt.Errorf("%w: height %d is beyond max height %d", ErrBlockNotFound, height, heights.maxBlockHeight) + } + + entry, err := s.readIndexEntry(height) + if err != nil { + if errors.Is(err, ErrBlockNotFound) { + s.log.Debug("Block not found", + zap.Uint64("height", height), + zap.String("reason", "no index entry found"), + zap.Error(err), + ) + } else { + s.log.Error("Failed to read block index: failed to read index entry", + zap.Uint64("height", height), + zap.Error(err), + ) + } + return entry, err + } + + return entry, nil +} + +// ReadBlock retrieves a block by its height. +// Returns ErrBlockNotFound if the block is not found. 
+func (s *Database) ReadBlock(height BlockHeight) (BlockData, error) { + s.closeMu.RLock() + defer s.closeMu.RUnlock() + + indexEntry, err := s.readBlockIndex(height) + if err != nil { + return nil, err + } + + // Read the complete block data + blockData := make(BlockData, indexEntry.Size) + dataFile, localOffset, err := s.getDataFileAndOffset(indexEntry.Offset) + if err != nil { + s.log.Error("Failed to read block: failed to get data file", + zap.Uint64("height", height), + zap.Uint64("dataOffset", indexEntry.Offset), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to get data file for block at height %d: %w", height, err) + } + _, err = dataFile.ReadAt(blockData, int64(localOffset+uint64(sizeOfBlockEntryHeader))) + if err != nil { + s.log.Error("Failed to read block: failed to read block data from file", + zap.Uint64("height", height), + zap.Uint64("localOffset", localOffset), + zap.Uint32("blockSize", indexEntry.Size), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to read block data from data file: %w", err) + } + + return blockData, nil +} + +// ReadHeader retrieves only the header portion of a block by its height. +// Returns ErrBlockNotFound if the block is not found, or nil if no header. 
+func (s *Database) ReadHeader(height BlockHeight) (BlockData, error) { + s.closeMu.RLock() + defer s.closeMu.RUnlock() + + indexEntry, err := s.readBlockIndex(height) + if err != nil { + return nil, err + } + + // Return nil if there's no header data + if indexEntry.HeaderSize == 0 { + return nil, nil + } + + // Validate header size doesn't exceed total block size + if indexEntry.HeaderSize > indexEntry.Size { + s.log.Error("Failed to read header: header size exceeds block size", + zap.Uint64("height", height), + zap.Uint32("headerSize", indexEntry.HeaderSize), + zap.Uint32("blockSize", indexEntry.Size), + ) + return nil, fmt.Errorf("invalid header size %d exceeds block size %d", indexEntry.HeaderSize, indexEntry.Size) + } + + // Read only the header portion + headerData := make([]byte, indexEntry.HeaderSize) + dataFile, localOffset, err := s.getDataFileAndOffset(indexEntry.Offset) + if err != nil { + s.log.Error("Failed to read header: failed to get data file", + zap.Uint64("height", height), + zap.Uint64("dataOffset", indexEntry.Offset), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to get data file for block header at height %d: %w", height, err) + } + _, err = dataFile.ReadAt(headerData, int64(localOffset+uint64(sizeOfBlockEntryHeader))) + if err != nil { + s.log.Error("Failed to read header: failed to read header data from file", + zap.Uint64("height", height), + zap.Uint64("localOffset", localOffset), + zap.Uint32("headerSize", indexEntry.HeaderSize), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to read block header data from data file: %w", err) + } + + return headerData, nil +} + +// ReadBody retrieves only the body portion (excluding header) of a block by its height. +// Returns ErrBlockNotFound if the block is not found. 
+func (s *Database) ReadBody(height BlockHeight) (BlockData, error) { + s.closeMu.RLock() + defer s.closeMu.RUnlock() + + indexEntry, err := s.readBlockIndex(height) + if err != nil { + return nil, err + } + + bodySize := indexEntry.Size - indexEntry.HeaderSize + bodyData := make([]byte, bodySize) + dataFile, localOffset, err := s.getDataFileAndOffset(indexEntry.Offset) + if err != nil { + s.log.Error("Failed to read body: failed to get data file", + zap.Uint64("height", height), + zap.Uint64("dataOffset", indexEntry.Offset), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to get data file for block body at height %d: %w", height, err) + } + headerOffset, err := safemath.Add(localOffset, uint64(sizeOfBlockEntryHeader)) + if err != nil { + s.log.Error("Failed to read body: header offset calculation overflow", + zap.Uint64("height", height), + zap.Uint64("localOffset", localOffset), + zap.Error(err), + ) + return nil, fmt.Errorf("calculating header offset would overflow for block at height %d: %w", height, err) + } + bodyOffset, err := safemath.Add(headerOffset, uint64(indexEntry.HeaderSize)) + if err != nil { + s.log.Error("Failed to read body: body offset calculation overflow", + zap.Uint64("height", height), + zap.Uint64("headerOffset", headerOffset), + zap.Uint32("headerSize", indexEntry.HeaderSize), + zap.Error(err), + ) + return nil, fmt.Errorf("calculating body offset would overflow for block at height %d: %w", height, err) + } + + _, err = dataFile.ReadAt(bodyData, int64(bodyOffset)) + if err != nil { + s.log.Error("Failed to read body: failed to read body data from file", + zap.Uint64("height", height), + zap.Uint64("bodyOffset", bodyOffset), + zap.Uint32("bodySize", bodySize), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to read block body data from data file: %w", err) + } + return bodyData, nil +} + +// HasBlock checks if a block exists at the given height. 
func (s *Database) HasBlock(height BlockHeight) (bool, error) {
	s.closeMu.RLock()
	defer s.closeMu.RUnlock()

	// A missing index entry is reported as (false, nil); any other index
	// read failure is surfaced as an error.
	_, err := s.readBlockIndex(height)
	if err != nil {
		if errors.Is(err, ErrBlockNotFound) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}

// indexEntryOffset converts a block height into the byte offset of its
// fixed-size index entry within the index file. Heights below the configured
// MinHeight are rejected, and all arithmetic is overflow-checked.
func (s *Database) indexEntryOffset(height BlockHeight) (uint64, error) {
	if height < s.header.MinHeight {
		return 0, fmt.Errorf("%w: failed to get index entry offset for block at height %d, minimum height is %d", ErrInvalidBlockHeight, height, s.header.MinHeight)
	}

	// Entries are laid out densely after the file header:
	// offset = sizeOfIndexFileHeader + (height - MinHeight) * sizeOfIndexEntry.
	relativeHeight := height - s.header.MinHeight
	offsetFromHeaderStart, err := safemath.Mul(relativeHeight, sizeOfIndexEntry)
	if err != nil {
		return 0, fmt.Errorf("%w: block height %d is too large", ErrInvalidBlockHeight, height)
	}
	finalOffset, err := safemath.Add(sizeOfIndexFileHeader, offsetFromHeaderStart)
	if err != nil {
		return 0, fmt.Errorf("%w: block height %d is too large", ErrInvalidBlockHeight, height)
	}

	return finalOffset, nil
}

// readIndexEntry reads the index entry for the given height from the index file.
// Returns ErrBlockNotFound if the block does not exist.
func (s *Database) readIndexEntry(height BlockHeight) (indexEntry, error) {
	var entry indexEntry

	offset, err := s.indexEntryOffset(height)
	if err != nil {
		return entry, err
	}

	buf := make([]byte, sizeOfIndexEntry)
	_, err = s.indexFile.ReadAt(buf, int64(offset))
	if err != nil {
		// Return ErrBlockNotFound if trying to read past the end of the index file
		// for a block that has not been indexed yet.
		if errors.Is(err, io.EOF) {
			return entry, fmt.Errorf("%w: EOF reading index entry at offset %d for height %d", ErrBlockNotFound, offset, height)
		}
		return entry, fmt.Errorf("failed to read index entry at offset %d for height %d: %w", offset, height, err)
	}
	if err := entry.UnmarshalBinary(buf); err != nil {
		return entry, fmt.Errorf("failed to deserialize index entry for height %d: %w", height, err)
	}

	// A zeroed entry means the slot was never written (a gap in the index).
	if entry.IsEmpty() {
		return entry, fmt.Errorf("%w: empty index entry for height %d", ErrBlockNotFound, height)
	}

	return entry, nil
}

// writeIndexEntryAt serializes an index entry describing a block stored at
// dataFileBlockOffset (payload size blockDataLen, header portion headerSize)
// and writes it at indexFileOffset within the index file.
func (s *Database) writeIndexEntryAt(indexFileOffset, dataFileBlockOffset uint64, blockDataLen uint32, headerSize BlockHeaderSize) error {
	indexEntry := indexEntry{
		Offset:     dataFileBlockOffset,
		Size:       blockDataLen,
		HeaderSize: headerSize,
	}

	entryBytes, err := indexEntry.MarshalBinary()
	if err != nil {
		return fmt.Errorf("failed to serialize index entry: %w", err)
	}

	if _, err := s.indexFile.WriteAt(entryBytes, int64(indexFileOffset)); err != nil {
		return fmt.Errorf("failed to write index entry: %w", err)
	}
	return nil
}

// persistIndexHeader writes the current in-memory state (next data write
// offset and tracked block heights) into the index file header at offset 0.
func (s *Database) persistIndexHeader() error {
	// The index file must be fsync'd before the header is written to prevent
	// a state where the header is persisted but the index entries it refers to
	// are not. This could lead to data inconsistency on recovery.
	if s.config.SyncToDisk {
		if err := s.indexFile.Sync(); err != nil {
			return fmt.Errorf("failed to sync index file before writing header state: %w", err)
		}
	}

	// Copy the header so the in-memory template is not mutated.
	header := s.header

	// Update the header with the current state of the database.
	header.NextWriteOffset = s.nextDataWriteOffset.Load()
	heights := s.getBlockHeights()
	header.MaxContiguousHeight = heights.maxContiguousHeight
	header.MaxHeight = heights.maxBlockHeight
	headerBytes, err := header.MarshalBinary()
	if err != nil {
		return fmt.Errorf("failed to serialize header for writing state: %w", err)
	}
	// Guard the on-disk layout: the serialized header must be exactly the
	// reserved size so it never overlaps the first index entry.
	if uint64(len(headerBytes)) != sizeOfIndexFileHeader {
		return fmt.Errorf("internal error: serialized header state size %d, expected %d", len(headerBytes), sizeOfIndexFileHeader)
	}

	if _, err := s.indexFile.WriteAt(headerBytes, 0); err != nil {
		return fmt.Errorf("failed to write header state to index file: %w", err)
	}
	return nil
}

// getBlockHeights returns the atomically-tracked block heights, substituting
// unset sentinel values when nothing has been recorded yet.
func (s *Database) getBlockHeights() *blockHeights {
	heights := s.blockHeights.Load()
	if heights == nil {
		return &blockHeights{
			maxBlockHeight:      unsetHeight,
			maxContiguousHeight: unsetHeight,
		}
	}
	return heights
}

// recover detects and recovers unindexed blocks by scanning data files and updating the index.
// It compares the actual data file sizes on disk with the indexed data size to detect
// blocks that were written but not properly indexed.
// For each unindexed block found, it validates the block, then
// writes the corresponding index entry and updates block height tracking.
func (s *Database) recover() error {
	dataFiles, maxIndex, err := s.listDataFiles()
	if err != nil {
		return fmt.Errorf("failed to list data files for recovery: %w", err)
	}

	if len(dataFiles) == 0 {
		return nil
	}

	// MaxDataFileSize == MaxUint64 means "never split files", so more than
	// one data file indicates corruption.
	if s.header.MaxDataFileSize == math.MaxUint64 && len(dataFiles) > 1 {
		return fmt.Errorf("%w: only one data file expected when MaxDataFileSize is max uint64, got %d files with max index %d", ErrCorrupted, len(dataFiles), maxIndex)
	}

	// ensure no data files are missing
	// If any data files are missing, we would need to recalculate the max height
	// and max contiguous height.
	// This can be supported in the future but for now
	// to keep things simple, we will just error if the data files are not as expected.
	for i := 0; i <= maxIndex; i++ {
		if _, exists := dataFiles[i]; !exists {
			return fmt.Errorf("%w: data file at index %d is missing", ErrCorrupted, i)
		}
	}

	// Calculate the expected next write offset based on the data on disk.
	// All files except the last are assumed full (MaxDataFileSize bytes each).
	var calculatedNextDataWriteOffset uint64
	fileSizeContribution, err := safemath.Mul(uint64(maxIndex), s.header.MaxDataFileSize)
	if err != nil {
		return fmt.Errorf("calculating file size contribution would overflow: %w", err)
	}
	calculatedNextDataWriteOffset = fileSizeContribution

	lastFileInfo, err := os.Stat(dataFiles[maxIndex])
	if err != nil {
		return fmt.Errorf("failed to get stats for last data file %s: %w", dataFiles[maxIndex], err)
	}
	calculatedNextDataWriteOffset, err = safemath.Add(calculatedNextDataWriteOffset, uint64(lastFileInfo.Size()))
	if err != nil {
		return fmt.Errorf("adding last file size would overflow: %w", err)
	}

	nextDataWriteOffset := s.nextDataWriteOffset.Load()
	switch {
	case calculatedNextDataWriteOffset == nextDataWriteOffset:
		s.log.Debug("Recovery: data files match index header, no recovery needed.")
		return nil

	case calculatedNextDataWriteOffset < nextDataWriteOffset:
		// this happens when the index claims to have more data than is actually on disk
		return fmt.Errorf("%w: index header claims to have more data than is actually on disk "+
			"(calculated: %d bytes, index header: %d bytes)",
			ErrCorrupted, calculatedNextDataWriteOffset, nextDataWriteOffset)
	default:
		// The data on disk is ahead of the index. We need to recover un-indexed blocks.
		s.log.Info("Recovery: data files are ahead of index; recovering un-indexed blocks.",
			zap.Uint64("headerNextWriteOffset", nextDataWriteOffset),
			zap.Uint64("actualDataNextWriteOffset", calculatedNextDataWriteOffset),
		)

		// Start scan from where the index left off.
		currentScanOffset := nextDataWriteOffset
		recoveredHeights := make([]BlockHeight, 0)
		for currentScanOffset < calculatedNextDataWriteOffset {
			bh, err := s.recoverBlockAtOffset(currentScanOffset, calculatedNextDataWriteOffset)
			if err != nil {
				if errors.Is(err, io.EOF) {
					// reach end of this file, try to read the next file:
					// jump the scan offset to the start of the next data file.
					currentFileIndex := int(currentScanOffset / s.header.MaxDataFileSize)
					nextFileIndex, err := safemath.Add(uint64(currentFileIndex), 1)
					if err != nil {
						return fmt.Errorf("recovery: overflow in file index calculation: %w", err)
					}
					if currentScanOffset, err = safemath.Mul(nextFileIndex, s.header.MaxDataFileSize); err != nil {
						return fmt.Errorf("recovery: overflow in scan offset calculation: %w", err)
					}
					continue
				}
				return err
			}
			s.log.Debug("Recovery: Successfully validated and indexed block",
				zap.Uint64("height", bh.Height),
				zap.Uint32("blockSize", bh.Size),
				zap.Uint64("dataOffset", currentScanOffset),
			)
			recoveredHeights = append(recoveredHeights, bh.Height)
			// Advance past this block: entry header plus payload.
			blockTotalSize, err := safemath.Add(uint64(sizeOfBlockEntryHeader), uint64(bh.Size))
			if err != nil {
				return fmt.Errorf("recovery: overflow in block size calculation: %w", err)
			}
			currentScanOffset, err = safemath.Add(currentScanOffset, blockTotalSize)
			if err != nil {
				return fmt.Errorf("recovery: overflow in scan offset calculation: %w", err)
			}
		}
		s.nextDataWriteOffset.Store(currentScanOffset)

		// Update block heights based on recovered blocks
		if len(recoveredHeights) > 0 {
			if err := s.updateRecoveredBlockHeights(recoveredHeights); err != nil {
				return fmt.Errorf("recovery: failed to update block heights: %w", err)
			}
		}

		if err := s.persistIndexHeader(); err != nil {
			return fmt.Errorf("recovery: failed to save index header after recovery scan: %w", err)
		}

		heights := s.getBlockHeights()
		s.log.Info("Recovery: Scan finished",
			zap.Int("recoveredBlocks", len(recoveredHeights)),
			zap.Uint64("finalNextWriteOffset", s.nextDataWriteOffset.Load()),
			zap.Uint64("maxContiguousBlockHeight", heights.maxContiguousHeight),
			zap.Uint64("maxBlockHeight", heights.maxBlockHeight),
		)
	}
	return nil
}

// recoverBlockAtOffset validates the block entry stored at the given global
// data offset (entry header sanity checks plus a payload checksum) and, when
// valid, writes its index entry. Returns the parsed block entry header.
func (s *Database) recoverBlockAtOffset(offset, totalDataSize uint64) (blockEntryHeader, error) {
	var bh blockEntryHeader
	if totalDataSize-offset < uint64(sizeOfBlockEntryHeader) {
		return bh, fmt.Errorf("%w: not enough data for block header at offset %d", ErrCorrupted, offset)
	}

	dataFile, localOffset, err := s.getDataFileAndOffset(offset)
	if err != nil {
		return bh, fmt.Errorf("recovery: failed to get data file for offset %d: %w", offset, err)
	}
	bhBuf := make([]byte, sizeOfBlockEntryHeader)
	if _, err := dataFile.ReadAt(bhBuf, int64(localOffset)); err != nil {
		return bh, fmt.Errorf("%w: error reading block header at offset %d: %w", ErrCorrupted, offset, err)
	}
	if err := bh.UnmarshalBinary(bhBuf); err != nil {
		return bh, fmt.Errorf("%w: error deserializing block header at offset %d: %w", ErrCorrupted, offset, err)
	}
	// Sanity-check every field before trusting the entry.
	if bh.Size == 0 {
		return bh, fmt.Errorf("%w: invalid block size in header at offset %d: %d", ErrCorrupted, offset, bh.Size)
	}
	if bh.Version > BlockEntryVersion {
		return bh, fmt.Errorf("%w: invalid block entry version at offset %d, version %d is greater than the current version %d", ErrCorrupted, offset, bh.Version, BlockEntryVersion)
	}
	if bh.Height < s.header.MinHeight || bh.Height == unsetHeight {
		return bh, fmt.Errorf(
			"%w: invalid block height in header at offset %d: found %d, expected >= %d",
			ErrCorrupted, offset, bh.Height, s.header.MinHeight,
		)
	}
	if bh.HeaderSize > bh.Size {
		return bh, fmt.Errorf("%w: invalid block header size in header at offset %d: %d > %d", ErrCorrupted, offset, bh.HeaderSize, bh.Size)
	}
	expectedBlockEndOffset, err := safemath.Add(offset, uint64(sizeOfBlockEntryHeader))
	if err != nil {
		return bh, fmt.Errorf("calculating block end offset would overflow at offset %d: %w", offset, err)
	}
	expectedBlockEndOffset, err = safemath.Add(expectedBlockEndOffset, uint64(bh.Size))
	if err != nil {
		return bh, fmt.Errorf("calculating block end offset would overflow at offset %d: %w", offset, err)
	}
	if expectedBlockEndOffset > totalDataSize {
		return bh, fmt.Errorf("%w: block data out of bounds at offset %d", ErrCorrupted, offset)
	}
	blockData := make([]byte, bh.Size)
	blockDataOffset, err := safemath.Add(localOffset, uint64(sizeOfBlockEntryHeader))
	if err != nil {
		return bh, fmt.Errorf("calculating block data offset would overflow at offset %d: %w", offset, err)
	}
	if _, err := dataFile.ReadAt(blockData, int64(blockDataOffset)); err != nil {
		return bh, fmt.Errorf("%w: failed to read block data at offset %d: %w", ErrCorrupted, offset, err)
	}
	// Verify payload integrity before indexing this block.
	calculatedChecksum := calculateChecksum(blockData)
	if calculatedChecksum != bh.Checksum {
		return bh, fmt.Errorf("%w: checksum mismatch for block at offset %d", ErrCorrupted, offset)
	}

	// Write index entry for this block
	indexFileOffset, idxErr := s.indexEntryOffset(bh.Height)
	if idxErr != nil {
		return bh, fmt.Errorf("cannot get index offset for recovered block %d: %w", bh.Height, idxErr)
	}
	if err := s.writeIndexEntryAt(indexFileOffset, offset, bh.Size, bh.HeaderSize); err != nil {
		return bh, fmt.Errorf("failed to update index for recovered block %d: %w", bh.Height, err)
	}
	return bh, nil
}

// listDataFiles scans the data directory for files matching
// dataFileNameFormat and returns a map of file index -> path along with the
// highest index found (-1 when there are none). Non-matching files are
// logged and skipped.
func (s *Database) listDataFiles() (map[int]string, int, error) {
	files, err := os.ReadDir(s.config.DataDir)
	if err != nil {
		return nil, -1, fmt.Errorf("failed to read data directory %s: %w", s.config.DataDir, err)
	}

	dataFiles := make(map[int]string)
	maxIndex := -1
	for _, file := range files {
		if file.IsDir() {
			continue
		}
		var index int
		n, err := fmt.Sscanf(file.Name(), dataFileNameFormat, &index)
		if err != nil || n != 1 {
			s.log.Debug("non-data file found in data directory", zap.String("fileName", file.Name()), zap.Error(err))
			continue
		}

		dataFiles[index] = filepath.Join(s.config.DataDir, file.Name())
		if index > maxIndex {
			maxIndex = index
		}
	}

	return dataFiles, maxIndex, nil
}

// openAndInitializeIndex creates the index directory if needed, opens (and
// optionally truncates) the index file, then loads or initializes its header.
func (s *Database) openAndInitializeIndex() error {
	indexPath := filepath.Join(s.config.IndexDir, indexFileName)
	if err := os.MkdirAll(s.config.IndexDir, 0o755); err != nil {
		return fmt.Errorf("failed to create index directory %s: %w", s.config.IndexDir, err)
	}
	openFlags := os.O_RDWR | os.O_CREATE
	if s.config.Truncate {
		openFlags |= os.O_TRUNC
	}
	var err error
	s.indexFile, err = os.OpenFile(indexPath, openFlags, defaultFilePermissions)
	if err != nil {
		return fmt.Errorf("failed to open index file %s: %w", indexPath, err)
	}
	return s.loadOrInitializeHeader()
}

// initializeDataFiles creates the data directory, removes existing data
// files when truncating, and pre-opens the data file for the next write.
func (s *Database) initializeDataFiles() error {
	if err := os.MkdirAll(s.config.DataDir, 0o755); err != nil {
		return fmt.Errorf("failed to create data directory %s: %w", s.config.DataDir, err)
	}

	if s.config.Truncate {
		dataFiles, _, err := s.listDataFiles()
		if err != nil {
			return fmt.Errorf("failed to list data files for truncation: %w", err)
		}
		for _, filePath := range dataFiles {
			if err := os.Remove(filePath); err != nil {
				return fmt.Errorf("failed to remove old data file %s: %w", filePath, err)
			}
		}
	}

	// Pre-load the data file for the next write offset.
	nextOffset := s.nextDataWriteOffset.Load()
	if nextOffset > 0 {
		_, _, err := s.getDataFileAndOffset(nextOffset)
		if err != nil {
			return fmt.Errorf("failed to pre-load data file for offset %d: %w", nextOffset, err)
		}
	}
	return nil
}

// loadOrInitializeHeader either writes a fresh index file header (on truncate
// or an empty file) or reads and validates the existing one, restoring the
// in-memory write offset and block heights from it.
func (s *Database) loadOrInitializeHeader() error {
	fileInfo, err := s.indexFile.Stat()
	if err != nil {
		return fmt.Errorf("failed to get index file stats: %w", err)
	}

	// reset index file if its empty or we are truncating
	if s.config.Truncate || fileInfo.Size() == 0 {
		s.header = indexFileHeader{
			Version:             IndexFileVersion,
			MinHeight:           s.config.MinimumHeight,
			MaxDataFileSize:     s.config.MaxDataFileSize,
			MaxHeight:           unsetHeight,
			MaxContiguousHeight: unsetHeight,
			NextWriteOffset:     0,
		}
		s.setBlockHeights(unsetHeight, unsetHeight)

		headerBytes, err := s.header.MarshalBinary()
		if err != nil {
			return fmt.Errorf("failed to serialize new header: %w", err)
		}
		// The header must occupy exactly its reserved space on disk.
		if uint64(len(headerBytes)) != sizeOfIndexFileHeader {
			return fmt.Errorf("internal error: serialized new header size %d, expected %d", len(headerBytes), sizeOfIndexFileHeader)
		}
		if _, err := s.indexFile.WriteAt(headerBytes, 0); err != nil {
			return fmt.Errorf("failed to write initial index header: %w", err)
		}

		return nil
	}

	headerBuf := make([]byte, sizeOfIndexFileHeader)
	_, readErr := s.indexFile.ReadAt(headerBuf, 0)
	if readErr != nil {
		return fmt.Errorf("failed to read index header (delete index file to reindex): %w", readErr)
	}
	if err := s.header.UnmarshalBinary(headerBuf); err != nil {
		return fmt.Errorf("failed to deserialize index header (delete index file to reindex): %w", err)
	}
	if s.header.Version != IndexFileVersion {
		return fmt.Errorf("mismatched index file version: found %d, expected %d", s.header.Version, IndexFileVersion)
	}
	// Restore the runtime state persisted in the header.
	s.nextDataWriteOffset.Store(s.header.NextWriteOffset)
	s.setBlockHeights(s.header.MaxHeight, s.header.MaxContiguousHeight)

	return nil
}

func (s *Database)
closeFiles() {
	// Close the index file and flush the data-file cache.
	// NOTE(review): Close errors are ignored here — presumably acceptable on
	// shutdown, and data-file handles appear to be closed via the cache's
	// eviction callback; confirm against the cache setup in New.
	if s.indexFile != nil {
		s.indexFile.Close()
	}
	if s.fileCache != nil {
		s.fileCache.Flush()
	}
}

// dataFilePath builds the path of the data file with the given index.
func (s *Database) dataFilePath(index int) string {
	return filepath.Join(s.config.DataDir, fmt.Sprintf(dataFileNameFormat, index))
}

// getOrOpenDataFile returns the cached file handle for fileIndex, opening
// (and caching) it if necessary. Uses double-checked locking around the cache
// so concurrent callers do not open the same file twice.
func (s *Database) getOrOpenDataFile(fileIndex int) (*os.File, error) {
	if handle, ok := s.fileCache.Get(fileIndex); ok {
		return handle, nil
	}

	// Prevent race conditions when multiple threads try to open the same file
	s.fileOpenMu.Lock()
	defer s.fileOpenMu.Unlock()

	// Double-check the cache after acquiring the lock
	if handle, ok := s.fileCache.Get(fileIndex); ok {
		return handle, nil
	}

	filePath := s.dataFilePath(fileIndex)
	handle, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, defaultFilePermissions)
	if err != nil {
		s.log.Error("Failed to open data file",
			zap.Int("fileIndex", fileIndex),
			zap.String("filePath", filePath),
			zap.Error(err),
		)
		return nil, fmt.Errorf("failed to open data file %s: %w", filePath, err)
	}
	s.fileCache.Put(fileIndex, handle)

	s.log.Debug("Opened data file",
		zap.Int("fileIndex", fileIndex),
		zap.String("filePath", filePath),
	)

	return handle, nil
}

// calculateChecksum computes the xxhash-64 digest used to validate block payloads.
func calculateChecksum(data []byte) uint64 {
	return xxhash.Sum64(data)
}

// writeBlockAt writes the serialized entry header followed by the block
// payload at the given global data offset, as a single WriteAt call, syncing
// the file afterwards when SyncToDisk is enabled.
func (s *Database) writeBlockAt(offset uint64, bh blockEntryHeader, block BlockData) error {
	headerBytes, err := bh.MarshalBinary()
	if err != nil {
		return fmt.Errorf("failed to serialize block header: %w", err)
	}

	dataFile, localOffset, err := s.getDataFileAndOffset(offset)
	if err != nil {
		return fmt.Errorf("failed to get data file for writing block %d: %w", bh.Height, err)
	}

	// Allocate combined buffer for header and block data and write it to the data file
	combinedBufSize, err := safemath.Add(uint64(sizeOfBlockEntryHeader), uint64(len(block)))
	if err != nil {
		return fmt.Errorf("calculating combined buffer size would overflow for block %d: %w", bh.Height, err)
	}
	combinedBuf := make([]byte, combinedBufSize)
	copy(combinedBuf, headerBytes)
	copy(combinedBuf[sizeOfBlockEntryHeader:], block)
	if _, err := dataFile.WriteAt(combinedBuf, int64(localOffset)); err != nil {
		return fmt.Errorf("failed to write block to data file at offset %d: %w", offset, err)
	}

	if s.config.SyncToDisk {
		if err := dataFile.Sync(); err != nil {
			return fmt.Errorf("failed to sync data file after writing block %d: %w", bh.Height, err)
		}
	}
	return nil
}

// updateBlockHeights updates the tracked max and max-contiguous block heights
// after a successful write of writtenBlockHeight, and persists the index
// header on the configured checkpoint interval.
func (s *Database) updateBlockHeights(writtenBlockHeight BlockHeight) error {
	s.updateBlockHeightsAtomically(func(current *blockHeights) *blockHeights {
		// First ever recorded height: contiguity starts only at MinHeight.
		if current == nil {
			heights := &blockHeights{
				maxBlockHeight:      writtenBlockHeight,
				maxContiguousHeight: unsetHeight,
			}
			if writtenBlockHeight == s.header.MinHeight {
				heights.maxContiguousHeight = writtenBlockHeight
			}
			return heights
		}

		updated := &blockHeights{
			maxBlockHeight:      current.maxBlockHeight,
			maxContiguousHeight: current.maxContiguousHeight,
		}

		// Update max block height if needed
		if writtenBlockHeight > current.maxBlockHeight || current.maxBlockHeight == unsetHeight {
			updated.maxBlockHeight = writtenBlockHeight
		}

		// Update max contiguous height logic
		prevContiguousCandidate := uint64(unsetHeight)
		if writtenBlockHeight > s.header.MinHeight {
			prevContiguousCandidate = writtenBlockHeight - 1
		}

		if current.maxContiguousHeight == prevContiguousCandidate {
			// We can extend the contiguous sequence. Try to extend it further
			// by checking if the next height is also available, which would repair gaps in the sequence.
			currentMax := writtenBlockHeight
			for {
				nextHeightToVerify, err := safemath.Add(currentMax, 1)
				if err != nil {
					// Height arithmetic overflowed; stop extending here.
					s.log.Error("Failed to update block heights: overflow in height calculation",
						zap.Uint64("currentMax", currentMax),
						zap.Error(err),
					)
					break
				}
				// Check if we have indexed a block at the next height, which would extend our contiguous sequence
				_, err = s.readIndexEntry(nextHeightToVerify)
				if err != nil {
					// If no block exists at this height, we've reached the end of our contiguous sequence
					if errors.Is(err, ErrBlockNotFound) {
						break
					}

					// Unexpected index read error: log it and stop extending
					// the contiguous range at the last verified height.
					s.log.Error("Failed to update block heights: error reading index entry",
						zap.Uint64("height", nextHeightToVerify),
						zap.Error(err),
					)
					break
				}
				currentMax = nextHeightToVerify
			}
			updated.maxContiguousHeight = currentMax
		}

		return updated
	})

	// Check if we need to persist header on checkpoint interval
	if writtenBlockHeight%s.config.CheckpointInterval == 0 {
		if err := s.persistIndexHeader(); err != nil {
			return fmt.Errorf("block %d written, but checkpoint failed: %w", writtenBlockHeight, err)
		}
	}

	return nil
}

// updateRecoveredBlockHeights recomputes the max and max-contiguous block
// heights after recovery has indexed the given heights. Runs single-threaded
// during recovery, so plain stores (no CAS) are sufficient.
func (s *Database) updateRecoveredBlockHeights(recoveredHeights []BlockHeight) error {
	if len(recoveredHeights) == 0 {
		return nil
	}

	// Find the maximum block height among recovered blocks
	maxRecoveredHeight := recoveredHeights[0]
	for _, height := range recoveredHeights[1:] {
		if height > maxRecoveredHeight {
			maxRecoveredHeight = height
		}
	}

	// Update max block height (no CAS needed since we're single-threaded during recovery)
	currentHeights := s.getBlockHeights()
	currentMaxHeight := currentHeights.maxBlockHeight
	if maxRecoveredHeight > currentMaxHeight || currentMaxHeight == unsetHeight {
		currentMaxHeight = maxRecoveredHeight
	}

	// Update max contiguous height by extending from current max contiguous height
	currentMaxContiguous := currentHeights.maxContiguousHeight
	nextHeightToVerify := s.header.MinHeight
	if currentMaxContiguous != unsetHeight {
		nextHeightToVerify = currentMaxContiguous + 1
	}
	for {
		_, err := s.readIndexEntry(nextHeightToVerify)
		if err != nil {
			// If no block exists at this height, we've reached the end of our contiguous sequence
			if errors.Is(err, ErrBlockNotFound) {
				break
			}

			// Unexpected index read error: log it and abort the update.
			s.log.Error("Failed to update recovered block heights: error reading index entry",
				zap.Uint64("height", nextHeightToVerify),
				zap.Error(err),
			)
			return err
		}
		nextHeightToVerify++
	}
	// nextHeightToVerify is the first missing height, so the contiguous
	// range ends one below it.
	s.setBlockHeights(currentMaxHeight, nextHeightToVerify-1)

	return nil
}

// allocateBlockSpace reserves space for a block and returns the data file offset where it should be written.
//
// This function atomically reserves space by updating the nextWriteOffset and handles
// file splitting by advancing the nextWriteOffset when a data file would be exceeded.
//
// Parameters:
//   - totalSize: The total size in bytes needed for the block
//
// Returns:
//   - writeDataOffset: The data file offset where the block should be written
//   - err: Error if allocation fails (e.g., block too large, overflow, etc.)
func (s *Database) allocateBlockSpace(totalSize uint32) (writeDataOffset uint64, err error) {
	maxDataFileSize := s.header.MaxDataFileSize

	// Check if a single block would exceed the max data file size
	if uint64(totalSize) > maxDataFileSize {
		return 0, fmt.Errorf("%w: block of size %d exceeds max data file size of %d", ErrBlockTooLarge, totalSize, maxDataFileSize)
	}

	// Lock-free reservation: compute the target range from the current
	// offset and CAS it forward; retry on contention.
	for {
		currentOffset := s.nextDataWriteOffset.Load()

		// Calculate where this block would end if written at current offset
		blockEndOffset, err := safemath.Add(currentOffset, uint64(totalSize))
		if err != nil {
			return 0, fmt.Errorf(
				"adding block of size %d to offset %d would overflow uint64 data file pointer: %w",
				totalSize, currentOffset, err,
			)
		}

		// Determine the actual write offset for this block, taking into account
		// data file splitting when max data file size is reached.
		actualWriteOffset := currentOffset
		actualBlockEndOffset := blockEndOffset

		// If we have a max file size, check if we need to start a new file
		if maxDataFileSize > 0 {
			currentFileIndex := int(currentOffset / maxDataFileSize)
			offsetWithinCurrentFile := currentOffset % maxDataFileSize

			// Check if this block would span across file boundaries
			blockEndWithinFile, err := safemath.Add(offsetWithinCurrentFile, uint64(totalSize))
			if err != nil {
				return 0, fmt.Errorf(
					"calculating block end within file would overflow: %w",
					err,
				)
			}
			if blockEndWithinFile > maxDataFileSize {
				// Advance the current write offset to the start of the next file since
				// it would exceed the current file size.
				nextFileStartOffset, err := safemath.Mul(uint64(currentFileIndex+1), maxDataFileSize)
				if err != nil {
					return 0, fmt.Errorf(
						"calculating next file offset would overflow: %w",
						err,
					)
				}
				actualWriteOffset = nextFileStartOffset

				// Recalculate the end offset for the block space to set the next write offset
				if actualBlockEndOffset, err = safemath.Add(actualWriteOffset, uint64(totalSize)); err != nil {
					return 0, fmt.Errorf(
						"adding block of size %d to new file offset %d would overflow: %w",
						totalSize, actualWriteOffset, err,
					)
				}
			}
		}

		if s.nextDataWriteOffset.CompareAndSwap(currentOffset, actualBlockEndOffset) {
			return actualWriteOffset, nil
		}
	}
}

// getDataFileAndOffset maps a global data offset to the owning data file
// handle and the local offset within that file.
// NOTE(review): assumes header.MaxDataFileSize > 0 — a zero value would panic
// with an integer divide by zero; confirm it is validated at config load.
func (s *Database) getDataFileAndOffset(globalOffset uint64) (*os.File, uint64, error) {
	maxFileSize := s.header.MaxDataFileSize
	fileIndex := int(globalOffset / maxFileSize)
	localOffset := globalOffset % maxFileSize
	handle, err := s.getOrOpenDataFile(fileIndex)
	return handle, localOffset, err
}
diff --git a/x/blockdb/database_test.go b/x/blockdb/database_test.go
new file mode 100644
index 000000000000..0b19cff0d576
--- /dev/null
+++ b/x/blockdb/database_test.go
@@ -0,0 +1,460 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package blockdb

import (
	"encoding"
	"encoding/binary"
	"errors"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"testing"
	"unsafe"

	"github.com/stretchr/testify/require"

	"github.com/ava-labs/avalanchego/cache/lru"
	"github.com/ava-labs/avalanchego/utils/logging"
)

// TestNew_Truncate verifies that reopening a database with Truncate=true
// discards previously written blocks and height tracking.
func TestNew_Truncate(t *testing.T) {
	// Create initial database
	tempDir := t.TempDir()
	config := DefaultConfig().WithDir(tempDir).WithTruncate(true)
	db, err := New(config, logging.NoLog{})
	require.NoError(t, err)
	require.NotNil(t, db)

	// Write some test data and close the database
	testBlock := []byte("test block data")
	require.NoError(t, db.WriteBlock(0, testBlock, 0))
	require.NoError(t, db.Close())

	// Reopen with truncate=true and verify data is gone
	db2, err := New(config, logging.NoLog{})
	require.NoError(t, err)
	require.NotNil(t, db2)
	defer db2.Close()
	_, err = db2.ReadBlock(1)
	require.ErrorIs(t, err, ErrBlockNotFound)
	_, found := db2.MaxContiguousHeight()
	require.False(t, found)
}

// TestNew_NoTruncate verifies that reopening with Truncate=false preserves
// previously written blocks and that further writes succeed.
func TestNew_NoTruncate(t *testing.T) {
	tempDir := t.TempDir()
	config := DefaultConfig().WithDir(tempDir).WithTruncate(true)
	db, err := New(config, logging.NoLog{})
	require.NoError(t, err)
	require.NotNil(t, db)

	// Write some test data and close the database
	testBlock := []byte("test block data")
	require.NoError(t, db.WriteBlock(1, testBlock, 5))
	readBlock, err := db.ReadBlock(1)
	require.NoError(t, err)
	require.Equal(t, testBlock, readBlock)
	require.NoError(t, db.Close())

	// Reopen with truncate=false and verify data is still there
	config = DefaultConfig().WithDir(tempDir).WithTruncate(false)
	db2, err := New(config, logging.NoLog{})
	require.NoError(t, err)
	require.NotNil(t, db2)
	defer db2.Close()
	readBlock1, err := db2.ReadBlock(1)
	require.NoError(t, err)
	require.Equal(t, testBlock, readBlock1)

	// Verify we can write additional data
	testBlock2 := []byte("test block data 3")
	require.NoError(t, db2.WriteBlock(2, testBlock2, 0))
	readBlock2, err := db2.ReadBlock(2)
	require.NoError(t, err)
	require.Equal(t, testBlock2, readBlock2)
}

// TestNew_Params table-tests configuration validation in New: valid configs
// produce a working database whose settings round-trip, invalid configs
// produce the expected error.
func TestNew_Params(t *testing.T) {
	tempDir := t.TempDir()
	tests := []struct {
		name        string
		config      DatabaseConfig
		wantErr     error
		expectClose bool
	}{
		{
			name:   "default config",
			config: DefaultConfig().WithDir(tempDir),
		},
		{
			name: "custom config",
			config: DefaultConfig().WithDir(tempDir).
				WithMinimumHeight(100).
				WithMaxDataFileSize(1024 * 1024). // 1MB
				WithMaxDataFiles(50).
				WithCheckpointInterval(512),
		},
		{
			name:    "empty index directory",
			config:  DefaultConfig().WithDataDir(tempDir),
			wantErr: errors.New("IndexDir must be provided"),
		},
		{
			name:    "empty data directory",
			config:  DefaultConfig().WithIndexDir(tempDir),
			wantErr: errors.New("DataDir must be provided"),
		},
		{
			name:   "different index and data directories",
			config: DefaultConfig().WithIndexDir(filepath.Join(tempDir, "index")).WithDataDir(filepath.Join(tempDir, "data")),
		},
		{
			name:    "invalid config - zero checkpoint interval",
			config:  DefaultConfig().WithDir(tempDir).WithCheckpointInterval(0),
			wantErr: errors.New("CheckpointInterval cannot be 0"),
		},
		{
			name:    "invalid config - zero max data files",
			config:  DefaultConfig().WithDir(tempDir).WithMaxDataFiles(0),
			wantErr: errors.New("MaxDataFiles must be positive"),
		},
		{
			name:    "invalid config - negative max data files",
			config:  DefaultConfig().WithDir(tempDir).WithMaxDataFiles(-1),
			wantErr: errors.New("MaxDataFiles must be positive"),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// NOTE(review): nil logger here (other tests pass
			// logging.NoLog{}) — presumably New tolerates nil; confirm.
			db, err := New(tt.config, nil)

			if tt.wantErr != nil {
				require.Equal(t, tt.wantErr.Error(), err.Error())
				return
			}

			require.NoError(t, err)
			require.NotNil(t, db)

			// Verify the database was created with correct configuration
			require.Equal(t, tt.config.MinimumHeight, db.config.MinimumHeight)
			require.Equal(t, tt.config.MaxDataFileSize, db.config.MaxDataFileSize)
			require.Equal(t, tt.config.MaxDataFiles, db.config.MaxDataFiles)
			require.Equal(t, tt.config.CheckpointInterval, db.config.CheckpointInterval)
			require.Equal(t, tt.config.SyncToDisk, db.config.SyncToDisk)
			indexPath := filepath.Join(tt.config.IndexDir, indexFileName)
			require.FileExists(t, indexPath)

			// Test that we can close the database
			require.NoError(t, db.Close())
		})
	}
}

// TestNew_IndexFileErrors verifies that New rejects unreadable or
// incompatible index files with descriptive errors.
func TestNew_IndexFileErrors(t *testing.T) {
	tests := []struct {
		name       string
		setup      func() (string, string)
		wantErrMsg string
	}{
		{
			name: "corrupted index file",
			setup: func() (string, string) {
				tempDir := t.TempDir()
				indexDir := filepath.Join(tempDir, "index")
				dataDir := filepath.Join(tempDir, "data")
				require.NoError(t, os.MkdirAll(indexDir, 0o755))
				require.NoError(t, os.MkdirAll(dataDir, 0o755))

				// Create a corrupted index file
				indexPath := filepath.Join(indexDir, indexFileName)
				corruptedData := []byte("corrupted index file data")
				require.NoError(t, os.WriteFile(indexPath, corruptedData, defaultFilePermissions))

				return indexDir, dataDir
			},
			wantErrMsg: "failed to read index header",
		},
		{
			name: "version mismatch in existing index file",
			setup: func() (string, string) {
				tempDir := t.TempDir()
				indexDir := filepath.Join(tempDir, "index")
				dataDir := filepath.Join(tempDir, "data")

				// Create directories
				require.NoError(t, os.MkdirAll(indexDir, 0o755))
				require.NoError(t, os.MkdirAll(dataDir, 0o755))

				// Create a valid index file with wrong version
				indexPath := filepath.Join(indexDir, indexFileName)
				header := indexFileHeader{
					Version:             999, // Wrong version
					MinHeight:           0,
					MaxDataFileSize:     DefaultMaxDataFileSize,
					MaxHeight:           unsetHeight,
					MaxContiguousHeight: unsetHeight,
					NextWriteOffset:     0,
				}

				headerBytes, err := header.MarshalBinary()
				require.NoError(t, err)
				require.NoError(t, os.WriteFile(indexPath, headerBytes, defaultFilePermissions))

return indexDir, dataDir + }, + wantErrMsg: "mismatched index file version", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + indexDir, dataDir := tt.setup() + if indexDir == "" || dataDir == "" { + t.Skip("Setup failed, skipping test") + } + + config := DefaultConfig().WithIndexDir(indexDir).WithDataDir(dataDir) + _, err := New(config, logging.NoLog{}) + require.Contains(t, err.Error(), tt.wantErrMsg) + }) + } +} + +func TestIndexFileHeaderAlignment(t *testing.T) { + require.Equal(t, uint64(0), sizeOfIndexFileHeader%sizeOfIndexEntry, + "sizeOfIndexFileHeader (%d) is not a multiple of sizeOfIndexEntry (%d)", + sizeOfIndexFileHeader, sizeOfIndexEntry) +} + +func TestIndexEntrySizePowerOfTwo(t *testing.T) { + // Check that sizeOfIndexEntry is a power of 2 + // This is important for memory alignment and performance + require.Equal(t, uint64(0), sizeOfIndexEntry&(sizeOfIndexEntry-1), + "sizeOfIndexEntry (%d) is not a power of 2", sizeOfIndexEntry) +} + +func TestNew_IndexFileConfigPrecedence(t *testing.T) { + // set up db + tempDir := t.TempDir() + initialConfig := DefaultConfig().WithDir(tempDir).WithMinimumHeight(100).WithMaxDataFileSize(1024 * 1024) + db, err := New(initialConfig, logging.NoLog{}) + require.NoError(t, err) + require.NotNil(t, db) + + // Write a block at height 100 and close db + testBlock := []byte("test block data") + require.NoError(t, db.WriteBlock(100, testBlock, 0)) + readBlock, err := db.ReadBlock(100) + require.NoError(t, err) + require.Equal(t, testBlock, readBlock) + require.NoError(t, db.Close()) + + // Reopen with different config that has minimum height of 200 and smaller max data file size + differentConfig := DefaultConfig().WithDir(tempDir).WithMinimumHeight(200).WithMaxDataFileSize(512 * 1024) + db2, err := New(differentConfig, logging.NoLog{}) + require.NoError(t, err) + require.NotNil(t, db2) + defer db2.Close() + + // The database should still accept blocks between 100 and 200 + testBlock2 := 
[]byte("test block data 2") + require.NoError(t, db2.WriteBlock(150, testBlock2, 0)) + readBlock2, err := db2.ReadBlock(150) + require.NoError(t, err) + require.Equal(t, testBlock2, readBlock2) + + // Verify that writing below initial minimum height fails + err = db2.WriteBlock(50, []byte("invalid block"), 0) + require.ErrorIs(t, err, ErrInvalidBlockHeight) + + // Write a large block that would exceed the new config's 512KB limit + // but should succeed because we use the original 1MB limit from index file + largeBlock := make([]byte, 768*1024) // 768KB block + require.NoError(t, db2.WriteBlock(200, largeBlock, 0)) + readLargeBlock, err := db2.ReadBlock(200) + require.NoError(t, err) + require.Equal(t, largeBlock, readLargeBlock) +} + +func TestFileCache_Eviction(t *testing.T) { + // Create a database with a small max data file size to force multiple files + // each file should have enough for 2 blocks (0.5kb * 2) + config := DefaultConfig().WithMaxDataFileSize(1024 * 1.5) + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + // Override the file cache with a smaller size to force evictions + evictionCount := atomic.Int32{} + evictionMu := sync.Mutex{} + smallCache := lru.NewCache[int, *os.File](3) // Only 3 files in cache + smallCache.SetOnEvict(func(_ int, file *os.File) { + evictionMu.Lock() + defer evictionMu.Unlock() + evictionCount.Add(1) + if file != nil { + file.Close() + } + }) + store.fileCache = smallCache + + const numBlocks = 20 // 20 blocks will create 10 files + const numGoroutines = 4 + var wg sync.WaitGroup + var writeErrors atomic.Int32 + + // Create blocks of 0.5kb each + blocks := make([][]byte, numBlocks) + for i := range blocks { + blocks[i] = fixedSizeBlock(t, 512, uint64(i)) + } + + // Each goroutine writes all block heights 0-(numBlocks-1) + for g := range numGoroutines { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + for i := range numBlocks { + height := uint64((i + goroutineID) % numBlocks) + err := 
store.WriteBlock(height, blocks[height], 0) + if err != nil { + writeErrors.Add(1) + } + } + }(g) + } + + wg.Wait() + + // Verify no write errors + require.Zero(t, writeErrors.Load(), "concurrent writes had errors") + + // Verify we had some evictions + require.Positive(t, evictionCount.Load(), "should have had some cache evictions") + + // Verify all blocks are readable + for i := range numBlocks { + block, err := store.ReadBlock(uint64(i)) + require.NoError(t, err, "failed to read block at height %d", i) + require.Equal(t, blocks[i], block, "block data mismatch at height %d", i) + } +} + +func TestMaxDataFiles_CacheLimit(t *testing.T) { + // Test that the file cache respects the MaxDataFiles limit + // Create a small cache size to test eviction behavior + config := DefaultConfig(). + WithMaxDataFiles(2). // Only allow 2 files in cache + WithMaxDataFileSize(1024) // Small file size to force multiple files + + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + // Create blocks that will span multiple data files + // Each block is ~512 bytes, so 2 blocks per file + numBlocks := 6 // This will create 3 files, more than our cache limit of 2 + + evictionCount := 0 + store.fileCache.SetOnEvict(func(_ int, f *os.File) { + evictionCount++ + if f != nil { + f.Close() + } + }) + + // Write blocks to force multiple data files + for i := range numBlocks { + block := fixedSizeBlock(t, 512, uint64(i)) + require.NoError(t, store.WriteBlock(uint64(i), block, 0)) + } + + // Verify that evictions occurred due to cache limit + require.Positive(t, evictionCount, "should have had cache evictions due to MaxDataFiles limit") + + // Verify all blocks are still readable despite evictions + for i := range numBlocks { + block, err := store.ReadBlock(uint64(i)) + require.NoError(t, err, "failed to read block at height %d after eviction", i) + require.Len(t, block, 512, "block size mismatch at height %d", i) + } +} + +// TestStructSizes verifies that our critical data 
structures have the expected sizes +func TestStructSizes(t *testing.T) { + tests := []struct { + name string + memorySize uintptr + binarySize int + expectedMemorySize uintptr + expectedBinarySize int + expectedMarshalSize int + expectedPadding uintptr + createInstance func() interface{} + }{ + { + name: "indexFileHeader", + memorySize: unsafe.Sizeof(indexFileHeader{}), + binarySize: binary.Size(indexFileHeader{}), + expectedMemorySize: 64, + expectedBinarySize: 64, + expectedMarshalSize: 64, + expectedPadding: 0, + createInstance: func() interface{} { return indexFileHeader{} }, + }, + { + name: "blockEntryHeader", + memorySize: unsafe.Sizeof(blockEntryHeader{}), + binarySize: binary.Size(blockEntryHeader{}), + expectedMemorySize: 32, // 6 bytes padding due to version field being 2 bytes + expectedBinarySize: 26, + expectedMarshalSize: 26, + expectedPadding: 6, + createInstance: func() interface{} { return blockEntryHeader{} }, + }, + { + name: "indexEntry", + memorySize: unsafe.Sizeof(indexEntry{}), + binarySize: binary.Size(indexEntry{}), + expectedMemorySize: 16, + expectedBinarySize: 16, + expectedMarshalSize: 16, + expectedPadding: 0, + createInstance: func() interface{} { + return indexEntry{} + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualMemorySize := tt.memorySize + require.Equal(t, tt.expectedMemorySize, actualMemorySize, + "%s has unexpected memory size: got %d bytes, expected %d bytes", + tt.name, actualMemorySize, tt.expectedMemorySize) + + binarySize := tt.binarySize + require.Equal(t, tt.expectedBinarySize, binarySize, + "%s binary size should be compact: got %d bytes, expected %d bytes", + tt.name, binarySize, tt.expectedBinarySize) + + instance := tt.createInstance() + var data []byte + var err error + + data, err = instance.(encoding.BinaryMarshaler).MarshalBinary() + require.NoError(t, err, "%s MarshalBinary should not fail", tt.name) + require.Equal(t, tt.expectedMarshalSize, len(data), + "%s 
MarshalBinary should produce exactly %d bytes, got %d bytes", + tt.name, tt.expectedMarshalSize, len(data)) + + padding := actualMemorySize - uintptr(binarySize) + require.Equal(t, tt.expectedPadding, padding, + "%s should have %d bytes of padding: memory=%d, binary=%d", + tt.name, tt.expectedPadding, actualMemorySize, binarySize) + }) + } +} diff --git a/x/blockdb/datasplit_test.go b/x/blockdb/datasplit_test.go new file mode 100644 index 000000000000..442dff0a255f --- /dev/null +++ b/x/blockdb/datasplit_test.go @@ -0,0 +1,86 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDataSplitting(t *testing.T) { + // Each data file should have enough space for 2 blocks + config := DefaultConfig().WithMaxDataFileSize(1024 * 2.5) + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + // create 11 blocks, 1kb each + numBlocks := 11 + blocks := make([][]byte, numBlocks) + for i := range numBlocks { + blocks[i] = fixedSizeBlock(t, 1024, uint64(i)) + require.NoError(t, store.WriteBlock(uint64(i), blocks[i], 0)) + } + + // Verify that multiple data files were created. 
+ files, err := os.ReadDir(store.config.DataDir) + require.NoError(t, err) + var dataFileCount int + for _, file := range files { + var index int + if n, err := fmt.Sscanf(file.Name(), dataFileNameFormat, &index); n == 1 && err == nil { + dataFileCount++ + } + } + + // 6 data files should be created + require.Equal(t, 6, dataFileCount) + + // Verify all blocks are readable + for i := range numBlocks { + readBlock, err := store.ReadBlock(uint64(i)) + require.NoError(t, err) + require.Equal(t, blocks[i], readBlock) + } + + // reopen and verify all blocks are readable + require.NoError(t, store.Close()) + config = config.WithDataDir(store.config.DataDir).WithIndexDir(store.config.IndexDir) + store, err = New(config, store.log) + require.NoError(t, err) + defer store.Close() + for i := range numBlocks { + readBlock, err := store.ReadBlock(uint64(i)) + require.NoError(t, err) + require.Equal(t, blocks[i], readBlock) + } +} + +func TestDataSplitting_DeletedFile(t *testing.T) { + config := DefaultConfig().WithMaxDataFileSize(1024 * 2.5) + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + // create 5 blocks, 1kb each + numBlocks := 5 + blocks := make([][]byte, numBlocks) + for i := range numBlocks { + blocks[i] = fixedSizeBlock(t, 1024, uint64(i)) + require.NoError(t, store.WriteBlock(uint64(i), blocks[i], 0)) + } + store.Close() + + // Delete the first data file (blockdb_0.dat) + firstDataFilePath := filepath.Join(store.config.DataDir, fmt.Sprintf(dataFileNameFormat, 0)) + require.NoError(t, os.Remove(firstDataFilePath)) + + // reopen and verify the blocks + require.NoError(t, store.Close()) + config = config.WithIndexDir(store.config.IndexDir).WithDataDir(store.config.DataDir) + _, err := New(config, store.log) + require.ErrorIs(t, err, ErrCorrupted) +} diff --git a/x/blockdb/errors.go b/x/blockdb/errors.go new file mode 100644 index 000000000000..51563578c8ca --- /dev/null +++ b/x/blockdb/errors.go @@ -0,0 +1,16 @@ +// Copyright (C) 2019-2024, Ava Labs, 
Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import "errors" + +var ( + ErrInvalidBlockHeight = errors.New("blockdb: invalid block height") + ErrBlockEmpty = errors.New("blockdb: block is empty") + ErrDatabaseClosed = errors.New("blockdb: database is closed") + ErrCorrupted = errors.New("blockdb: unrecoverable corruption detected") + ErrHeaderSizeTooLarge = errors.New("blockdb: header size cannot be >= block size") + ErrBlockTooLarge = errors.New("blockdb: block size too large") + ErrBlockNotFound = errors.New("blockdb: block not found") +) diff --git a/x/blockdb/helpers_test.go b/x/blockdb/helpers_test.go new file mode 100644 index 000000000000..93979b55682d --- /dev/null +++ b/x/blockdb/helpers_test.go @@ -0,0 +1,73 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "crypto/rand" + "fmt" + "math/big" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/utils/logging" +) + +func newTestDatabase(t *testing.T, opts DatabaseConfig) (*Database, func()) { + t.Helper() + dir := t.TempDir() + config := opts.WithDir(dir) + db, err := New(config, logging.NoLog{}) + require.NoError(t, err, "failed to create database") + + cleanup := func() { + db.Close() + } + return db, cleanup +} + +// randomBlock generates a random block of size 1KB-50KB. +func randomBlock(t *testing.T) []byte { + size, err := rand.Int(rand.Reader, big.NewInt(50*1024-1024+1)) + require.NoError(t, err, "failed to generate random size") + blockSize := int(size.Int64()) + 1024 // 1KB to 50KB + b := make([]byte, blockSize) + _, err = rand.Read(b) + require.NoError(t, err, "failed to fill random block") + return b +} + +// fixedSizeBlock generates a block of the specified fixed size with height information. 
+func fixedSizeBlock(t *testing.T, size int, height uint64) []byte { + require.Positive(t, size, "block size must be positive") + b := make([]byte, size) + + // Fill the beginning with height information for better testability + heightStr := fmt.Sprintf("block-height-%d-", height) + if len(heightStr) <= size { + copy(b, heightStr) + } + return b +} + +func checkDatabaseState(t *testing.T, db *Database, maxHeight uint64, maxContiguousHeight uint64) { + heights := db.blockHeights.Load() + if heights != nil { + require.Equal(t, maxHeight, heights.maxBlockHeight, "maxBlockHeight mismatch") + } else { + require.Equal(t, uint64(unsetHeight), maxHeight, "maxBlockHeight mismatch") + } + gotMCH, ok := db.MaxContiguousHeight() + if maxContiguousHeight != unsetHeight { + require.True(t, ok, "MaxContiguousHeight is not set, want %d", maxContiguousHeight) + require.Equal(t, maxContiguousHeight, gotMCH, "maxContiguousHeight mismatch") + } else { + require.False(t, ok) + } +} + +// Helper function to create a pointer to uint64 +func uint64Ptr(v uint64) *uint64 { + return &v +} diff --git a/x/blockdb/readblock_test.go b/x/blockdb/readblock_test.go new file mode 100644 index 000000000000..c218716e82e1 --- /dev/null +++ b/x/blockdb/readblock_test.go @@ -0,0 +1,280 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "errors" + "math" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReadOperations(t *testing.T) { + tests := []struct { + name string + readHeight uint64 + noBlock bool + config *DatabaseConfig + setup func(db *Database) + wantErr error + expectedBlock []byte + expectedHeader []byte + expectedBody []byte + skipSeed bool + }{ + { + name: "read first block", + readHeight: 0, + }, + { + name: "read max height block", + readHeight: 50, + }, + { + name: "read height with no block", + readHeight: 40, + noBlock: true, + }, + { + name: "read block higher than max height", + readHeight: 100, + noBlock: true, + }, + { + name: "read valid block with non-zero minimum height", + readHeight: 25, + config: &DatabaseConfig{ + MinimumHeight: 20, + MaxDataFileSize: DefaultMaxDataFileSize, + CheckpointInterval: 1024, + MaxDataFiles: DefaultMaxDataFileSize, + }, + }, + { + name: "database closed", + readHeight: 1, + setup: func(db *Database) { + db.Close() + }, + wantErr: ErrDatabaseClosed, + }, + { + name: "height below minimum", + readHeight: 5, + config: &DatabaseConfig{ + MinimumHeight: 10, + MaxDataFileSize: DefaultMaxDataFileSize, + CheckpointInterval: 1024, + MaxDataFiles: DefaultMaxDataFileSize, + }, + wantErr: ErrInvalidBlockHeight, + }, + { + name: "block is past max height", + readHeight: 51, + wantErr: ErrBlockNotFound, + }, + { + name: "block height is max height", + readHeight: math.MaxUint64, + wantErr: ErrBlockNotFound, + }, + { + name: "read block with no header (headerSize=0)", + readHeight: 100, + setup: func(db *Database) { + // Write a block with no header + blockData := []byte("this is all body data") + require.NoError(t, db.WriteBlock(100, blockData, 0)) + }, + expectedBlock: []byte("this is all body data"), + expectedHeader: nil, + expectedBody: []byte("this is all body data"), + skipSeed: true, + }, + { + name: "read block with minimal body (headerSize=total size-1)", + readHeight: 
101, + setup: func(db *Database) { + // Write a block where header is almost the entire block + blockData := []byte("this is all header data!") + require.NoError(t, db.WriteBlock(101, blockData, BlockHeaderSize(len(blockData)-1))) + }, + expectedBlock: []byte("this is all header data!"), + expectedHeader: []byte("this is all header data"), + expectedBody: []byte("!"), + skipSeed: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := tt.config + if config == nil { + defaultConfig := DefaultConfig() + config = &defaultConfig + } + + store, cleanup := newTestDatabase(t, *config) + defer cleanup() + + // Seed database with blocks based on config (unless skipSeed is true) + seededBlocks := make(map[uint64][]byte) + if !tt.skipSeed { + minHeight := config.MinimumHeight + maxHeight := minHeight + 50 // Always write 51 blocks + gapHeight := minHeight + 40 // Gap at relative position 40 + + for i := minHeight; i <= maxHeight; i++ { + if i == gapHeight { + continue // Create gap + } + + block := randomBlock(t) + require.NoError(t, store.WriteBlock(i, block, BlockHeaderSize(i-minHeight))) + seededBlocks[i] = block + } + } + + if tt.setup != nil { + tt.setup(store) + } + + if tt.wantErr != nil { + _, err := store.ReadBlock(tt.readHeight) + require.ErrorIs(t, err, tt.wantErr) + return + } + + // Handle success cases + if tt.noBlock { + _, err := store.ReadBlock(tt.readHeight) + require.ErrorIs(t, err, ErrBlockNotFound) + _, err = store.ReadHeader(tt.readHeight) + require.ErrorIs(t, err, ErrBlockNotFound) + _, err = store.ReadBody(tt.readHeight) + require.ErrorIs(t, err, ErrBlockNotFound) + } else { + readBlock, err := store.ReadBlock(tt.readHeight) + require.NoError(t, err) + readHeader, err := store.ReadHeader(tt.readHeight) + require.NoError(t, err) + readBody, err := store.ReadBody(tt.readHeight) + require.NoError(t, err) + + require.NotNil(t, readBlock) + + // Use custom expected values if provided, otherwise use seeded blocks + if 
tt.expectedBlock != nil { + require.Equal(t, tt.expectedBlock, readBlock) + require.Equal(t, tt.expectedHeader, readHeader) + require.Equal(t, tt.expectedBody, readBody) + } else { + // Standard test case logic using seeded blocks + expectedBlock := seededBlocks[tt.readHeight] + headerSize := BlockHeaderSize(tt.readHeight - config.MinimumHeight) + var expectHeader []byte + if headerSize > 0 { + expectHeader = expectedBlock[:headerSize] + } + require.Equal(t, expectedBlock, readBlock) + require.Equal(t, expectHeader, readHeader) + require.Equal(t, expectedBlock[headerSize:], readBody) + } + } + }) + } +} + +func TestReadOperations_Concurrency(t *testing.T) { + store, cleanup := newTestDatabase(t, DefaultConfig()) + defer cleanup() + + // Pre-generate blocks and write them + numBlocks := 50 + blocks := make([][]byte, numBlocks) + headerSizes := make([]BlockHeaderSize, numBlocks) + gapHeights := map[uint64]bool{ + 10: true, + 20: true, + } + + for i := range numBlocks { + if gapHeights[uint64(i)] { + continue + } + + blocks[i] = randomBlock(t) + headerSizes[i] = BlockHeaderSize(i * 10) // Varying header sizes + if headerSizes[i] > BlockHeaderSize(len(blocks[i])) { + headerSizes[i] = BlockHeaderSize(len(blocks[i])) / 2 + } + + require.NoError(t, store.WriteBlock(uint64(i), blocks[i], headerSizes[i])) + } + + var wg sync.WaitGroup + var errorCount atomic.Int32 + for i := range numBlocks + 10 { + wg.Add(3) // One for each read operation + + go func(height int) { + defer wg.Done() + block, err := store.ReadBlock(uint64(height)) + if gapHeights[uint64(height)] || height >= numBlocks { + if err == nil || !errors.Is(err, ErrBlockNotFound) { + errorCount.Add(1) + } + } else { + if err != nil { + errorCount.Add(1) + return + } + require.Equal(t, blocks[height], block) + } + }(i) + + go func(height int) { + defer wg.Done() + header, err := store.ReadHeader(uint64(height)) + if gapHeights[uint64(height)] || height >= numBlocks { + if err == nil || !errors.Is(err, 
ErrBlockNotFound) { + errorCount.Add(1) + } + } else { + if err != nil { + errorCount.Add(1) + return + } + expectedHeader := blocks[height][:headerSizes[height]] + if headerSizes[height] == 0 { + expectedHeader = nil + } + require.Equal(t, expectedHeader, header) + } + }(i) + + go func(height int) { + defer wg.Done() + body, err := store.ReadBody(uint64(height)) + if gapHeights[uint64(height)] || height >= numBlocks { + if err == nil || !errors.Is(err, ErrBlockNotFound) { + errorCount.Add(1) + } + } else { + if err != nil { + errorCount.Add(1) + return + } + expectedBody := blocks[height][headerSizes[height]:] + require.Equal(t, expectedBody, body) + } + }(i) + } + wg.Wait() + require.Zero(t, errorCount.Load(), "concurrent read operations had errors") +} diff --git a/x/blockdb/recovery_test.go b/x/blockdb/recovery_test.go new file mode 100644 index 000000000000..74a2543ac22c --- /dev/null +++ b/x/blockdb/recovery_test.go @@ -0,0 +1,509 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "math" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRecovery_Success(t *testing.T) { + // Create database with 10KB file size and 4KB blocks + // This means each file will have 2 blocks (4KB + 24 bytes header = ~4KB per block) + config := DefaultConfig().WithMaxDataFileSize(10 * 1024) // 10KB per file + + tests := []struct { + name string + corruptIndex func(indexPath string) error + }{ + { + name: "recovery from missing index file; blocks will be recovered", + corruptIndex: os.Remove, + }, + { + name: "recovery from truncated index file that only indexed the first block", + corruptIndex: func(indexPath string) error { + // Remove the existing index file + if err := os.Remove(indexPath); err != nil { + return err + } + + // Create a new index file with only the first block indexed + // This simulates an unclean shutdown where the index file is behind + indexFile, err := os.OpenFile(indexPath, os.O_RDWR|os.O_CREATE, defaultFilePermissions) + if err != nil { + return err + } + defer indexFile.Close() + + // Create a header that only knows about the first block + // Block 0: 4KB data + header + firstBlockOffset := uint64(sizeOfBlockEntryHeader) + 4*1024 + + header := indexFileHeader{ + Version: IndexFileVersion, + MaxDataFileSize: 4 * 10 * 1024, // 10KB per file + MinHeight: 0, + MaxContiguousHeight: 0, + MaxHeight: 0, + NextWriteOffset: firstBlockOffset, + } + + // Write the header + headerBytes, err := header.MarshalBinary() + if err != nil { + return err + } + if _, err := indexFile.WriteAt(headerBytes, 0); err != nil { + return err + } + + // Write index entry for only the first block + indexEntry := indexEntry{ + Offset: 0, + Size: 4 * 1024, // 4KB + HeaderSize: 0, + } + entryBytes, err := indexEntry.MarshalBinary() + if err != nil { + return err + } + indexEntryOffset := sizeOfIndexFileHeader + if _, err := indexFile.WriteAt(entryBytes, int64(indexEntryOffset)); err != nil { + return err + } + + 
return nil + }, + }, + { + name: "recovery from index file that is behind by one block", + corruptIndex: func(indexPath string) error { + // Read the current index file to get the header + indexFile, err := os.OpenFile(indexPath, os.O_RDWR, 0) + if err != nil { + return err + } + defer indexFile.Close() + + // Read the current header + headerBuf := make([]byte, sizeOfIndexFileHeader) + _, err = indexFile.ReadAt(headerBuf, 0) + if err != nil { + return err + } + + // Parse the header + var header indexFileHeader + err = header.UnmarshalBinary(headerBuf) + if err != nil { + return err + } + + // Corrupt the header by setting the NextWriteOffset to be one block behind + blockSize := uint64(sizeOfBlockEntryHeader) + 4*1024 + header.NextWriteOffset = header.NextWriteOffset - blockSize + header.MaxContiguousHeight = 3 + header.MaxHeight = 8 + + // Write the corrupted header back + corruptedHeaderBytes, err := header.MarshalBinary() + if err != nil { + return err + } + _, err = indexFile.WriteAt(corruptedHeaderBytes, 0) + return err + }, + }, + { + name: "recovery from inconsistent index header (offset lagging behind heights)", + corruptIndex: func(indexPath string) error { + // Read the current index file to get the header + indexFile, err := os.OpenFile(indexPath, os.O_RDWR, 0) + if err != nil { + return err + } + defer indexFile.Close() + + // Read the current header + headerBuf := make([]byte, sizeOfIndexFileHeader) + _, err = indexFile.ReadAt(headerBuf, 0) + if err != nil { + return err + } + + // Parse the header + var header indexFileHeader + err = header.UnmarshalBinary(headerBuf) + if err != nil { + return err + } + + // Calculate the offset after the 5th block (assuming 4KB blocks) + // 2 files, 10KB each, 4KB block size + blockSize := uint64(sizeOfBlockEntryHeader) + 4*1024 + header.NextWriteOffset = 10*1024*2 + blockSize + + // Write the corrupted header back + corruptedHeaderBytes, err := header.MarshalBinary() + if err != nil { + return err + } + _, err = 
indexFile.WriteAt(corruptedHeaderBytes, 0) + return err + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store, _ := newTestDatabase(t, config) + + blockHeights := []uint64{0, 1, 3, 6, 2, 8, 4} + blocks := make(map[uint64][]byte) + + for _, height := range blockHeights { + // Create 4KB blocks + block := fixedSizeBlock(t, 4*1024, height) + + require.NoError(t, store.WriteBlock(height, block, 0)) + blocks[height] = block + } + checkDatabaseState(t, store, 8, 4) + require.NoError(t, store.Close()) + + // Corrupt the index file according to the test case + indexPath := store.indexFile.Name() + require.NoError(t, tt.corruptIndex(indexPath)) + + // Reopen the database and test recovery + recoveredStore, err := New(config.WithIndexDir(store.config.IndexDir).WithDataDir(store.config.DataDir), store.log) + require.NoError(t, err) + defer recoveredStore.Close() + + // Verify blocks are readable + for _, height := range blockHeights { + readBlock, err := recoveredStore.ReadBlock(height) + require.NoError(t, err) + require.Equal(t, blocks[height], readBlock, "block %d should be the same", height) + } + checkDatabaseState(t, recoveredStore, 8, 4) + }) + } +} + +func TestRecovery_CorruptionDetection(t *testing.T) { + tests := []struct { + name string + blockHeights []uint64 + minHeight uint64 + maxDataFileSize *uint64 + blockSize int // Optional: if set, creates fixed-size blocks instead of random + setupCorruption func(store *Database, blocks [][]byte) error + wantErr error + wantErrText string + }{ + { + name: "index header claims larger offset than actual data", + blockHeights: []uint64{0, 1, 2, 3, 4}, + setupCorruption: func(store *Database, _ [][]byte) error { + indexPath := store.indexFile.Name() + indexFile, err := os.OpenFile(indexPath, os.O_RDWR, 0) + if err != nil { + return err + } + defer indexFile.Close() + + // Read the current header + headerBuf := make([]byte, sizeOfIndexFileHeader) + _, err = indexFile.ReadAt(headerBuf, 0) + 
if err != nil { + return err + } + + // Parse and corrupt the header by setting NextWriteOffset to be much larger than actual data + var header indexFileHeader + err = header.UnmarshalBinary(headerBuf) + if err != nil { + return err + } + header.NextWriteOffset = 1000000 + + // Write the corrupted header back + corruptedHeaderBytes, err := header.MarshalBinary() + if err != nil { + return err + } + _, err = indexFile.WriteAt(corruptedHeaderBytes, 0) + return err + }, + wantErr: ErrCorrupted, + wantErrText: "index header claims to have more data than is actually on disk", + }, + { + name: "corrupted block header in data file", + blockHeights: []uint64{0, 1, 3}, + setupCorruption: func(store *Database, blocks [][]byte) error { + if err := resetIndexToBlock(store, uint64(len(blocks[0])), 0); err != nil { + return err + } + // Corrupt second block header with invalid data + secondBlockOffset := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0])) + corruptedHeader := make([]byte, sizeOfBlockEntryHeader) + for i := range corruptedHeader { + corruptedHeader[i] = 0xFF // Invalid header data + } + dataFilePath := store.dataFilePath(0) + dataFile, err := os.OpenFile(dataFilePath, os.O_RDWR, 0) + if err != nil { + return err + } + defer dataFile.Close() + _, err = dataFile.WriteAt(corruptedHeader, secondBlockOffset) + return err + }, + wantErr: ErrCorrupted, + wantErrText: "invalid block entry version at offset", + }, + { + name: "block with invalid block size in header that reads more than total data file size", + blockHeights: []uint64{0, 1}, + setupCorruption: func(store *Database, blocks [][]byte) error { + if err := resetIndexToBlock(store, uint64(len(blocks[0])), 0); err != nil { + return err + } + secondBlockOffset := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0])) + bh := blockEntryHeader{ + Height: 1, + Checksum: calculateChecksum(blocks[1]), + Size: uint32(len(blocks[1])) + 1, // make block larger than actual + HeaderSize: 0, + Version: BlockEntryVersion, + 
} + return writeBlockHeader(store, secondBlockOffset, bh) + }, + wantErr: ErrCorrupted, + wantErrText: "block data out of bounds at offset ", + }, + { + name: "block with checksum mismatch", + blockHeights: []uint64{0, 1}, + setupCorruption: func(store *Database, blocks [][]byte) error { + if err := resetIndexToBlock(store, uint64(len(blocks[0])), 0); err != nil { + return err + } + secondBlockOffset := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0])) + bh := blockEntryHeader{ + Height: 1, + Checksum: 0xDEADBEEF, // Wrong checksum + Size: uint32(len(blocks[1])), + HeaderSize: 0, + Version: BlockEntryVersion, + } + return writeBlockHeader(store, secondBlockOffset, bh) + }, + wantErr: ErrCorrupted, + wantErrText: "checksum mismatch for block", + }, + { + name: "partial block at end of file", + blockHeights: []uint64{0}, + setupCorruption: func(store *Database, blocks [][]byte) error { + dataFilePath := store.dataFilePath(0) + dataFile, err := os.OpenFile(dataFilePath, os.O_RDWR, 0) + if err != nil { + return err + } + defer dataFile.Close() + + // Truncate data file to have only partial block data + truncateSize := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0]))/2 + return dataFile.Truncate(truncateSize) + }, + wantErr: ErrCorrupted, + wantErrText: "index header claims to have more data than is actually on disk", + }, + { + name: "block with invalid height", + blockHeights: []uint64{10, 11}, + minHeight: 10, + setupCorruption: func(store *Database, blocks [][]byte) error { + if err := resetIndexToBlock(store, uint64(len(blocks[0])), 10); err != nil { + return err + } + secondBlockOffset := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0])) + bh := blockEntryHeader{ + Height: 5, // Invalid height because its below the minimum height of 10 + Checksum: calculateChecksum(blocks[1]), + Size: uint32(len(blocks[1])), + HeaderSize: 0, + Version: BlockEntryVersion, + } + return writeBlockHeader(store, secondBlockOffset, bh) + }, + wantErr: ErrCorrupted, + 
wantErrText: "invalid block height in header", + }, + { + name: "missing data file at index 1", + blockHeights: []uint64{0, 1, 2, 3, 4, 5}, + maxDataFileSize: uint64Ptr(1024), // 1KB per file to force multiple files + blockSize: 512, // 512 bytes per block + setupCorruption: func(store *Database, _ [][]byte) error { + // Delete the second data file (index 1) + dataFilePath := store.dataFilePath(1) + return os.Remove(dataFilePath) + }, + wantErr: ErrCorrupted, + wantErrText: "data file at index 1 is missing", + }, + { + name: "unexpected multiple data files when MaxDataFileSize is max uint64", + blockHeights: []uint64{0, 1, 2}, + maxDataFileSize: uint64Ptr(math.MaxUint64), // Single file mode + blockSize: 512, // 512 bytes per block + setupCorruption: func(store *Database, _ [][]byte) error { + // Manually create a second data file to simulate corruption + secondDataFilePath := store.dataFilePath(1) + secondDataFile, err := os.Create(secondDataFilePath) + if err != nil { + return err + } + defer secondDataFile.Close() + + // Write some dummy data to the second file + dummyData := []byte("dummy data file") + _, err = secondDataFile.Write(dummyData) + return err + }, + wantErr: ErrCorrupted, + wantErrText: "only one data file expected when MaxDataFileSize is max uint64, got 2 files with max index 1", + }, + { + name: "block with invalid block entry version", + blockHeights: []uint64{0, 1}, + setupCorruption: func(store *Database, blocks [][]byte) error { + if err := resetIndexToBlock(store, uint64(len(blocks[0])), 0); err != nil { + return err + } + // Corrupt second block header version + secondBlockOffset := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0])) + bh := blockEntryHeader{ + Height: 1, + Checksum: calculateChecksum(blocks[1]), + Size: uint32(len(blocks[1])), + HeaderSize: 0, + Version: BlockEntryVersion + 1, // Invalid version + } + return writeBlockHeader(store, secondBlockOffset, bh) + }, + wantErr: ErrCorrupted, + wantErrText: "invalid block entry 
version at offset", + }, + { + name: "second block with invalid version among 4 blocks", + blockHeights: []uint64{0, 3, 2, 4}, + setupCorruption: func(store *Database, blocks [][]byte) error { + if err := resetIndexToBlock(store, uint64(len(blocks[0])), 0); err != nil { + return err + } + // Corrupt second block header with invalid version + secondBlockOffset := int64(sizeOfBlockEntryHeader) + int64(len(blocks[0])) + bh := blockEntryHeader{ + Height: 1, + Checksum: calculateChecksum(blocks[1]), + Size: uint32(len(blocks[1])), + HeaderSize: 0, + Version: BlockEntryVersion + 10, // version cannot be greater than current + } + return writeBlockHeader(store, secondBlockOffset, bh) + }, + wantErr: ErrCorrupted, + wantErrText: "invalid block entry version at offset", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := DefaultConfig() + if tt.minHeight > 0 { + config = config.WithMinimumHeight(tt.minHeight) + } + if tt.maxDataFileSize != nil { + config = config.WithMaxDataFileSize(*tt.maxDataFileSize) + } + + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + // Setup blocks + blocks := make([][]byte, len(tt.blockHeights)) + for i, height := range tt.blockHeights { + if tt.blockSize > 0 { + blocks[i] = fixedSizeBlock(t, tt.blockSize, height) + } else { + blocks[i] = randomBlock(t) + } + require.NoError(t, store.WriteBlock(height, blocks[i], 0)) + } + require.NoError(t, store.Close()) + + // Apply corruption logic + require.NoError(t, tt.setupCorruption(store, blocks)) + + // Try to reopen the database - it should detect corruption + _, err := New(config.WithIndexDir(store.config.IndexDir).WithDataDir(store.config.DataDir), store.log) + require.ErrorIs(t, err, tt.wantErr) + require.Contains(t, err.Error(), tt.wantErrText, "error message should contain expected text") + }) + } +} + +// Helper function to reset index to only a single block +func resetIndexToBlock(store *Database, blockSize uint64, minHeight uint64) error 
{ + indexPath := store.indexFile.Name() + indexFile, err := os.OpenFile(indexPath, os.O_RDWR, 0) + if err != nil { + return err + } + defer indexFile.Close() + + header := indexFileHeader{ + Version: IndexFileVersion, + MaxDataFileSize: DefaultMaxDataFileSize, + MinHeight: minHeight, + MaxContiguousHeight: minHeight, + MaxHeight: minHeight, + NextWriteOffset: uint64(sizeOfBlockEntryHeader) + blockSize, + } + + headerBytes, err := header.MarshalBinary() + if err != nil { + return err + } + _, err = indexFile.WriteAt(headerBytes, 0) + return err +} + +// Helper function to write a block header at a specific offset +func writeBlockHeader(store *Database, offset int64, bh blockEntryHeader) error { + fileIndex := int(offset / int64(store.header.MaxDataFileSize)) + localOffset := offset % int64(store.header.MaxDataFileSize) + dataFilePath := store.dataFilePath(fileIndex) + dataFile, err := os.OpenFile(dataFilePath, os.O_RDWR, 0) + if err != nil { + return err + } + defer dataFile.Close() + + headerBytes, err := bh.MarshalBinary() + if err != nil { + return err + } + _, err = dataFile.WriteAt(headerBytes, localOffset) + return err +} diff --git a/x/blockdb/writeblock_test.go b/x/blockdb/writeblock_test.go new file mode 100644 index 000000000000..3ce220543b9b --- /dev/null +++ b/x/blockdb/writeblock_test.go @@ -0,0 +1,354 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "math" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + + safemath "github.com/ava-labs/avalanchego/utils/math" +) + +func TestWriteBlock_Basic(t *testing.T) { + customConfig := DefaultConfig().WithMinimumHeight(10) + + tests := []struct { + name string + blockHeights []uint64 // block heights to write, in order + config DatabaseConfig + expectedMCH uint64 // expected max contiguous height + expectedMaxHeight uint64 + headerSizes []BlockHeaderSize + syncToDisk bool + checkpointInterval uint64 + }{ + { + name: "no blocks to write", + expectedMCH: unsetHeight, + expectedMaxHeight: unsetHeight, + }, + { + name: "single block at min height", + blockHeights: []uint64{0}, + expectedMCH: 0, + expectedMaxHeight: 0, + }, + { + name: "sequential blocks from min", + blockHeights: []uint64{0, 1, 2, 3}, + expectedMCH: 3, + expectedMaxHeight: 3, + }, + { + name: "out of order with no gaps", + blockHeights: []uint64{3, 1, 2, 0, 4}, + expectedMCH: 4, + expectedMaxHeight: 4, + }, + { + name: "blocks with gaps", + blockHeights: []uint64{0, 1, 3, 5, 6}, + expectedMCH: 1, + expectedMaxHeight: 6, + }, + { + name: "start with gap", + blockHeights: []uint64{5, 6}, + expectedMCH: unsetHeight, + expectedMaxHeight: 6, + }, + { + name: "overwrite same height", + blockHeights: []uint64{0, 1, 0}, // Write to height 0 twice + expectedMCH: 1, + expectedMaxHeight: 1, + }, + { + name: "custom min height single block", + blockHeights: []uint64{10}, + config: customConfig, + expectedMCH: 10, + expectedMaxHeight: 10, + }, + { + name: "custom min height out of order", + blockHeights: []uint64{13, 11, 10, 12}, + config: customConfig, + expectedMCH: 13, + expectedMaxHeight: 13, + }, + { + name: "custom min height with gaps", + blockHeights: []uint64{10, 11, 13, 15}, + config: customConfig, + expectedMCH: 11, + expectedMaxHeight: 15, + }, + { + name: "custom min height start with gap", + blockHeights: []uint64{11, 12}, + config: 
customConfig, + expectedMCH: unsetHeight, + expectedMaxHeight: 12, + }, + { + name: "blocks with various header sizes", + blockHeights: []uint64{0, 1, 2}, + headerSizes: []BlockHeaderSize{0, 50, 100}, + expectedMCH: 2, + expectedMaxHeight: 2, + }, + { + name: "overwrite with different header size", + blockHeights: []uint64{12, 13, 12}, // Write twice to same height + headerSizes: []BlockHeaderSize{10, 0, 50}, + expectedMCH: unsetHeight, + expectedMaxHeight: 13, + }, + { + name: "with sync to disk", + blockHeights: []uint64{0, 1, 2, 5}, + syncToDisk: true, + expectedMCH: 2, + expectedMaxHeight: 5, + }, + { + name: "custom checkpoint interval", + blockHeights: []uint64{0, 1, 2, 3, 4}, + checkpointInterval: 2, + expectedMCH: 4, + expectedMaxHeight: 4, + }, + { + name: "complicated gaps", + blockHeights: []uint64{ + 10, 3, 2, 9, 35, 34, 30, 1, 9, 88, 83, 4, 43, 5, 0, + }, + expectedMCH: 5, + expectedMaxHeight: 88, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := tt.config + if config.CheckpointInterval == 0 { + config = DefaultConfig() + } + + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + blocksWritten := make(map[uint64][]byte) + headerSizesWritten := make(map[uint64]BlockHeaderSize) + for i, h := range tt.blockHeights { + block := randomBlock(t) + var headerSize BlockHeaderSize + + // Use specific header size if provided + if tt.headerSizes != nil && i < len(tt.headerSizes) { + headerSize = tt.headerSizes[i] + // Ensure header size doesn't exceed block size + require.LessOrEqual(t, int(headerSize), len(block), "header size %d exceeds block size %d for test case", headerSize, len(block)) + } + + err := store.WriteBlock(h, block, headerSize) + require.NoError(t, err, "unexpected error at height %d", h) + + blocksWritten[h] = block + headerSizesWritten[h] = headerSize // Store the header size for the final write to this height + } + + // Verify all written blocks are readable and data is correct + for h, 
expectedBlock := range blocksWritten { + readBlock, err := store.ReadBlock(h) + require.NoError(t, err, "ReadBlock failed at height %d", h) + require.Equal(t, expectedBlock, readBlock) + + // Test header/body separation if header size was specified + if tt.headerSizes != nil { + if headerSize, exists := headerSizesWritten[h]; exists { + header, err := store.ReadHeader(h) + require.NoError(t, err, "ReadHeader failed at height %d", h) + + body, err := store.ReadBody(h) + require.NoError(t, err, "ReadBody failed at height %d", h) + + if headerSize == 0 { + require.Nil(t, header) + require.Equal(t, expectedBlock, body) + } else { + expectedHeader := expectedBlock[:headerSize] + expectedBody := expectedBlock[headerSize:] + require.Equal(t, expectedHeader, header, "header mismatch at height %d", h) + require.Equal(t, expectedBody, body, "body mismatch at height %d", h) + } + } + } + } + + checkDatabaseState(t, store, tt.expectedMaxHeight, tt.expectedMCH) + }) + } +} + +func TestWriteBlock_Concurrency(t *testing.T) { + store, cleanup := newTestDatabase(t, DefaultConfig()) + defer cleanup() + + var wg sync.WaitGroup + var errors atomic.Int32 + + // Pre-generate blocks for reuse + blocks := make([][]byte, 20) + for i := range 20 { + blocks[i] = randomBlock(t) + wg.Add(1) + go func(i int) { + defer wg.Done() + var height uint64 + block := blocks[i] + + // create gaps at heights 5 and 10 and rewrite last block + if i == 5 || i == 10 { + height = uint64(i - 1) + block = blocks[i-1] + } else { + height = uint64(i) + } + + err := store.WriteBlock(height, block, 1) + if err != nil { + require.NoError(t, err, "WriteBlock failed for iteration %d (height %d)", i, height) + errors.Add(1) + } + }(i) + } + + wg.Wait() + require.Zero(t, errors.Load(), "concurrent writes had errors") + + // Verify that all expected heights have blocks (except 5, 10) + for i := range 20 { + height := uint64(i) + block, err := store.ReadBlock(height) + if i == 5 || i == 10 { + require.ErrorIs(t, err, 
ErrBlockNotFound, "expected ErrBlockNotFound at gap height %d", height) + } else { + require.NoError(t, err) + require.Equal(t, blocks[i], block, "block mismatch at height %d", height) + } + } + checkDatabaseState(t, store, 19, 4) +} + +func TestWriteBlock_Errors(t *testing.T) { + tests := []struct { + name string + height uint64 + block []byte + headerSize BlockHeaderSize + setup func(db *Database) + config DatabaseConfig + wantErr error + }{ + { + name: "empty block nil", + height: 0, + block: nil, + headerSize: 0, + wantErr: ErrBlockEmpty, + }, + { + name: "empty block zero length", + height: 0, + block: []byte{}, + headerSize: 0, + wantErr: ErrBlockEmpty, + }, + { + name: "header size larger than block", + height: 0, + block: []byte("small"), + headerSize: 6, // block is only 5 bytes + wantErr: ErrHeaderSizeTooLarge, + }, + { + name: "header size equal to block", + height: 0, + block: []byte("small"), + headerSize: 5, + wantErr: ErrHeaderSizeTooLarge, + }, + { + name: "height below custom minimum", + height: 5, + block: randomBlock(t), + config: DefaultConfig().WithMinimumHeight(10), + headerSize: 0, + wantErr: ErrInvalidBlockHeight, + }, + { + name: "height causes overflow", + height: math.MaxUint64, + block: randomBlock(t), + headerSize: 0, + wantErr: ErrInvalidBlockHeight, + }, + { + name: "database closed", + height: 0, + block: randomBlock(t), + headerSize: 0, + setup: func(db *Database) { + db.Close() + }, + wantErr: ErrDatabaseClosed, + }, + { + name: "exceed max data file size", + height: 0, + block: make([]byte, 999), // Block + header will exceed 1024 limit (999 + 26 = 1025 > 1024) + config: DefaultConfig().WithMaxDataFileSize(1024), + headerSize: 0, + wantErr: ErrBlockTooLarge, + }, + { + name: "data file offset overflow", + height: 0, + block: make([]byte, 100), + config: DefaultConfig(), + setup: func(db *Database) { + // Set the next write offset to near max to trigger overflow + db.nextDataWriteOffset.Store(math.MaxUint64 - 50) + }, + headerSize: 
0, + wantErr: safemath.ErrOverflow, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := tt.config + if config.CheckpointInterval == 0 { + config = DefaultConfig() + } + + store, cleanup := newTestDatabase(t, config) + defer cleanup() + + if tt.setup != nil { + tt.setup(store) + } + + err := store.WriteBlock(tt.height, tt.block, tt.headerSize) + require.ErrorIs(t, err, tt.wantErr) + checkDatabaseState(t, store, unsetHeight, unsetHeight) + }) + } +}