Skip to content

Commit

Permalink
Cherry-picking performance fixes to the release branch (#744)
Browse files Browse the repository at this point in the history
* Adding evm_chain_id, address and event_sig to data and topic indexes
smartcontractkit/chainlink#12786
* Improved fetching Commit Reports from database
#726
  • Loading branch information
RensR committed Apr 23, 2024
2 parents 35f0bc5 + 8c82c7a commit 8624ec6
Show file tree
Hide file tree
Showing 18 changed files with 564 additions and 120 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-jokes-reply.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ccip": patch
---

Improved fetching CommitRoots that skips already executed ones from the database query
5 changes: 5 additions & 0 deletions .changeset/rich-jars-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Narrowing topic, data_word indexes by adding (evm_chain_id, address, event_sig) to the index definition #db_update
4 changes: 4 additions & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/bytedance/sonic v1.10.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down Expand Up @@ -209,6 +211,7 @@ require (
github.com/linxGnu/grocksdb v1.7.16 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
Expand Down Expand Up @@ -283,6 +286,7 @@ require (
github.com/ulule/limiter/v3 v3.11.2 // indirect
github.com/unrolled/secure v1.13.0 // indirect
github.com/valyala/fastjson v1.4.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
github.com/zondax/hid v0.9.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ github.com/aws/aws-sdk-go v1.45.25/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8P
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 h1:WWB576BN5zNSZc/M9d/10pqEx5VHNhaQ/yOVAkmj5Yo=
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -198,6 +200,7 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
Expand Down Expand Up @@ -833,6 +836,7 @@ github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwA
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand Down Expand Up @@ -907,6 +911,8 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/manifoldco/promptui v0.9.0 h1:3V4HzJk1TtXW1MTZMP7mdlwbBpIinw3HztaIlYthEiA=
github.com/manifoldco/promptui v0.9.0/go.mod h1:ka04sppxSGFAtxX0qhlYQjISsg9mR4GWtQEhdbn6Pgg=
github.com/manyminds/api2go v0.0.0-20171030193247-e7b693844a6f h1:tVvGiZQFjOXP+9YyGqSA6jE55x1XVxmoPYudncxrZ8U=
Expand Down Expand Up @@ -1312,6 +1318,8 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vertica/vertica-sql-go v1.3.3 h1:fL+FKEAEy5ONmsvya2WH5T8bhkvY27y/Ik3ReR2T+Qw=
github.com/vertica/vertica-sql-go v1.3.3/go.mod h1:jnn2GFuv+O2Jcjktb7zyc4Utlbu9YVqpHH/lx63+1M4=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
6 changes: 4 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
Expand Down Expand Up @@ -87,9 +88,10 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
return nil, types.ReportingPluginInfo{}, fmt.Errorf("get onchain config from offramp: %w", err)
}

lggr := rf.config.lggr.Named("ExecutionReportingPlugin")
return &ExecutionReportingPlugin{
F: config.F,
lggr: rf.config.lggr.Named("ExecutionReportingPlugin"),
lggr: lggr,
offchainConfig: offchainConfig,
tokenDataWorker: rf.config.tokenDataWorker,
gasPriceEstimator: gasPriceEstimator,
Expand All @@ -104,7 +106,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
offRampReader: rf.config.offRampReader,
tokenPoolBatchedReader: rf.config.tokenPoolBatchedReader,
inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()),
snoozedRoots: cache.NewSnoozedRoots(onchainConfig.PermissionLessExecutionThresholdSeconds, offchainConfig.RootSnoozeTime.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, onchainConfig.PermissionLessExecutionThresholdSeconds, offchainConfig.RootSnoozeTime.Duration()),
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
}, types.ReportingPluginInfo{
Expand Down
21 changes: 9 additions & 12 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type ExecutionReportingPlugin struct {

// State
inflightReports *inflightExecReportsContainer
snoozedRoots cache.SnoozedRoots
commitRootsCache cache.CommitsRootsCache
chainHealthcheck cache.ChainHealthcheck
}

Expand Down Expand Up @@ -139,12 +139,7 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
unexpiredReports, err := r.getUnexpiredCommitReports(
ctx,
r.commitStoreReader,
r.onchainConfig.PermissionLessExecutionThresholdSeconds,
lggr,
)
unexpiredReports, err := r.getUnexpiredCommitReports(ctx, r.commitStoreReader, lggr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,7 +195,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
// config.PermissionLessExecutionThresholdSeconds so it will never be considered again.
if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized {
rootLggr.Infow("Snoozing root forever since there are no executable txs anymore", "root", hex.EncodeToString(merkleRoot[:]))
r.snoozedRoots.MarkAsExecuted(merkleRoot)
r.commitRootsCache.MarkAsExecuted(merkleRoot)
continue
}

Expand Down Expand Up @@ -237,7 +232,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
if len(batch) != 0 {
return batch, nil
}
r.snoozedRoots.Snooze(merkleRoot)
r.commitRootsCache.Snooze(merkleRoot)
}
}
return []ccip.ObservedMessage{}, nil
Expand Down Expand Up @@ -983,12 +978,13 @@ func getTokensPrices(ctx context.Context, priceRegistry ccipdata.PriceRegistryRe
func (r *ExecutionReportingPlugin) getUnexpiredCommitReports(
ctx context.Context,
commitStoreReader ccipdata.CommitStoreReader,
permissionExecutionThreshold time.Duration,
lggr logger.Logger,
) ([]cciptypes.CommitStoreReport, error) {
createdAfterTimestamp := r.commitRootsCache.OldestRootTimestamp()
lggr.Infow("Fetching unexpired commit roots from database", "createdAfterTimestamp", createdAfterTimestamp)
acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteTimestamp(
ctx,
time.Now().Add(-permissionExecutionThreshold),
createdAfterTimestamp,
0,
)
if err != nil {
Expand All @@ -998,11 +994,12 @@ func (r *ExecutionReportingPlugin) getUnexpiredCommitReports(
var reports []cciptypes.CommitStoreReport
for _, acceptedReport := range acceptedReports {
reports = append(reports, acceptedReport.CommitStoreReport)
r.commitRootsCache.AppendUnexecutedRoot(acceptedReport.MerkleRoot, time.UnixMilli(acceptedReport.TxMeta.BlockTimestampUnixMilli))
}

notSnoozedReports := make([]cciptypes.CommitStoreReport, 0)
for _, report := range reports {
if r.snoozedRoots.IsSnoozed(report.MerkleRoot) {
if r.commitRootsCache.IsSkipped(report.MerkleRoot) {
lggr.Debugw("Skipping snoozed root",
"minSeqNr", report.Interval.Min,
"maxSeqNr", report.Interval.Max,
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
mockOnRampPriceRegistryProvider.On("NewPriceRegistryReader", ctx, sourcePriceRegistryAddress).Return(sourcePriceRegReader, nil).Maybe()
p.sourcePriceRegistryProvider = mockOnRampPriceRegistryProvider

p.snoozedRoots = cache.NewSnoozedRoots(time.Minute, time.Minute)
p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), time.Minute, time.Minute)
p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader)

_, err = p.Observation(ctx, types.ReportTimestamp{}, types.Query{})
Expand Down
194 changes: 194 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package cache

import (
"encoding/hex"
"sync"
"time"

"github.com/patrickmn/go-cache"
orderedmap "github.com/wk8/go-ordered-map/v2"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const (
// EvictionGracePeriod defines how long after the permissionless execution threshold a root is still kept in the cache
EvictionGracePeriod = 1 * time.Hour
// CleanupInterval defines how often roots cache is scanned to evict stale roots
CleanupInterval = 30 * time.Minute
)

type CommitsRootsCache interface {
// IsSkipped returns true if the root is either executed or snoozed. Snoozing can be temporary based on the configuration
IsSkipped(merkleRoot [32]byte) bool
MarkAsExecuted(merkleRoot [32]byte)
Snooze(merkleRoot [32]byte)

// OldestRootTimestamp returns the oldest root timestamp that is not executed yet (minus 1 second).
// If there are no roots in the queue, it returns the permissionlessExecThreshold
OldestRootTimestamp() time.Time
// AppendUnexecutedRoot appends the root to the unexecuted roots queue to keep track of the roots that are not executed yet
// Roots has to be added in the order they are fetched from the database
AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time)
}

type commitRootsCache struct {
lggr logger.Logger
// executedRoots is used to keep track of the roots that are executed. Roots that are considered as executed
// when all messages are executed on the dest and matching execution state change logs are finalized
executedRoots *cache.Cache
// snoozedRoots is used to keep track of the roots that are temporary snoozed
snoozedRoots *cache.Cache
// unexecutedRootsQueue is used to keep track of the unexecuted roots in the order they are fetched from database (should be ordered by block_number, log_index)
// First run of Exec will fill the queue with all the roots that are not executed yet within the [now-permissionlessExecThreshold, now] window.
// When a root is executed, it is removed from the queue. Next database query instead of using entire permissionlessExecThrehsold window
// will use oldestRootTimestamp as the lower bound filter for block_timestamp.
// This way we can reduce the number of database rows fetched with every OCR round.
// We do it this way because roots for most of the cases are executed sequentially.
// Instead of skipping snoozed roots after we fetch them from the database, we do that on the db level by narrowing the search window.
//
// Example
// permissionLessExecThresholds - 10 days, now - 2010-10-15
// We fetch all the roots that within the [2010-10-05, 2010-10-15] window and load them to the queue
// [0xA - 2010-10-10, 0xB - 2010-10-11, 0xC - 2010-10-12] -> 0xA is the oldest root
// We executed 0xA and a couple of rounds later, we mark 0xA as executed and snoozed that forever which removes it from the queue.
// [0xB - 2010-10-11, 0xC - 2010-10-12]
// Now the search filter wil be 0xA timestamp -> [2010-10-11, 20-10-15]
// If roots are executed out of order, it's not going to change anything. However, for most of the cases we have sequential root execution and that is
// a huge improvement because we don't need to fetch all the roots from the database in every round.
unexecutedRootsQueue *orderedmap.OrderedMap[string, time.Time]
oldestRootTimestamp time.Time
rootsQueueMu sync.RWMutex

// Both rootSnoozedTime and permissionLessExecutionThresholdDuration can be kept in the commitRootsCache without need to be updated.
// Those config properties are populates via onchain/offchain config. When changed, OCR plugin will be restarted and cache initialized with new config.
rootSnoozedTime time.Duration
permissionLessExecutionThresholdDuration time.Duration
}

func newCommitRootsCache(
lggr logger.Logger,
permissionLessExecutionThresholdDuration time.Duration,
rootSnoozeTime time.Duration,
evictionGracePeriod time.Duration,
cleanupInterval time.Duration,
) *commitRootsCache {
executedRoots := cache.New(permissionLessExecutionThresholdDuration+evictionGracePeriod, cleanupInterval)
snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval)

return &commitRootsCache{
lggr: lggr,
executedRoots: executedRoots,
snoozedRoots: snoozedRoots,
unexecutedRootsQueue: orderedmap.New[string, time.Time](),
rootSnoozedTime: rootSnoozeTime,
permissionLessExecutionThresholdDuration: permissionLessExecutionThresholdDuration,
}
}

func NewCommitRootsCache(
lggr logger.Logger,
permissionLessExecutionThresholdDuration time.Duration,
rootSnoozeTime time.Duration,
) *commitRootsCache {
return newCommitRootsCache(
lggr,
permissionLessExecutionThresholdDuration,
rootSnoozeTime,
EvictionGracePeriod,
CleanupInterval,
)
}

func (s *commitRootsCache) IsSkipped(merkleRoot [32]byte) bool {
_, snoozed := s.snoozedRoots.Get(merkleRootToString(merkleRoot))
_, executed := s.executedRoots.Get(merkleRootToString(merkleRoot))
return snoozed || executed
}

func (s *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) {
prettyMerkleRoot := merkleRootToString(merkleRoot)
s.executedRoots.SetDefault(prettyMerkleRoot, struct{}{})

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()
// if there is only one root in the queue, we put its block_timestamp as oldestRootTimestamp
if s.unexecutedRootsQueue.Len() == 1 {
s.oldestRootTimestamp = s.unexecutedRootsQueue.Oldest().Value
}
s.unexecutedRootsQueue.Delete(prettyMerkleRoot)
if head := s.unexecutedRootsQueue.Oldest(); head != nil {
s.oldestRootTimestamp = head.Value
}
s.lggr.Debugw("Deleting executed root from the queue",
"merkleRoot", prettyMerkleRoot,
"oldestRootTimestamp", s.oldestRootTimestamp,
)
}

func (s *commitRootsCache) Snooze(merkleRoot [32]byte) {
s.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{})
}

func (s *commitRootsCache) OldestRootTimestamp() time.Time {
permissionlessExecWindow := time.Now().Add(-s.permissionLessExecutionThresholdDuration)
timestamp, ok := s.pickOldestRootBlockTimestamp(permissionlessExecWindow)

if ok {
return timestamp
}

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()

// If rootsSearchFilter is before permissionlessExecWindow, it means that we have roots that are stuck forever and will never be executed
// In that case, we wipe out the entire queue. Next round should start from the permissionlessExecThreshold and rebuild cache from scratch.
s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
return permissionlessExecWindow
}

func (s *commitRootsCache) pickOldestRootBlockTimestamp(permissionlessExecWindow time.Time) (time.Time, bool) {
s.rootsQueueMu.RLock()
defer s.rootsQueueMu.RUnlock()

// If there are no roots in the queue, we can return the permissionlessExecWindow
if s.oldestRootTimestamp.IsZero() {
return permissionlessExecWindow, true
}

if s.oldestRootTimestamp.After(permissionlessExecWindow) {
// Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
return s.oldestRootTimestamp.Add(-time.Second), true
}
return time.Time{}, false
}
func (s *commitRootsCache) AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) {
prettyMerkleRoot := merkleRootToString(merkleRoot)

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()

// If the root is already in the queue, we must not add it to the queue
if _, found := s.unexecutedRootsQueue.Get(prettyMerkleRoot); found {
return
}
// If the root is already executed, we must not add it to the queue
if _, executed := s.executedRoots.Get(prettyMerkleRoot); executed {
return
}
// Initialize the search filter with the first root that is added to the queue
if s.unexecutedRootsQueue.Len() == 0 {
s.oldestRootTimestamp = blockTimestamp
}
s.unexecutedRootsQueue.Set(prettyMerkleRoot, blockTimestamp)
s.lggr.Debugw("Adding unexecuted root to the queue",
"merkleRoot", prettyMerkleRoot,
"blockTimestamp", blockTimestamp,
"oldestRootTimestamp", s.oldestRootTimestamp,
)
}

func merkleRootToString(merkleRoot [32]byte) string {
return hex.EncodeToString(merkleRoot[:])
}

0 comments on commit 8624ec6

Please sign in to comment.