Skip to content

Commit

Permalink
core/state: clean up waiting for pre-fetches a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Apr 16, 2024
1 parent deb548f commit 139448f
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 95 deletions.
8 changes: 6 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,8 +1827,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
statedb.SetLogger(bc.logger)

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
}
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
Expand Down
8 changes: 7 additions & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/core/tracing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/holiman/uint256"
Expand Down Expand Up @@ -143,7 +144,12 @@ func (s *stateObject) getTrie() (Trie, error) {
// Try fetching from prefetcher first
if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no prefetcher
s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
trie, err := s.db.prefetcher.trie(s.addrHash, s.data.Root)
if err != nil {
log.Error("Failed to retrieve storage pre-fetcher trie", "addr", s.address, "err", err)
} else {
s.trie = trie
}
}
if s.trie == nil {
tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
Expand Down
31 changes: 15 additions & 16 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher.terminate()
s.prefetcher.report()
s.prefetcher = nil
}
if s.snap != nil {
Expand All @@ -192,7 +193,8 @@ func (s *StateDB) StartPrefetcher(namespace string) {
// from the gathered metrics.
func (s *StateDB) StopPrefetcher() {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher.terminate()
s.prefetcher.report()
s.prefetcher = nil
}
}
Expand Down Expand Up @@ -873,18 +875,13 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)

// If there was a trie prefetcher operating, it gets aborted and irrevocably
// modified after we start retrieving tries. Remove it from the statedb after
// this round of use.
//
// This is weird pre-byzantium since the first tx runs with a prefetcher and
// the remainder without, but pre-byzantium even the initial prefetcher is
// useless, so no sleep lost.
prefetcher := s.prefetcher
// If there was a trie prefetcher operating, terminate it (blocking until
// all tasks finish) and then proceed with the trie hashing.
if s.prefetcher != nil {
s.prefetcher.terminate()
defer func() {
s.prefetcher.close()
s.prefetcher = nil
s.prefetcher.report()
s.prefetcher = nil // Pre-byzantium, unset any used up prefetcher
}()
}
// Although naively it makes sense to retrieve the account trie and then do
Expand All @@ -900,8 +897,10 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
if prefetcher != nil {
if trie := prefetcher.trie(common.Hash{}, s.originalRoot); trie != nil {
if s.prefetcher != nil {
if trie, err := s.prefetcher.trie(common.Hash{}, s.originalRoot); err != nil {
log.Error("Failed to retrieve account pre-fetcher trie", "err", err)
} else if trie != nil {
s.trie = trie
}
}
Expand Down Expand Up @@ -930,8 +929,8 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.deleteStateObject(deletedAddr)
s.AccountDeleted += 1
}
if prefetcher != nil {
prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
if s.prefetcher != nil {
s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
}
if len(s.stateObjectsPending) > 0 {
s.stateObjectsPending = make(map[common.Address]struct{})
Expand Down
147 changes: 80 additions & 67 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ var (
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"

// errTerminated is returned if any invocation is applied on a terminated fetcher.
// errTerminated is returned if a fetcher is attempted to be operated after it
// has already terminated.
errTerminated = errors.New("fetcher is already terminated")

// errNotTerminated is returned if a fetchers data is attempted to be retrieved
// before it terminates.
errNotTerminated = errors.New("fetcher is not yet terminated")
)

// triePrefetcher is an active prefetcher, which receives accounts or storage
Expand All @@ -42,7 +47,7 @@ type triePrefetcher struct {
db Database // Database to fetch trie nodes through
root common.Hash // Root hash of the account trie for metrics
fetchers map[string]*subfetcher // Subfetchers for each trie
closed bool
term chan struct{} // Channel to signal interruption

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
Expand All @@ -59,6 +64,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
term: make(chan struct{}),

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
Expand All @@ -70,36 +76,42 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
}
}

// close iterates over all the subfetchers, waits on any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
// terminate iterates over all the subfetchers, waiting on any that still spin.
func (p *triePrefetcher) terminate() {
// Short circuit if the fetcher is already closed.
if p.closed {
select {
case <-p.term:
return
default:
}
for _, fetcher := range p.fetchers {
fetcher.close()
fetcher.terminate()
}
close(p.term)
}

if metrics.Enabled {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
// report aggregates the pre-fetching and usage metrics and reports them.
func (p *triePrefetcher) report() {
if !metrics.Enabled {
return
}
for _, fetcher := range p.fetchers {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
p.closed = true
p.fetchers = nil
}

// prefetch schedules a batch of trie items to prefetch. After the prefetcher is
Expand All @@ -114,8 +126,11 @@ func (p *triePrefetcher) close() {
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
if p.closed {
// Ensure the subfetcher is still alive
select {
case <-p.term:
return errTerminated
default:
}
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
Expand All @@ -128,25 +143,19 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm

// trie returns the trie matching the root hash, or nil if either the fetcher
// is terminated or the trie is not available.
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
if p.closed {
return nil
}
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) (Trie, error) {
// Bail if no trie was prefetched for this root
fetcher := p.fetchers[p.trieID(owner, root)]
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
return nil, nil
}
return fetcher.peek()
}

// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the fetcher is.
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
if p.closed {
return
}
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
fetcher.used = used
}
Expand Down Expand Up @@ -175,10 +184,9 @@ type subfetcher struct {
tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue

wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption
copy chan chan Trie // channel for retrieving copies of the subfetcher's trie
wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption

seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
Expand All @@ -197,7 +205,6 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
copy: make(chan chan Trie),
seen: make(map[string]struct{}),
}
go sf.loop()
Expand All @@ -206,46 +213,45 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) error {
// Ensure the subfetcher is still alive
select {
case <-sf.term:
return errTerminated
default:
}
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
sf.lock.Unlock()

// Notify the background thread to execute scheduled tasks
select {
case <-sf.term:
// Reject scheduling a task if the fetcher's already terminated
return errTerminated
case sf.wake <- struct{}{}:
// Wake signal sent
default:
// Otherwise notify the fetcher if it's not already done so (async!)
//
// Note, the default branch is to give priority to term check as the
// select evaluates branches randomly.
select {
case sf.wake <- struct{}{}:
// Wake signal sent
default:
// Wake signal not sent as a previous is already queued
}
return nil
// Wake signal not sent as a previous is already queued
}
return nil
}

// peek tries to retrieve a deep copy of the fetcher's trie. Nil is returned
// if the fetcher is already terminated, or the associated trie is failing
// for opening.
func (sf *subfetcher) peek() Trie {
ch := make(chan Trie)
// peek retrieves the fetcher's trie, populated with any pre-fetched data. The
// returned trie will be a shallow copy, so modifying it will break subsequent
// peeks for the original data.
//
// This method can only be called after closing the subfetcher.
func (sf *subfetcher) peek() (Trie, error) {
// Ensure the subfetcher finished operating on its trie
select {
case sf.copy <- ch:
return <-ch
case <-sf.term:
return nil
default:
return nil, errNotTerminated
}
return sf.trie, nil
}

// close waits for the subfetcher to finish its tasks. It cannot be called multiple times
func (sf *subfetcher) close() {
// terminate waits for the subfetcher to finish its tasks, after which it tears
// down all the internal background loaders.
func (sf *subfetcher) terminate() {
select {
case <-sf.stop:
default:
Expand Down Expand Up @@ -298,13 +304,20 @@ func (sf *subfetcher) loop() {
}
sf.seen[string(task)] = struct{}{}
}
case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them
ch <- sf.db.CopyTrie(sf.trie)

case <-sf.stop:
// Termination is requested, abort
return
// Termination is requested, abort if no more tasks are pending. If
// there are some, exhaust them first.
sf.lock.Lock()
done := sf.tasks == nil
sf.lock.Unlock()

if done {
return
}
// Some tasks are pending, loop and pick them up (that wake branch
// will be selected eventually, whilst stop remains closed to this
// branch will also run afterwards).
}
}
}
23 changes: 14 additions & 9 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,23 @@ func filledStateDB() *StateDB {
return state
}

func TestUseAfterClose(t *testing.T) {
func TestUseAfterTerminate(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
a := prefetcher.trie(common.Hash{}, db.originalRoot)
prefetcher.close()
b := prefetcher.trie(common.Hash{}, db.originalRoot)
if a == nil {
t.Fatal("Prefetching before close should not return nil")

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil {
t.Errorf("Prefetch failed before terminate: %v", err)
}
if _, err := prefetcher.trie(common.Hash{}, db.originalRoot); err == nil {
t.Errorf("Trie retrieval succeeded before terminate")
}
prefetcher.terminate()

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil {
t.Errorf("Prefetch succeeded after terminate: %v", err)
}
if b != nil {
t.Fatal("Trie after close should return nil")
if _, err := prefetcher.trie(common.Hash{}, db.originalRoot); err != nil {
t.Errorf("Trie retrieval failed after terminate: %v", err)
}
}

0 comments on commit 139448f

Please sign in to comment.