Dynamically calculate performables limit based on perform data size #305
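
The diffs below replace the fixed count caps on performables with byte-size budgets: observations and outcomes are rejected at decode time once their encoded size exceeds MaxObservationLength or MaxOutcomeLength, and AddFromStagingHook now fills the observation with staged results only until the remaining byte budget is used up. Below is a minimal, self-contained sketch of that fill loop; it assumes only that each candidate exposes a Size() method reporting its encoded byte footprint, as CheckResult does in the diffs, and the helper name and types here are illustrative rather than part of the library.

package main

import "fmt"

// sized is anything that can report its encoded byte footprint,
// the way CheckResult.Size() is used in the diffs below.
type sized interface{ Size() int }

type fakeResult struct{ bytes int }

func (r fakeResult) Size() int { return r.bytes }

// fillToSizeLimit mirrors the new AddFromStagingHook behaviour: start from
// the bytes already in the observation and append candidates until the
// next one would push the total past sizeLimit.
func fillToSizeLimit[T sized](startingSize, sizeLimit int, candidates []T) []T {
	size := startingSize
	var kept []T
	for _, c := range candidates {
		size += c.Size()
		if size > sizeLimit {
			break
		}
		kept = append(kept, c)
	}
	return kept
}

func main() {
	staged := []fakeResult{{bytes: 400}, {bytes: 300}, {bytes: 500}}
	kept := fillToSizeLimit(200, 1_000, staged)
	fmt.Println(len(kept)) // 2: the third result would exceed the budget
}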

6 changes: 3 additions & 3 deletions pkg/v3/observation.go
@@ -46,6 +46,9 @@ func (observation AutomationObservation) Encode() ([]byte, error) {
}

func DecodeAutomationObservation(data []byte, utg ocr2keepers.UpkeepTypeGetter, wg ocr2keepers.WorkIDGenerator) (AutomationObservation, error) {
if len(data) > MaxObservationLength {
return AutomationObservation{}, fmt.Errorf("observation size cannot be greater than %d; has %d bytes", MaxObservationLength, len(data))
}
ao := AutomationObservation{}
err := json.Unmarshal(data, &ao)
if err != nil {
@@ -73,9 +76,6 @@ func validateAutomationObservation(o AutomationObservation, utg ocr2keepers.Upke
}

// Validate Performables
if (len(o.Performable)) > ObservationPerformablesLimit {
return fmt.Errorf("performable length cannot be greater than %d", ObservationPerformablesLimit)
}
seenPerformables := make(map[string]bool)
for _, res := range o.Performable {
if err := validateCheckResult(res, utg, wg); err != nil {
22 changes: 15 additions & 7 deletions pkg/v3/observation_test.go
@@ -151,26 +151,34 @@ func TestDuplicateBlockHistory(t *testing.T) {
assert.ErrorContains(t, err, "block history cannot have duplicate block numbers")
}

func TestLargePerformable(t *testing.T) {
func TestLargeObservation(t *testing.T) {
ao := AutomationObservation{
Performable: []types.CheckResult{},
UpkeepProposals: []types.CoordinatedBlockProposal{validConditionalProposal, validLogProposal},
BlockHistory: validBlockHistory,
}
for i := 0; i < ObservationPerformablesLimit+1; i++ {
newConditionalResult := validConditionalResult
size := ao.BlockHistory.Size()
for _, prop := range ao.UpkeepProposals {
size += prop.Size()
}
for i := 0; size < MaxObservationLength; i++ {
uid := types.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 1)))
newConditionalResult.UpkeepID = uid
newConditionalResult.WorkID = mockWorkIDGenerator(newConditionalResult.UpkeepID, newConditionalResult.Trigger)
ao.Performable = append(ao.Performable, newConditionalResult)
newResult := validLogResult
if mockUpkeepTypeGetter(uid) == types.ConditionTrigger {
newResult = validConditionalResult
}
newResult.UpkeepID = uid
newResult.WorkID = mockWorkIDGenerator(newResult.UpkeepID, newResult.Trigger)
ao.Performable = append(ao.Performable, newResult)
size += newResult.Size()
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation observation")

_, err = DecodeAutomationObservation(encoded, mockUpkeepTypeGetter, mockWorkIDGenerator)
assert.Error(t, err)
assert.ErrorContains(t, err, "performable length cannot be greater than")
assert.ErrorContains(t, err, "observation size cannot be greater than")
}

func TestDuplicatePerformable(t *testing.T) {
10 changes: 7 additions & 3 deletions pkg/v3/outcome.go
@@ -22,6 +22,9 @@ const (
// a round. NOTE: This is derived from a limit of 10000 on performData
// which is guaranteed onchain
MaxOutcomeLength = 2_500_000
// MaxOutcomeProposalsLength is the maximum length in bytes for all proposals in an outcome
MaxOutcomeProposalsLength = OutcomeSurfacedProposalsLimit * OutcomeSurfacedProposalsRoundHistoryLimit *
(32 + 32 + (32 + 8 + 32 + 4 + 32 + 8)) // upkeepID + workID + trigger (blockNumber + blockHash + logTriggerExtension)
// MaxReportLength limits the total length of bytes for a single report.
MaxReportLength = 1_000_000
// MaxReportCount limits the total number of reports allowed to be produced
@@ -53,9 +56,6 @@ type AutomationOutcome struct {
// ValidateAutomationOutcome validates individual values in an AutomationOutcome
func validateAutomationOutcome(o AutomationOutcome, utg ocr2keepers.UpkeepTypeGetter, wg ocr2keepers.WorkIDGenerator) error {
// Validate AgreedPerformables
if (len(o.AgreedPerformables)) > OutcomeAgreedPerformablesLimit {
return fmt.Errorf("outcome performable length cannot be greater than %d", OutcomeAgreedPerformablesLimit)
}
seenPerformables := make(map[string]bool)
for _, res := range o.AgreedPerformables {
if err := validateCheckResult(res, utg, wg); err != nil {
@@ -87,6 +87,7 @@ func validateAutomationOutcome(o AutomationOutcome, utg ocr2keepers.UpkeepTypeGe
seenProposals[proposal.WorkID] = true
}
}

return nil
}

@@ -99,6 +100,9 @@ func (outcome AutomationOutcome) Encode() ([]byte, error) {
// DecodeAutomationOutcome decodes an AutomationOutcome from an encoded array
// of bytes. Possible errors come from the encoding/json package
func DecodeAutomationOutcome(data []byte, utg ocr2keepers.UpkeepTypeGetter, wg ocr2keepers.WorkIDGenerator) (AutomationOutcome, error) {
if len(data) > MaxOutcomeLength {
return AutomationOutcome{}, fmt.Errorf("outcome size cannot be greater than %d bytes; has %d bytes", MaxOutcomeLength, len(data))
}
ao := AutomationOutcome{}
err := json.Unmarshal(data, &ao)
if err != nil {
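
The per-proposal figure that MaxOutcomeProposalsLength multiplies out follows from the comment above: 32 bytes for the upkeepID, 32 for the workID, and 116 for the trigger (blockNumber + blockHash + logTriggerExtension), or 180 bytes per coordinated proposal. A worked example of that arithmetic follows; the two surfaced-proposal limits are placeholders, since the real OutcomeSurfacedProposalsLimit and OutcomeSurfacedProposalsRoundHistoryLimit constants are defined elsewhere in the package and do not appear in this diff.

package main

import "fmt"

func main() {
	// Per-proposal budget from the outcome.go comment:
	// upkeepID + workID + trigger (blockNumber + blockHash + logTriggerExtension)
	perProposal := 32 + 32 + (32 + 8 + 32 + 4 + 32 + 8) // = 180 bytes

	// Placeholder limits, purely illustrative.
	surfacedProposalsLimit := 50
	roundHistoryLimit := 20

	maxProposalsLength := surfacedProposalsLimit * roundHistoryLimit * perProposal
	fmt.Println(perProposal, maxProposalsLength) // 180 180000

	// The outcome-side performables budget used in ocr3.go is then
	// whatever remains of MaxOutcomeLength after this reservation.
	fmt.Println(2_500_000 - maxProposalsLength) // 2320000
}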
20 changes: 15 additions & 5 deletions pkg/v3/outcome_test.go
@@ -57,18 +57,28 @@ func TestLargeAgreedPerformables(t *testing.T) {
AgreedPerformables: []types.CheckResult{},
SurfacedProposals: [][]types.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
}
for i := 0; i < OutcomeAgreedPerformablesLimit+1; i++ {
newConditionalResult := validConditionalResult
newConditionalResult.Trigger.BlockNumber = types.BlockNumber(i + 1)
newConditionalResult.WorkID = mockWorkIDGenerator(newConditionalResult.UpkeepID, newConditionalResult.Trigger)
size := 0
for _, r := range ao.SurfacedProposals {
for _, p := range r {
size += p.Size()
}
}
for i := 0; size < MaxOutcomeLength; i++ {
newResult := validLogResult
uid := types.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 10001)))
newResult.UpkeepID = uid
newResult.Trigger.BlockNumber = types.BlockNumber(i + 1)
newResult.WorkID = mockWorkIDGenerator(newResult.UpkeepID, newResult.Trigger)
ao.AgreedPerformables = append(ao.AgreedPerformables, validConditionalResult)
size += newResult.Size()
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation outcome")

_, err = DecodeAutomationOutcome(encoded, mockUpkeepTypeGetter, mockWorkIDGenerator)
assert.Error(t, err)
assert.ErrorContains(t, err, "outcome performable length cannot be greater than")
assert.ErrorContains(t, err, "outcome size cannot be greater than")
}

func TestDuplicateAgreedPerformables(t *testing.T) {
29 changes: 25 additions & 4 deletions pkg/v3/plugin/hooks/add_from_staging.go
@@ -31,7 +31,7 @@ type AddFromStagingHook struct {
// that is the same across all nodes for a given round. This ensures that all nodes try to
// send the same subset of workIDs if they are available, while giving different priority
// to workIDs in different rounds.
func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation, limit int, rSrc [16]byte) error {
func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation, sizeLimit, limit int, rSrc [16]byte) error {
results, err := hook.store.View()
if err != nil {
return err
@@ -49,11 +49,32 @@ func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation
sort.Slice(results, func(i, j int) bool {
return shuffledIDs[results[i].WorkID] < shuffledIDs[results[j].WorkID]
})
if len(results) > limit {

observationSize := obs.BlockHistory.Size()
for _, proposal := range obs.UpkeepProposals {
observationSize += proposal.Size()
}
for _, result := range obs.Performable {
observationSize += result.Size()
}
// TODO: remove this in next version, it's a temporary fix for
// supporting old nodes that will limit the number of results rather than the size
// of the observation
if limit > 0 && len(results) > limit {
results = results[:limit]
}
hook.logger.Printf("adding %d results to observation", len(results))
obs.Performable = append(obs.Performable, results...)
// add results to observation until size limit is reached
added := 0
for _, result := range results {
observationSize += result.Size()
if observationSize > sizeLimit {
break
}
obs.Performable = append(obs.Performable, result)
added++
}

hook.logger.Printf("adding %d results to observation", added)

return nil
}
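
The comment kept at the top of RunHook explains why trimming to a budget stays consistent across the network: every node orders the staged results by a pseudorandom permutation of their workIDs seeded from the same per-round key source, so whichever prefix fits the size limit is the same prefix on every node. A sketch of one way such a round-stable ordering can be derived; the repository's actual shuffling helper may differ, and the use of sha256 here is an assumption.

package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// priorityKey derives a deterministic sort key for a workID from a
// round-specific random source shared by all nodes (illustrative only).
func priorityKey(workID string, rSrc [16]byte) [32]byte {
	return sha256.Sum256(append(rSrc[:], workID...))
}

func main() {
	rSrc := [16]byte{1, 1, 2, 2, 3, 3, 4, 4} // same on every node in a given round
	workIDs := []string{"30a", "10c", "20b"}

	sort.Slice(workIDs, func(i, j int) bool {
		a, b := priorityKey(workIDs[i], rSrc), priorityKey(workIDs[j], rSrc)
		return string(a[:]) < string(b[:])
	})

	// Every node that shares rSrc computes this exact order, so taking the
	// results that fit a count or byte budget yields the same subset everywhere.
	fmt.Println(workIDs)
}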
55 changes: 33 additions & 22 deletions pkg/v3/plugin/hooks/add_from_staging_test.go
@@ -23,6 +23,7 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
coordinatorFilterResults []types.CheckResult
coordinatorErr error
rSrc [16]byte
sizeLimit int
limit int
observationWorkIDs []string
expectedErr error
@@ -36,7 +37,8 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
{UpkeepID: [32]byte{2}, WorkID: "20b"},
{UpkeepID: [32]byte{3}, WorkID: "30a"},
},
limit: 10,
sizeLimit: 1_000_000,
limit: 10,
coordinatorFilterResults: []types.CheckResult{
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
@@ -50,6 +52,7 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
initialObservation: ocr2keepersv3.AutomationObservation{},
resultStoreResults: []types.CheckResult{},
coordinatorFilterResults: []types.CheckResult{},
sizeLimit: 1_000_000,
limit: 10,
observationWorkIDs: []string{},
expectedLogMsg: "adding 0 results to observation",
@@ -62,7 +65,8 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
{UpkeepID: [32]byte{2}, WorkID: "20b"},
{UpkeepID: [32]byte{3}, WorkID: "30a"},
},
limit: 10,
sizeLimit: 1_000_000,
limit: 10,
coordinatorFilterResults: []types.CheckResult{
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
@@ -79,7 +83,8 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
},
limit: 10,
sizeLimit: 1_000_000,
limit: 10,
coordinatorFilterResults: []types.CheckResult{
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
@@ -95,7 +100,8 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
{UpkeepID: [32]byte{2}, WorkID: "20b"},
{UpkeepID: [32]byte{3}, WorkID: "30a"},
},
limit: 2,
sizeLimit: 1_000_000,
limit: 2,
coordinatorFilterResults: []types.CheckResult{
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
@@ -112,7 +118,8 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
{UpkeepID: [32]byte{2}, WorkID: "20b"},
{UpkeepID: [32]byte{3}, WorkID: "30a"},
},
limit: 1,
sizeLimit: 1_000_000,
limit: 1,
coordinatorFilterResults: []types.CheckResult{
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
@@ -129,8 +136,9 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
{UpkeepID: [32]byte{2}, WorkID: "20b"},
{UpkeepID: [32]byte{3}, WorkID: "30a"},
},
rSrc: [16]byte{1},
limit: 2,
rSrc: [16]byte{1},
sizeLimit: 1_000_000,
limit: 2,
coordinatorFilterResults: []types.CheckResult{
{UpkeepID: [32]byte{1}, WorkID: "10c"},
{UpkeepID: [32]byte{2}, WorkID: "20b"},
@@ -162,7 +170,7 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {
addFromStagingHook := NewAddFromStagingHook(mockResultStore, mockCoordinator, logger)

// Run the hook
err := addFromStagingHook.RunHook(obs, tt.limit, tt.rSrc)
err := addFromStagingHook.RunHook(obs, tt.sizeLimit, tt.limit, tt.rSrc)

if tt.expectedErr != nil {
// Assert that the hook function returns the expected error
@@ -188,22 +196,25 @@ func TestAddFromStagingHook_RunHook(t *testing.T) {

func TestAddFromStagingHook_RunHook_Limits(t *testing.T) {
tests := []struct {
name string
n int
limit int
expected int
name string
n int
sizeLimit int
limit int
expected int
}{
{
name: "limit is less than results",
n: 1000,
limit: 100,
expected: 100,
name: "limit is less than results",
n: 1000,
sizeLimit: 1_000_000,
limit: 100,
expected: 100,
},
{
name: "limit is greater than results",
n: 100,
limit: 200,
expected: 100,
name: "limit is greater than results",
n: 100,
sizeLimit: 1_000_000,
limit: 200,
expected: 100,
},
}

@@ -217,7 +228,7 @@ func TestAddFromStagingHook_RunHook_Limits(t *testing.T) {
rSrc := [16]byte{1, 1, 2, 2, 3, 3, 4, 4}
obs := &ocr2keepersv3.AutomationObservation{}

err := addFromStagingHook.RunHook(obs, tt.limit, rSrc)
err := addFromStagingHook.RunHook(obs, tt.sizeLimit, tt.limit, rSrc)
assert.NoError(t, err)
assert.Len(t, obs.Performable, tt.expected)

@@ -227,7 +238,7 @@ func TestAddFromStagingHook_RunHook_Limits(t *testing.T) {
addFromStagingHook2 := NewAddFromStagingHook(mockResultStore2, mockCoordinator2, logger)

obs2 := &ocr2keepersv3.AutomationObservation{}
err2 := addFromStagingHook2.RunHook(obs2, tt.limit, rSrc)
err2 := addFromStagingHook2.RunHook(obs2, tt.sizeLimit, tt.limit, rSrc)
assert.NoError(t, err2)
assert.Len(t, obs.Performable, tt.expected)
assert.Equal(t, obs.Performable, obs2.Performable)
9 changes: 5 additions & 4 deletions pkg/v3/plugin/ocr3.go
@@ -62,15 +62,15 @@ func (plugin *ocr3Plugin) Observation(ctx context.Context, outctx ocr3types.Outc

plugin.AddBlockHistoryHook.RunHook(&observation, ocr2keepersv3.ObservationBlockHistoryLimit)

if err := plugin.AddFromStagingHook.RunHook(&observation, ocr2keepersv3.ObservationPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr)); err != nil {
return nil, err
}
if err := plugin.AddLogProposalsHook.RunHook(&observation, ocr2keepersv3.ObservationLogRecoveryProposalsLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr)); err != nil {
return nil, err
}
if err := plugin.AddConditionalProposalsHook.RunHook(&observation, ocr2keepersv3.ObservationConditionalsProposalsLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr)); err != nil {
return nil, err
}
if err := plugin.AddFromStagingHook.RunHook(&observation, ocr2keepersv3.MaxObservationLength, ocr2keepersv3.ObservationPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr)); err != nil {
return nil, err
}

plugin.Logger.Printf("built an observation in sequence nr %d with %d performables, %d upkeep proposals and %d block history", outctx.SeqNr, len(observation.Performable), len(observation.UpkeepProposals), len(observation.BlockHistory))

@@ -90,7 +90,8 @@ func (plugin *ocr3Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, attributedObservations []types.AttributedObservation) (ocr3types.Outcome, error) {

func (plugin *ocr3Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, attributedObservations []types.AttributedObservation) (ocr3types.Outcome, error) {
plugin.Logger.Printf("inside Outcome for seqNr %d", outctx.SeqNr)
p := newPerformables(plugin.F+1, ocr2keepersv3.OutcomeAgreedPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr), plugin.Logger)
maxPerformablesSize := ocr2keepersv3.MaxOutcomeLength - ocr2keepersv3.MaxOutcomeProposalsLength
p := newPerformables(plugin.F+1, maxPerformablesSize, ocr2keepersv3.OutcomeAgreedPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr), plugin.Logger)
c := newCoordinatedBlockProposals(plugin.F+1, ocr2keepersv3.OutcomeSurfacedProposalsRoundHistoryLimit, ocr2keepersv3.OutcomeSurfacedProposalsLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr), plugin.Logger)

for _, attributedObservation := range attributedObservations {
14 changes: 8 additions & 6 deletions pkg/v3/plugin/ocr3_test.go
@@ -635,12 +635,14 @@ func TestOcr3Plugin_Observation(t *testing.T) {
hash := crypto.Keccak256(append(uid[:], triggerExtBytes...))
return hex.EncodeToString(hash[:])
},
RemoveFromStagingHook: hooks.NewRemoveFromStagingHook(resultStore, logger),
RemoveFromMetadataHook: hooks.NewRemoveFromMetadataHook(metadataStore, logger),
AddToProposalQHook: hooks.NewAddToProposalQHook(proposalQueue, logger),
AddBlockHistoryHook: hooks.NewAddBlockHistoryHook(metadataStore, logger),
AddFromStagingHook: hooks.NewAddFromStagingHook(resultStore, coordinator, logger),
Logger: logger,
AddLogProposalsHook: hooks.NewAddLogProposalsHook(metadataStore, coordinator, logger),
AddConditionalProposalsHook: hooks.NewAddConditionalProposalsHook(metadataStore, coordinator, logger),
RemoveFromStagingHook: hooks.NewRemoveFromStagingHook(resultStore, logger),
RemoveFromMetadataHook: hooks.NewRemoveFromMetadataHook(metadataStore, logger),
AddToProposalQHook: hooks.NewAddToProposalQHook(proposalQueue, logger),
AddBlockHistoryHook: hooks.NewAddBlockHistoryHook(metadataStore, logger),
AddFromStagingHook: hooks.NewAddFromStagingHook(resultStore, coordinator, logger),
Logger: logger,
}

previousOutcome := ocr2keepers2.AutomationOutcome{
Expand Down