Use default selector with linear upkeep selection #13023

Draft
wants to merge 16 commits into base: develop
5 changes: 5 additions & 0 deletions .changeset/modern-ghosts-hang.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Iterate over upkeeps using an upkeep selector
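
The selector lets the provider walk the upkeep set in deterministic slices instead of always dequeuing from the same upkeeps first. A minimal sketch of the selector shape this change relies on, mirroring the modulo-based selectors used in the tests and the commented-out provider code further down; the helper name is illustrative and not part of the diff:

// linearUpkeepSelector partitions upkeeps by ID modulo the number of
// iterations, so each iteration admits a different slice of the upkeep set.
func linearUpkeepSelector(iterations, currentIteration int) func(id *big.Int) bool {
    return func(id *big.Int) bool {
        return id.Int64()%int64(iterations) == int64(currentIteration)
    }
}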
@@ -1,9 +1,12 @@
package logprovider

import (
"crypto/sha256"
"encoding/hex"
"math"
"math/big"
"sort"
"strings"
"sync"
"sync/atomic"

@@ -26,7 +29,7 @@ type LogBuffer interface {
// It also accepts a function to select upkeeps.
// Returns logs (associated to upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
@@ -75,6 +78,7 @@ type logBuffer struct {
// last block number seen by the buffer
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
//queueIDs []string
queues map[string]*upkeepLogQueue
lock sync.RWMutex
}
@@ -84,7 +88,8 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queues: make(map[string]*upkeepLogQueue),
//queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
}
}

@@ -110,11 +115,10 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {

// Dequeue greedily pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

start, end := getBlockWindow(block, blockRate)
return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector)
}
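
For orientation, a minimal usage sketch of the new signature, following the updated test call: the caller now computes the block window itself via getBlockWindow and passes start/end explicitly (variable values are illustrative):

// compute the [start, end] window for a block at the configured block rate,
// then drain up to maxResults logs (at most upkeepLimit per upkeep) from it.
start, end := getBlockWindow(block, blockRate)
logs, remaining := buf.Dequeue(start, end, upkeepLimit, maxResults, DefaultUpkeepSelector)
// logs: dequeued entries, each carrying its upkeep ID
// remaining: logs still left in [start, end] for the selected upkeeps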

@@ -126,11 +130,13 @@ func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int,
func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
var result []BufferedLog
var remainingLogs int
selectedUpkeeps := []string{}
numLogs := 0
for _, q := range b.queues {
if !upkeepSelector(q.id) {
// if the upkeep is not selected, skip it
continue
}
selectedUpkeeps = append(selectedUpkeeps, q.id.String())
logsInRange := q.sizeOfRange(start, end)
if logsInRange == 0 {
// if there are no logs in the range, skip the upkeep
@@ -150,11 +156,20 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS
result = append(result, BufferedLog{ID: q.id, Log: l})
capacity--
}
numLogs += len(logs)
remainingLogs += remaining
}
b.lggr.Debugw("dequeued logs for upkeeps", "numUpkeeps", len(selectedUpkeeps), "numLogs", numLogs, "hash", hashCombinedStrings(selectedUpkeeps))
return result, remainingLogs
}

func hashCombinedStrings(s []string) string {
combined := strings.Join(s, "")
hasher := sha256.New()
// write the combined string into the hasher; Sum(nil) then returns its digest
hasher.Write([]byte(combined))
hashed := hasher.Sum(nil)
return hex.EncodeToString(hashed[:8])
}

func (b *logBuffer) SetConfig(lookback, blockRate, logLimit uint32) {
b.lock.Lock()
defer b.lock.Unlock()
@@ -197,6 +212,16 @@ func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) {
b.lock.Lock()
defer b.lock.Unlock()

//found := false
//for _, id := range b.queueIDs {
// if id == uid.String() {
// found = true
// break
// }
//}
//if !found {
// b.queueIDs = append(b.queueIDs, uid.String())
//}
b.queues[uid.String()] = buf
}

@@ -1,10 +1,13 @@
package logprovider

import (
"fmt"
"math"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
@@ -141,13 +144,147 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) {
added, dropped := buf.Enqueue(id, logs...)
require.Equal(t, len(logs), added+dropped)
}
results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
start, end := getBlockWindow(tc.args.block, tc.args.blockRate)
results, remaining := buf.Dequeue(start, end, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
require.Equal(t, len(tc.results), len(results))
require.Equal(t, tc.remaining, remaining)
})
}
}

func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
t.Run("Dequeue from a deterministic order of upkeeps", func(t *testing.T) {
lookback := uint32(20)
blockRate := uint32(1)
logLimit := uint32(1)
buf := NewLogBuffer(logger.TestLogger(t), lookback, blockRate, logLimit)

upkeepIDs := []*big.Int{
big.NewInt(1),
big.NewInt(2),
big.NewInt(3),
big.NewInt(4),
big.NewInt(5),
}

numUpkeeps := len(upkeepIDs)

blockNumbers := []int64{
100, 101, 102, 103, 104, 105, 106, 107, 108, 109,
}

// for each upkeep, enqueue 10 logs per block, for 10 blocks
for _, upkeepID := range upkeepIDs {
for _, blockNumber := range blockNumbers {
for i := 0; i < 10; i++ {
log := logpoller.Log{
BlockNumber: blockNumber,
TxHash: common.HexToHash(fmt.Sprintf("0x%dff%dff%d", blockNumber, upkeepID.Int64(), i)),
}
buf.Enqueue(upkeepID, log)
}
}
}

bufV1 := buf.(*logBuffer)

assert.Equal(t, 5, len(bufV1.queues))

// each queue should have 100 logs
assert.Equal(t, 100, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 100, len(bufV1.queues["5"].logs))

maxResults := 5
iterations := int(math.Ceil(float64(numUpkeeps*5) / float64(maxResults)))

assert.Equal(t, 5, iterations)

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(0) // on this dequeue attempt, current iteration will be 0
}
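// With 5 upkeeps and iterations == 5, each selector admits exactly one upkeep:
// id % 5 == 0 matches upkeep 5, id % 5 == 1 matches upkeep 1, and so on.
// The window [100, 101] holds 10 logs per block per upkeep, i.e. 20 logs for
// the selected upkeep, so dequeuing maxResults == 5 of them leaves 15 remaining.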

logs, remaining := buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 100, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(1) // on this dequeue attempt, current iteration will be 1
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(2) // on this dequeue attempt, current iteration will be 2
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(3) // on this dequeue attempt, current iteration will be 3
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 95, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(4) // on this dequeue attempt, current iteration will be 4
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 95, len(bufV1.queues["3"].logs))
assert.Equal(t, 95, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
})

}

func TestLogEventBufferV1_Enqueue(t *testing.T) {
tests := []struct {
name string
@@ -7,6 +7,7 @@ import (
"fmt"
"hash"
"io"
"math"
"math/big"
"runtime"
"sync"
@@ -114,19 +115,24 @@ type logEventProvider struct {
currentPartitionIdx uint64

chainID *big.Int

currentIteration int
calculateIterations bool
iterations int
}

func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
bufferV1: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit),
poller: poller,
opts: opts,
filterStore: filterStore,
chainID: chainID,
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
bufferV1: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit),
poller: poller,
opts: opts,
filterStore: filterStore,
chainID: chainID,
calculateIterations: true,
}
}

@@ -287,10 +293,26 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up

switch p.opts.BufferVersion {
case BufferVersionV1:
// in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range.
blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs()

if p.iterations == p.currentIteration {
p.calculateIterations = true
}

if p.calculateIterations {
p.calculateIterations = false
p.currentIteration = 0
p.iterations = int(math.Ceil(float64(p.bufferV1.NumOfUpkeeps()*logLimitLow) / float64(maxResults)))
}
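// illustrative arithmetic (numbers are not from this change): with 50 upkeeps,
// a per-upkeep limit of 5 and maxResults of 100, ceil(50*5/100) = 3 iterations
// are needed before every upkeep has had a chance to be selected.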

//upkeepSelectorFn := func(id *big.Int) bool {
// return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
//}

for len(payloads) < maxResults && start <= latestBlock {
logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
startWindow, end := getBlockWindow(start, blockRate)

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}
@@ -300,13 +322,17 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
payloads = append(payloads, payload)
}
}

if remaining > 0 {
p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining)
// TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times
continue
}

start += int64(blockRate)

}
p.currentIteration++
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
for _, l := range logs {