Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update #319

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/v3/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
defaultCacheClean = time.Duration(30) * time.Second
)

// prevent repeating transmission for the same upkeep (check in-flight upkeeps) by checking onchain events from event provider
// transmit again bc check block changes etc.
type coordinator struct {
closer internalutil.Closer
logger *log.Logger
Expand Down
16 changes: 8 additions & 8 deletions pkg/v3/flows/conditional.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
)

const (
// This is the ticker interval for sampling conditional flow
// SamplingConditionInterval is the ticker interval for sampling conditional flow
SamplingConditionInterval = 3 * time.Second
// Maximum number of upkeeps to be sampled in every round
// MaxSampledConditionals is the maximum number of upkeeps to be sampled in every round
MaxSampledConditionals = 300
// This is the ticker interval for final conditional flow
// FinalConditionalInterval is the ticker interval for final conditional flow
FinalConditionalInterval = 1 * time.Second
// These are the maximum number of conditional upkeeps dequeued on every tick from proposal queue in FinalConditionalFlow
// This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round
// FinalConditionalBatchSize is the maximum number of conditional upkeeps dequeued on every tick from proposal queue
// in FinalConditionalFlow. This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by
// plugin in every round
FinalConditionalBatchSize = 50
)

Expand All @@ -40,11 +41,11 @@ func newSampleProposalFlow(
logger *log.Logger,
) service.Recoverable {
pre = append(pre, preprocessors.NewProposalFilterer(ms, types.LogTrigger))
postprocessors := postprocessors.NewAddProposalToMetadataStorePostprocessor(ms)
post := postprocessors.NewAddProposalToMetadataStorePostprocessor(ms)

observer := ocr2keepersv3.NewRunnableObserver(
pre,
postprocessors,
post,
runner,
ObservationProcessLimit,
log.New(logger.Writer(), fmt.Sprintf("[%s | sample-proposal-observer]", telemetry.ServiceName), telemetry.LogPkgStdFlags),
Expand Down Expand Up @@ -114,7 +115,6 @@ func newFinalConditionalFlow(
proposalQ types.ProposalQueue,
builder common.PayloadBuilder,
retryQ types.RetryQueue,
stateUpdater common.UpkeepStateUpdater,
logger *log.Logger,
) service.Recoverable {
post := postprocessors.NewCombinedPostprocessor(
Expand Down
3 changes: 1 addition & 2 deletions pkg/v3/flows/conditional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestConditionalFinalization(t *testing.T) {
// set the ticker time lower to reduce the test time
interval := 50 * time.Millisecond
pre := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}
svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, upkeepStateUpdater, logger)
svc := newFinalConditionalFlow(pre, rStore, runner, interval, proposalQ, payloadBuilder, retryQ, logger)

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -206,5 +206,4 @@ func TestSamplingProposal(t *testing.T) {
upkeepProvider.AssertExpectations(t)
coord.AssertExpectations(t)
runner.AssertExpectations(t)
// ratio.AssertExpectations(t)
}
7 changes: 3 additions & 4 deletions pkg/v3/flows/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,29 @@ import (
"log"
"time"

common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

func ConditionalTriggerFlows(
coord ocr2keepersv3.PreProcessor[common.UpkeepPayload],
ratio types.Ratio,
getter common.ConditionalUpkeepProvider,
subscriber common.BlockSubscriber,
builder common.PayloadBuilder,
resultStore types.ResultStore,
metadataStore types.MetadataStore,
runner ocr2keepersv3.Runner,
proposalQ types.ProposalQueue,
retryQ types.RetryQueue,
stateUpdater common.UpkeepStateUpdater,
logger *log.Logger,
) []service.Recoverable {
preprocessors := []ocr2keepersv3.PreProcessor[common.UpkeepPayload]{coord}

// runs full check pipeline on a coordinated block with coordinated upkeeps
conditionalFinal := newFinalConditionalFlow(preprocessors, resultStore, runner, FinalConditionalInterval, proposalQ, builder, retryQ, stateUpdater, logger)
conditionalFinal := newFinalConditionalFlow(preprocessors, resultStore, runner, FinalConditionalInterval, proposalQ, builder, retryQ, logger)

// the sampling proposal flow takes random samples of active upkeeps, checks
// them and surfaces the ids if the items are eligible
Expand Down
26 changes: 0 additions & 26 deletions pkg/v3/flows/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ func TestConditionalTriggerFlows(t *testing.T) {
nil,
nil,
nil,
&mockSubscriber{
SubscribeFn: func() (int, chan common.BlockHistory, error) {
return 0, nil, nil
},
},
nil,
nil,
nil,
Expand All @@ -32,7 +27,6 @@ func TestConditionalTriggerFlows(t *testing.T) {
},
nil,
nil,
nil,
log.New(io.Discard, "", 0),
)
assert.Equal(t, 2, len(flows))
Expand Down Expand Up @@ -69,23 +63,3 @@ type mockRunner struct {
func (r *mockRunner) CheckUpkeeps(ctx context.Context, p ...common.UpkeepPayload) ([]common.CheckResult, error) {
return r.CheckUpkeepsFn(ctx, p...)
}

type mockSubscriber struct {
SubscribeFn func() (int, chan common.BlockHistory, error)
UnsubscribeFn func(int) error
StartFn func(ctx context.Context) error
CloseFn func() error
}

func (r *mockSubscriber) Subscribe() (int, chan common.BlockHistory, error) {
return r.SubscribeFn()
}
func (r *mockSubscriber) Unsubscribe(i int) error {
return r.UnsubscribeFn(i)
}
func (r *mockSubscriber) Start(ctx context.Context) error {
return r.StartFn(ctx)
}
func (r *mockSubscriber) Close() error {
return r.CloseFn()
}
14 changes: 5 additions & 9 deletions pkg/v3/flows/logtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,21 @@ import (
"log"
"time"

common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

var (
ErrNotRetryable = fmt.Errorf("payload is not retryable")
)

const (
// This is the ticker interval for log trigger flow
// LogCheckInterval is the ticker interval for log trigger flow
LogCheckInterval = 1 * time.Second
// Limit for processing a whole observer flow given a payload. The main component of this
// is the checkPipeline which involves some RPC, DB and Mercury calls, this is limited
// to 20 seconds for now
// ObservationProcessLimit is the limit for processing a whole observer flow given a payload. The main component of
// this is the checkPipeline which involves some RPC, DB and Mercury calls, this is limited to 20 seconds for now
ObservationProcessLimit = 20 * time.Second
)

Expand Down
12 changes: 7 additions & 5 deletions pkg/v3/flows/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@ import (

"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"

common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/preprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

const (
// This is the ticker interval for recovery final flow
// RecoveryFinalInterval is the ticker interval for recovery final flow
RecoveryFinalInterval = 1 * time.Second
// These are the maximum number of log upkeeps dequeued on every tick from proposal queue in FinalRecoveryFlow
// This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in every round
// FinalRecoveryBatchSize is the maximum number of log upkeeps dequeued on every tick from proposal queue in
// FinalRecoveryFlow. This is kept same as OutcomeSurfacedProposalsLimit as those many can get enqueued by plugin in
// every round
FinalRecoveryBatchSize = 50
// This is the ticker interval for recovery proposal flow
// RecoveryProposalInterval is the ticker interval for recovery proposal flow
RecoveryProposalInterval = 1 * time.Second
)

Expand Down
9 changes: 5 additions & 4 deletions pkg/v3/flows/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (
"log"
"time"

common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/postprocessors"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/tickers"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

const (
// These are the max number of payloads dequeued on every tick from the retry queue in the retry flow
// RetryBatchSize is the max number of payloads dequeued on every tick from the retry queue in the retry flow
RetryBatchSize = 10
// This is the ticker interval for retry flow
// RetryCheckInterval is the ticker interval for retry flow
RetryCheckInterval = 5 * time.Second
)

Expand Down Expand Up @@ -59,7 +60,7 @@ type retryTick struct {
batchSize int
}

func (t retryTick) Value(ctx context.Context) ([]common.UpkeepPayload, error) {
func (t retryTick) Value(_ context.Context) ([]common.UpkeepPayload, error) {
if t.q == nil {
return nil, nil
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/v3/flows/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink-automation/pkg/v3/service"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/stores"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types/mocks"
common "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

func TestRetryFlow(t *testing.T) {
Expand All @@ -31,22 +32,22 @@ func TestRetryFlow(t *testing.T) {

coord.On("PreProcess", mock.Anything, mock.Anything).Return([]common.UpkeepPayload{
{
UpkeepID: common.UpkeepIdentifier([32]byte{1}),
UpkeepID: [32]byte{1},
WorkID: "0x1",
},
{
UpkeepID: common.UpkeepIdentifier([32]byte{2}),
UpkeepID: [32]byte{2},
WorkID: "0x2",
},
}, nil).Times(times)
runner.On("CheckUpkeeps", mock.Anything, mock.Anything, mock.Anything).Return([]common.CheckResult{
{
UpkeepID: common.UpkeepIdentifier([32]byte{1}),
UpkeepID: [32]byte{1},
WorkID: "0x1",
Eligible: true,
},
{
UpkeepID: common.UpkeepIdentifier([32]byte{2}),
UpkeepID: [32]byte{2},
WorkID: "0x2",
Retryable: true,
},
Expand All @@ -64,12 +65,12 @@ func TestRetryFlow(t *testing.T) {

err := retryQ.Enqueue(types.RetryRecord{
Payload: common.UpkeepPayload{
UpkeepID: common.UpkeepIdentifier([32]byte{1}),
UpkeepID: [32]byte{1},
WorkID: "0x1",
},
}, types.RetryRecord{
Payload: common.UpkeepPayload{
UpkeepID: common.UpkeepIdentifier([32]byte{2}),
UpkeepID: [32]byte{2},
WorkID: "0x2",
},
})
Expand Down
8 changes: 5 additions & 3 deletions pkg/v3/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const (
ObservationPerformablesLimit = 50
ObservationLogRecoveryProposalsLimit = 5
ObservationConditionalsProposalsLimit = 5
ObservationBlockHistoryLimit = 256
// ObservationBlockHistoryLimit is the amount of past blocks required from block history source
ObservationBlockHistoryLimit = 256

// MaxObservationLength applies a limit to the total length of bytes in an
// observation. NOTE: This is derived from a limit of 10000 on performData
Expand Down Expand Up @@ -60,6 +61,7 @@ func DecodeAutomationObservation(data []byte, utg types.UpkeepTypeGetter, wg typ
return ao, nil
}

// validateAutomationObservation validates the automation observation, including block history, performables, proposals, etc
func validateAutomationObservation(o AutomationObservation, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) error {
// Validate Block History
if len(o.BlockHistory) > ObservationBlockHistoryLimit {
Expand Down Expand Up @@ -121,7 +123,7 @@ func validateAutomationObservation(o AutomationObservation, utg types.UpkeepType
return nil
}

// Validates the check result fields sent within an observation
// validateCheckResult validates the check result fields sent within an observation
func validateCheckResult(r ocr2keepers.CheckResult, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) error {
if r.PipelineExecutionState != 0 || r.Retryable {
return fmt.Errorf("check result cannot have failed execution state")
Expand Down Expand Up @@ -169,7 +171,7 @@ func validateUpkeepProposal(p ocr2keepers.CoordinatedBlockProposal, utg types.Up
return nil
}

// Validate validates the trigger fields, and any extensions if present.
// validateTriggerExtensionType validates the trigger fields, and any extensions if present.
func validateTriggerExtensionType(t ocr2keepers.Trigger, ut types.UpkeepType) error {
switch ut {
case types.ConditionTrigger:
Expand Down
12 changes: 6 additions & 6 deletions pkg/v3/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,33 +81,33 @@ func (o *Observer[T]) Process(ctx context.Context, tick tickers.Tick[[]T]) error
defer cancel()

// Get upkeeps from tick
value, err := tick.Value(pCtx)
upkeepPayloads, err := tick.Value(pCtx)
if err != nil {
return err
}

o.lggr.Printf("got %d payloads from ticker", len(value))
o.lggr.Printf("got %d payloads from ticker", len(upkeepPayloads))

// Run pre-processors
for _, preprocessor := range o.Preprocessors {
value, err = preprocessor.PreProcess(pCtx, value)
upkeepPayloads, err = preprocessor.PreProcess(pCtx, upkeepPayloads)
if err != nil {
return err
}
}

o.lggr.Printf("processing %d payloads", len(value))
o.lggr.Printf("processing %d payloads", len(upkeepPayloads))

// Run check pipeline
results, err := o.processFunc(pCtx, value...)
results, err := o.processFunc(pCtx, upkeepPayloads...)
if err != nil {
return err
}

o.lggr.Printf("post-processing %d results", len(results))

// Run post-processor
if err := o.Postprocessor.PostProcess(pCtx, results, value); err != nil {
if err := o.Postprocessor.PostProcess(pCtx, results, upkeepPayloads); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/v3/plugin/hooks/add_log_proposals.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (h *AddLogProposalsHook) RunHook(obs *ocr2keepersv3.AutomationObservation,
proposals = proposals[:limit]
}

// should this be log recovery??
h.logger.Printf("adding %d log recovery proposals to observation", len(proposals))
obs.UpkeepProposals = append(obs.UpkeepProposals, proposals...)
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/v3/plugin/hooks/remove_from_staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
)

// staging is the result store for certain upkeeps
func NewRemoveFromStagingHook(store types.ResultStore, logger *log.Logger) RemoveFromStagingHook {
return RemoveFromStagingHook{
store: store,
Expand Down