update disk root more frequently #29565

Open · wants to merge 3 commits into master
35 changes: 22 additions & 13 deletions core/blockchain.go
@@ -140,6 +140,9 @@ type CacheConfig struct {
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top

AllowForceUpdate bool // Enable forcing a disk root update once the commit threshold is reached. Disabled by default
CommitThreshold int // Number of commits after which to flush the diff layers to disk

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}
@@ -157,9 +160,11 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
AllowForceUpdate: c.AllowForceUpdate,
CommitThreshold: c.CommitThreshold,
}
}
return config
@@ -168,12 +173,14 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
StateScheme: rawdb.HashScheme,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
StateScheme: rawdb.HashScheme,
AllowForceUpdate: false,
CommitThreshold: 128,
}

// DefaultCacheConfigWithScheme returns a deep copied default cache config with
@@ -441,10 +448,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
recover = true
}
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
AllowForceUpdate: bc.cacheConfig.AllowForceUpdate,
CommitThreshold: bc.cacheConfig.CommitThreshold,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
}
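Note: the two new CacheConfig knobs are plumbed straight through to both the path database config and the snapshot tree config above. Below is a minimal sketch of how a caller might opt in; the threshold of 64, the variable names, and the printout are illustrative only, not part of this diff.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

func main() {
	// Start from the stock cache config and opt in to forced disk-root
	// updates every 64 commits instead of relying on the memory cap alone.
	cacheConf := core.DefaultCacheConfigWithScheme(rawdb.PathScheme)
	cacheConf.AllowForceUpdate = true
	cacheConf.CommitThreshold = 64

	fmt.Println("commit threshold:", cacheConf.CommitThreshold)
	// cacheConf would then be handed to core.NewBlockChain as usual.
}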
4 changes: 2 additions & 2 deletions core/blockchain_repair_test.go
@@ -1820,7 +1820,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
@@ -1950,7 +1950,7 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
if err := chain.snaps.Cap(blocks[1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}

2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
@@ -2023,7 +2023,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
2 changes: 1 addition & 1 deletion core/blockchain_snapshot_test.go
@@ -108,7 +108,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
chain.snaps.Cap(blocks[point-1].Root(), 0)
chain.snaps.Cap(blocks[point-1].Root(), 0, false)
diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {
t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot)
2 changes: 1 addition & 1 deletion core/state/pruner/pruner.go
@@ -192,7 +192,7 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta
// Pruning is done, now drop the "useless" layers from the snapshot.
// Firstly, flushing the target layer into the disk. After that all
// diff layers below the target will all be merged into the disk.
if err := snaptree.Cap(root, 0); err != nil {
if err := snaptree.Cap(root, 0, false); err != nil {
return err
}
// Secondly, flushing the snapshot journal into the disk. All diff
5 changes: 5 additions & 0 deletions core/state/snapshot/difflayer.go
@@ -76,6 +76,11 @@ var (
bloomDestructHasherOffset = 0
bloomAccountHasherOffset = 0
bloomStorageHasherOffset = 0

// defaultCommitThreshold is the default number of commits before a forced
// disk update: after the first 128 diff layers, the 129th layer is
// committed to disk.
defaultCommitThreshold = 128
)

func init() {
8 changes: 4 additions & 4 deletions core/state/snapshot/disklayer_test.go
@@ -133,7 +133,7 @@ func TestDiskMerge(t *testing.T) {
}); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
// Retrieve all the data through the disk layer and validate it
@@ -356,7 +356,7 @@ func TestDiskPartialMerge(t *testing.T) {
}); err != nil {
t.Fatalf("test %d: failed to update snapshot tree: %v", i, err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("test %d: failed to flatten snapshot tree: %v", i, err)
}
// Retrieve all the data through the disk layer and validate it
@@ -467,7 +467,7 @@ func TestDiskGeneratorPersistence(t *testing.T) {
}, nil); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob := rawdb.ReadSnapshotGenerator(db)
@@ -489,7 +489,7 @@
}
diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer)
diskLayer.genMarker = nil // Construction finished
if err := snaps.Cap(diffTwoRoot, 0); err != nil {
if err := snaps.Cap(diffTwoRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob = rawdb.ReadSnapshotGenerator(db)
12 changes: 6 additions & 6 deletions core/state/snapshot/iterator_test.go
@@ -248,7 +248,7 @@ func TestAccountIteratorTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x04"), 2)
snaps.Cap(common.HexToHash("0x04"), 2, false)
verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)

it, _ = snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
@@ -296,7 +296,7 @@ func TestStorageIteratorTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x04"), 2)
snaps.Cap(common.HexToHash("0x04"), 2, false)
verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage)

it, _ = snaps.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
@@ -384,7 +384,7 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x09"), 2)
snaps.Cap(common.HexToHash("0x09"), 2, false)

it, _ = snaps.AccountIterator(common.HexToHash("0x09"), common.Hash{})
for it.Next() {
@@ -483,7 +483,7 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x09"), 2)
snaps.Cap(common.HexToHash("0x09"), 2, false)

it, _ = snaps.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
@@ -541,7 +541,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x80"), 2)
snaps.Cap(common.HexToHash("0x80"), 2, false)

verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)

@@ -580,7 +580,7 @@ func TestAccountIteratorFlattening(t *testing.T) {
it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
defer it.Release()

if err := snaps.Cap(common.HexToHash("0x04"), 1); err != nil {
if err := snaps.Cap(common.HexToHash("0x04"), 1, false); err != nil {
t.Fatalf("failed to flatten snapshot stack: %v", err)
}
//verifyIterator(t, 7, it)
63 changes: 48 additions & 15 deletions core/state/snapshot/snapshot.go
@@ -150,10 +150,24 @@ type snapshot interface {

// Config includes the configurations for snapshots.
type Config struct {
CacheSize int // Megabytes permitted to use for read caches
Recovery bool // Indicator that the snapshots is in the recovery mode
NoBuild bool // Indicator that the snapshots generation is disallowed
AsyncBuild bool // The snapshot generation is allowed to be constructed asynchronously
CacheSize int // Megabytes permitted to use for read caches
Recovery bool // Indicator that the snapshots is in the recovery mode
NoBuild bool // Indicator that the snapshots generation is disallowed
AsyncBuild bool // The snapshot generation is allowed to be constructed asynchronously
AllowForceUpdate bool // Enable forcing a snap root update once the commit threshold is reached
CommitThreshold int // Number of commits after which to attempt a snap root update
}

// sanitize checks the provided user configurations and changes anything that's
// unreasonable or unworkable.
func (c *Config) sanitize() Config {
conf := *c

if conf.CommitThreshold == 0 {
log.Warn("Sanitizing commit threshold", "provided", conf.CommitThreshold, "updated", defaultCommitThreshold)
conf.CommitThreshold = defaultCommitThreshold
}
return conf
}
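To make the fallback above concrete, a hypothetical in-package test sketch (not part of this PR, assumed to live in core/state/snapshot alongside the existing tests) would check that sanitize substitutes defaultCommitThreshold when CommitThreshold is left unset and keeps an explicit value otherwise:

func TestSanitizeCommitThreshold(t *testing.T) {
	// Zero value: sanitize falls back to defaultCommitThreshold (128).
	conf := (&Config{AllowForceUpdate: true}).sanitize()
	if conf.CommitThreshold != defaultCommitThreshold {
		t.Fatalf("commit threshold: have %d, want %d", conf.CommitThreshold, defaultCommitThreshold)
	}
	// Explicit value: left untouched.
	conf = (&Config{AllowForceUpdate: true, CommitThreshold: 64}).sanitize()
	if conf.CommitThreshold != 64 {
		t.Fatalf("commit threshold: have %d, want 64", conf.CommitThreshold)
	}
}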

// Tree is an Ethereum state snapshot tree. It consists of one persistent base
@@ -166,11 +180,12 @@ type Config struct {
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
// cheap iteration of the account/storage tries for sync aid.
type Tree struct {
config Config // Snapshots configurations
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
triedb *triedb.Database // In-memory cache to access the trie through
layers map[common.Hash]snapshot // Collection of all known layers
lock sync.RWMutex
config Config // Snapshots configurations
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
triedb *triedb.Database // In-memory cache to access the trie through
layers map[common.Hash]snapshot // Collection of all known layers
lock sync.RWMutex
commitCounter int // Counter for the number of commits since the last forced flush

// Test hooks
onFlatten func() // Hook invoked when the bottom most diff layers are flattened
@@ -195,11 +210,12 @@ type Tree struct {
func New(config Config, diskdb ethdb.KeyValueStore, triedb *triedb.Database, root common.Hash) (*Tree, error) {
// Create a new, empty snapshot tree
snap := &Tree{
config: config,
config: config.sanitize(),
diskdb: diskdb,
triedb: triedb,
layers: make(map[common.Hash]snapshot),
}

// Attempt to load a previously persisted snapshot and rebuild one if failed
head, disabled, err := loadSnapshot(diskdb, triedb, root, config.CacheSize, config.Recovery, config.NoBuild)
if disabled {
@@ -382,7 +398,7 @@ func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs m
// which may or may not overflow and cascade to disk. Since this last layer's
// survival is only known *after* capping, we need to omit it from the count if
// we want to ensure that *at least* the requested number of diff layers remain.
func (t *Tree) Cap(root common.Hash, layers int) error {
func (t *Tree) Cap(root common.Hash, layers int, force bool) error {
// Retrieve the head snapshot to cap from
snap := t.Snapshot(root)
if snap == nil {
Expand Down Expand Up @@ -416,7 +432,7 @@ func (t *Tree) Cap(root common.Hash, layers int) error {
t.layers = map[common.Hash]snapshot{base.root: base}
return nil
}
persisted := t.cap(diff, layers)
persisted := t.cap(diff, layers, force)

// Remove any layer that is stale or links into a stale layer
children := make(map[common.Hash][]common.Hash)
@@ -466,7 +482,7 @@ func (t *Tree) Cap(root common.Hash, layers int) error {
// which may or may not overflow and cascade to disk. Since this last layer's
// survival is only known *after* capping, we need to omit it from the count if
// we want to ensure that *at least* the requested number of diff layers remain.
func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
func (t *Tree) cap(diff *diffLayer, layers int, force bool) *diskLayer {
// Dive until we run out of layers or reach the persistent database
for i := 0; i < layers-1; i++ {
// If we still have diff layers below, continue down
@@ -500,7 +516,7 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
t.onFlatten()
}
diff.parent = flattened
if flattened.memory < aggregatorMemoryLimit {
if (flattened.memory < aggregatorMemoryLimit) && !force {
// Accumulator layer is smaller than the limit, so we can abort, unless
// there's a snapshot being generated currently. In that case, the trie
// will move from underneath the generator so we **must** merge all the
Expand All @@ -512,7 +528,7 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
default:
panic(fmt.Sprintf("unknown data layer: %T", parent))
}
// If the bottom-most layer is larger than our memory cap, persist to disk
// If the bottom-most layer is larger than our memory cap, or a forced flush was requested, persist to disk
bottom := diff.parent.(*diffLayer)

bottom.lock.RLock()
@@ -885,3 +901,20 @@ func (t *Tree) Size() (diffs common.StorageSize, buf common.StorageSize) {
}
return size, 0
}

// CompareThreshold reports whether the number of commits since the last forced
// flush has exceeded the configured threshold, resetting the counter when it
// has. It always returns false if force updates are disabled.
func (t *Tree) CompareThreshold() bool {
if !t.config.AllowForceUpdate {
return false
}

log.Debug("Snapshot commit counter", "counter", t.commitCounter, "threshold", t.config.CommitThreshold)
if t.commitCounter > t.config.CommitThreshold {
t.commitCounter = 0
return true
}

t.commitCounter++

return false
}
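The caller side of CompareThreshold is not shown in this section; presumably the block-writing path consults it and feeds the result into the new force parameter of Cap. A hedged sketch of that pattern follows; the helper name, the wrapping package, and the layer count of 128 are assumptions rather than part of this diff.

package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
)

// flushSnapshots asks the tree whether the commit threshold has been crossed
// and, if so, forces Cap to flatten the bottom diff layers into the disk layer
// even though the aggregator memory limit has not been reached.
func flushSnapshots(snaps *snapshot.Tree, root common.Hash) error {
	force := snaps.CompareThreshold()
	return snaps.Cap(root, 128, force)
}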