Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offchain - remove dependency on pg.qopts #727

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/young-rockets-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"ccip": patch
---

Remove dependency on pg.qopts.

Postgres opts are not supported anymore. We are replacing all the `pg.qopts` params
with `context.Context`, it should contain all the relevant information for the underlying implementations.
12 changes: 6 additions & 6 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,13 @@ func (d *Delegate) cleanupEVM(ctx context.Context, jb job.Job, q pg.Queryer, rel
d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec)
}
case types.CCIPCommit:
err = ccipcommit.UnregisterCommitPluginLpFilters(context.Background(), d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
err = ccipcommit.UnregisterCommitPluginLpFilters(ctx, d.lggr, jb, d.legacyChains)
if err != nil {
d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec)
}
return nil
case types.CCIPExecution:
err = ccipexec.UnregisterExecPluginLpFilters(context.Background(), d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
err = ccipexec.UnregisterExecPluginLpFilters(ctx, d.lggr, jb, d.legacyChains)
if err != nil {
d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec)
}
Expand Down Expand Up @@ -1707,7 +1707,7 @@ func (d *Delegate) newServicesOCR2Functions(
return append([]job.ServiceCtx{functionsProvider, thresholdProvider, s4Provider}, functionsServices...), nil
}

func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
if spec.Relay != relay.EVM {
return nil, errors.New("Non evm chains are not supported for CCIP commit")
Expand Down Expand Up @@ -1758,10 +1758,10 @@ func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.Sugare
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccipcommit.NewCommitServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError, qopts...)
return ccipcommit.NewCommitServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError)
}

func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
if spec.Relay != relay.EVM {
return nil, errors.New("Non evm chains are not supported for CCIP execution")
Expand Down Expand Up @@ -1811,7 +1811,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccipexec.NewExecutionServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
return ccipexec.NewExecutionServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError)
}

func (d *Delegate) newServicesRebalancer(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig) ([]job.ServiceCtx, error) {
Expand Down
23 changes: 11 additions & 12 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/oraclelib"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/promwrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet, qopts...)
func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,21 +88,21 @@ func CommitReportToEthTxMeta(typ ccipconfig.ContractType, ver semver.Version) (f
// https://github.com/smartcontractkit/ccip/blob/68e2197472fb017dd4e5630d21e7878d58bc2a44/core/services/feeds/service.go#L716
// TODO once that transaction is broken up, we should be able to simply rely on oracle.Close() to cleanup the filters.
// Until then we have to deterministically reload the readers from the spec (and thus their filters) and close them.
func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) error {
func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer) error {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return err
}
versionFinder := factory.NewEvmVersionFinder()
unregisterFuncs := []func() error{
func() error {
return factory.CloseCommitStoreReader(lggr, versionFinder, params.commitStoreAddress, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), params.sourceChain.Config().EVM().GasEstimator().PriceMax().ToInt(), qopts...)
return factory.CloseCommitStoreReader(ctx, lggr, versionFinder, params.commitStoreAddress, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), params.sourceChain.Config().EVM().GasEstimator().PriceMax().ToInt())
},
func() error {
return factory.CloseOnRampReader(lggr, versionFinder, params.commitStoreStaticCfg.SourceChainSelector, params.commitStoreStaticCfg.ChainSelector, cciptypes.Address(params.commitStoreStaticCfg.OnRamp.String()), params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...)
return factory.CloseOnRampReader(ctx, lggr, versionFinder, params.commitStoreStaticCfg.SourceChainSelector, params.commitStoreStaticCfg.ChainSelector, cciptypes.Address(params.commitStoreStaticCfg.OnRamp.String()), params.sourceChain.LogPoller(), params.sourceChain.Client())
},
func() error {
return factory.CloseOffRampReader(lggr, versionFinder, params.pluginConfig.OffRamp, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt(), qopts...)
return factory.CloseOffRampReader(ctx, lggr, versionFinder, params.pluginConfig.OffRamp, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt())
},
}

Expand All @@ -116,7 +115,7 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb
return multiErr
}

func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return nil, nil, nil, err
Expand All @@ -131,7 +130,7 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
"DestChainSelector", params.commitStoreStaticCfg.ChainSelector)

versionFinder := factory.NewEvmVersionFinder()
commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, params.commitStoreAddress, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), params.sourceChain.Config().EVM().GasEstimator().PriceMax().ToInt(), qopts...)
commitStoreReader, err := factory.NewCommitStoreReader(ctx, lggr, versionFinder, params.commitStoreAddress, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), params.sourceChain.Config().EVM().GasEstimator().PriceMax().ToInt())
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not create commitStore reader")
}
Expand Down Expand Up @@ -182,11 +181,11 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job

// Load all the readers relevant for this plugin.
onrampAddress := cciptypes.Address(params.commitStoreStaticCfg.OnRamp.String())
onRampReader, err := factory.NewOnRampReader(commitLggr, versionFinder, params.commitStoreStaticCfg.SourceChainSelector, params.commitStoreStaticCfg.ChainSelector, onrampAddress, params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...)
onRampReader, err := factory.NewOnRampReader(ctx, commitLggr, versionFinder, params.commitStoreStaticCfg.SourceChainSelector, params.commitStoreStaticCfg.ChainSelector, onrampAddress, params.sourceChain.LogPoller(), params.sourceChain.Client())
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed onramp reader")
}
offRampReader, err := factory.NewOffRampReader(commitLggr, versionFinder, params.pluginConfig.OffRamp, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt(), true, qopts...)
offRampReader, err := factory.NewOffRampReader(ctx, commitLggr, versionFinder, params.pluginConfig.OffRamp, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt(), true)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed offramp reader")
}
Expand All @@ -211,6 +210,7 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
for _, o := range destRouterOffRamps {
destOffRampAddr := cciptypes.Address(o.OffRamp.String())
destOffRampReader, err2 := factory.NewOffRampReader(
ctx,
commitLggr,
versionFinder,
destOffRampAddr,
Expand All @@ -219,7 +219,6 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
params.destChain.GasEstimator(),
params.destChain.Config().EVM().GasEstimator().PriceMax().ToInt(),
true,
qopts...,
)
if err2 != nil {
return nil, nil, nil, err2
Expand Down