Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared job specs #683

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 9 additions & 11 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ type Delegate struct {

legacyChains legacyevm.LegacyChainContainer // legacy: use relayers instead
capabilitiesRegistry types.CapabilitiesRegistry

ccipJobCache *SharedJobCache // share job specs across ReportingPlugins
}

type DelegateConfig interface {
Expand Down Expand Up @@ -269,6 +271,7 @@ func NewDelegate(
isNewlyCreatedJob: false,
mailMon: mailMon,
capabilitiesRegistry: capabilitiesRegistry,
ccipJobCache: NewSharedJobCache(),
}
}

Expand Down Expand Up @@ -331,16 +334,11 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error
if err != nil {
d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec)
}
case types.CCIPCommit:
err = ccipcommit.UnregisterCommitPluginLpFilters(context.Background(), d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec)
}
return nil
case types.CCIPExecution:
err = ccipexec.UnregisterExecPluginLpFilters(context.Background(), d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
case types.CCIPCommit, types.CCIPExecution:
// Matt TODO subscriber must unsubscribe from the cache
err := d.ccipJobCache.deleteJob(context.Background(), jb)
if err != nil {
d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec)
d.lggr.Errorw(fmt.Sprintf("failed to invoke ccip subscriber on %s job deletion", string(types.CCIPCommit)), "err", err, "spec", spec)
}
return nil
default:
Expand Down Expand Up @@ -1753,7 +1751,7 @@ func (d *Delegate) newServicesCCIPCommit(ctx context.Context, lggr logger.Sugare
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccipcommit.NewCommitServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError, qopts...)
return ccipcommit.NewCommitServices(ctx, lggr, jb, d.ccipJobCache, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError, qopts...)
}

func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
Expand Down Expand Up @@ -1806,7 +1804,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccipexec.NewExecutionServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
return ccipexec.NewExecutionServices(ctx, lggr, jb, d.ccipJobCache, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
}

func (d *Delegate) newServicesRebalancer(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig) ([]job.ServiceCtx, error) {
Expand Down
6 changes: 4 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
Expand Down Expand Up @@ -88,11 +89,12 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
sourceNative: rf.config.sourceNative,
onRampReader: rf.config.onRampReader,
commitStoreReader: rf.config.commitStore,
priceGetter: rf.config.priceGetter,
multiLanePriceGetter: rf.config.multiLanePriceGetter,
F: config.F,
lggr: lggr,
destPriceRegistryReader: rf.destPriceRegReader,
offRampReaders: rf.config.offRamps,
curOffRampAddr: rf.config.curOffRampAddr,
allOffRampReaders: rf.config.allOffRampReaders,
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
offchainConfig: pluginOffChainConfig,
metricsCollector: rf.config.metricsCollector,
Expand Down
71 changes: 17 additions & 54 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"math/big"
"strings"

"github.com/Masterminds/semver/v3"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -19,33 +18,31 @@ import (

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider"

"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/commit_store"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/commit_store"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/router"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/factory"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/oraclelib"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/promwrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet, qopts...)
func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, jobCache *ocr2.SharedJobCache, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, jobCache, pr, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,7 +113,7 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb
return multiErr
}

func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, jobCache *ocr2.SharedJobCache, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return nil, nil, nil, err
Expand All @@ -141,44 +138,7 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
}
commitLggr := lggr.Named("CCIPCommit").With("sourceChain", sourceChainName, "destChain", destChainName)

var priceGetter pricegetter.PriceGetter
withPipeline := strings.Trim(params.pluginConfig.TokenPricesUSDPipeline, "\n\t ") != ""
if withPipeline {
priceGetter, err = pricegetter.NewPipelineGetter(params.pluginConfig.TokenPricesUSDPipeline, pr, jb.ID, jb.ExternalJobID, jb.Name.ValueOrZero(), lggr)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating pipeline price getter: %w", err)
}
} else {
// Use dynamic price getter.
if params.pluginConfig.PriceGetterConfig == nil {
return nil, nil, nil, fmt.Errorf("priceGetterConfig is nil")
}

// Build price getter clients for all chains specified in the aggregator configurations.
// Some lanes (e.g. Wemix/Kroma) requires other clients than source and destination, since they use feeds from other chains.
priceGetterClients := map[uint64]pricegetter.DynamicPriceGetterClient{}
for _, aggCfg := range params.pluginConfig.PriceGetterConfig.AggregatorPrices {
chainID := aggCfg.ChainID
// Retrieve the chain.
chain, _, err2 := ccipconfig.GetChainByChainID(chainSet, chainID)
if err2 != nil {
return nil, nil, nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2)
}
caller := rpclib.NewDynamicLimitedBatchCaller(
lggr,
chain.Client(),
rpclib.DefaultRpcBatchSizeLimit,
rpclib.DefaultRpcBatchBackOffMultiplier,
rpclib.DefaultMaxParallelRpcCalls,
)
priceGetterClients[chainID] = pricegetter.NewDynamicPriceGetterClient(caller)
}

priceGetter, err = pricegetter.NewDynamicPriceGetter(*params.pluginConfig.PriceGetterConfig, priceGetterClients)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating dynamic price getter: %w", err)
}
}
multiLanePriceGetter := NewMultiLanePriceGetter(commitLggr, jobCache, pr, chainSet)

// Load all the readers relevant for this plugin.
onrampAddress := cciptypes.Address(params.commitStoreStaticCfg.OnRamp.String())
Expand All @@ -199,6 +159,8 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
if err != nil {
return nil, nil, nil, err
}
// Matt TODO
// Router needs to monitor for OffRamp changes, once detected, reflect that in list of OffRamps
destRouter, err := router.NewRouter(destRouterEvmAddr, params.destChain.Client())
if err != nil {
return nil, nil, nil, err
Expand All @@ -207,7 +169,7 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
if err != nil {
return nil, nil, nil, err
}
var destOffRampReaders []ccipdata.OffRampReader
var allOffRampReaders []ccipdata.OffRampReader
for _, o := range destRouterOffRamps {
destOffRampAddr := cciptypes.Address(o.OffRamp.String())
destOffRampReader, err2 := factory.NewOffRampReader(
Expand All @@ -225,7 +187,7 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
return nil, nil, nil, err2
}

destOffRampReaders = append(destOffRampReaders, destOffRampReader)
allOffRampReaders = append(allOffRampReaders, destOffRampReader)
}

onRampRouterAddr, err := onRampReader.RouterAddress()
Expand All @@ -249,8 +211,8 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
onRampReader = observability.NewObservedOnRampReader(onRampReader, params.sourceChain.ID().Int64(), ccip.CommitPluginLabel)
commitStoreReader = observability.NewObservedCommitStoreReader(commitStoreReader, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
metricsCollector := ccip.NewPluginMetricsCollector(ccip.CommitPluginLabel, params.sourceChain.ID().Int64(), params.destChain.ID().Int64())
for i, o := range destOffRampReaders {
destOffRampReaders[i] = observability.NewObservedOffRampReader(o, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
for i, o := range allOffRampReaders {
allOffRampReaders[i] = observability.NewObservedOffRampReader(o, params.destChain.ID().Int64(), ccip.CommitPluginLabel)
}

chainHealthcheck := cache.NewObservedChainHealthCheck(
Expand Down Expand Up @@ -281,9 +243,10 @@ func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job
return &CommitPluginStaticConfig{
lggr: commitLggr,
onRampReader: onRampReader,
offRamps: destOffRampReaders,
curOffRampAddr: params.pluginConfig.OffRamp,
allOffRampReaders: allOffRampReaders,
sourceNative: ccipcalc.EvmAddrToGeneric(sourceNative),
priceGetter: priceGetter,
multiLanePriceGetter: multiLanePriceGetter,
sourceChainSelector: params.commitStoreStaticCfg.SourceChainSelector,
destChainSelector: params.commitStoreStaticCfg.ChainSelector,
commitStore: commitStoreReader,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package ccipcommit

import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"math/big"
"strings"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2"
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

type MultiLanePriceGetter interface {
GetTokenPrices(ctx context.Context, tokens map[cciptypes.Address][]cciptypes.Address) (map[cciptypes.Address]*big.Int, error)
}

// multiLanePriceGetter is a collection of price getters, one for each lane.
type multiLanePriceGetter struct {
priceGetters map[cciptypes.Address]pricegetter.PriceGetter

jobCache *ocr2.SharedJobCache
pr pipeline.Runner
chainSet legacyevm.LegacyChainContainer

lggr logger.Logger
}

// NewMultiLanePriceGetter creates a new instance of NewMultiLanePriceGetter.
func NewMultiLanePriceGetter(lggr logger.Logger, jobCache *ocr2.SharedJobCache, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer) MultiLanePriceGetter {
return &multiLanePriceGetter{
priceGetters: make(map[cciptypes.Address]pricegetter.PriceGetter),

jobCache: jobCache,
pr: pr,
chainSet: chainSet,

lggr: lggr,
}
}

func (s *multiLanePriceGetter) syncPriceGetters() error {
jobs := s.jobCache.Get()
priceGetters := make(map[cciptypes.Address]pricegetter.PriceGetter)
for _, jb := range jobs {
pluginConfig, err := extractPluginConfig(jb)
if err != nil {
return err
}

offRamp := pluginConfig.OffRamp
// if PriceGetting matching the job's OffRamp already exists, reuse the PriceGetter.
if _, exists := s.priceGetters[offRamp]; exists {
priceGetters[offRamp] = s.priceGetters[offRamp]
continue
}

var priceGetter pricegetter.PriceGetter
withPipeline := strings.Trim(pluginConfig.TokenPricesUSDPipeline, "\n\t ") != ""
if withPipeline {
priceGetter, err = pricegetter.NewPipelineGetter(pluginConfig.TokenPricesUSDPipeline, s.pr, jb.ID, jb.ExternalJobID, jb.Name.ValueOrZero(), s.lggr)
if err != nil {
return fmt.Errorf("creating pipeline price getter: %w", err)
}
} else {
// Use dynamic price getter.
if pluginConfig.PriceGetterConfig == nil {
return fmt.Errorf("priceGetterConfig is nil")
}

// Build price getter clients for all chains specified in the aggregator configurations.
// Some lanes (e.g. Wemix/Kroma) requires other clients than source and destination, since they use feeds from other chains.
priceGetterClients := map[uint64]pricegetter.DynamicPriceGetterClient{}
for _, aggCfg := range pluginConfig.PriceGetterConfig.AggregatorPrices {
chainID := aggCfg.ChainID
// Retrieve the chain.
chain, _, err2 := ccipconfig.GetChainByChainID(s.chainSet, chainID)
if err2 != nil {
return fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2)
}
caller := rpclib.NewDynamicLimitedBatchCaller(
s.lggr,
chain.Client(),
rpclib.DefaultRpcBatchSizeLimit,
rpclib.DefaultRpcBatchBackOffMultiplier,
rpclib.DefaultMaxParallelRpcCalls,
)
priceGetterClients[chainID] = pricegetter.NewDynamicPriceGetterClient(caller)
}

priceGetter, err = pricegetter.NewDynamicPriceGetter(*pluginConfig.PriceGetterConfig, priceGetterClients)
if err != nil {
return fmt.Errorf("creating dynamic price getter: %w", err)
}
}

priceGetters[offRamp] = priceGetter
}

s.priceGetters = priceGetters

return nil
}

func extractPluginConfig(jb job.Job) (*ccipconfig.CommitPluginJobSpecConfig, error) {
if jb.OCR2OracleSpec == nil {
return nil, errors.New("spec is nil")
}
spec := jb.OCR2OracleSpec

var pluginConfig ccipconfig.CommitPluginJobSpecConfig
err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginConfig)
if err != nil {
return nil, err
}
// ensure addresses are formatted properly - (lowercase to eip55 for evm)
pluginConfig.OffRamp = ccipcalc.HexToAddress(string(pluginConfig.OffRamp))

return &pluginConfig, nil
}

// GetTokenPrices looks up token prices from OffRamp's corresponding price getters.
// Matt TODO parallelize this
func (s *multiLanePriceGetter) GetTokenPrices(ctx context.Context, tokensPerOffRamp map[cciptypes.Address][]cciptypes.Address) (map[cciptypes.Address]*big.Int, error) {
if err := s.syncPriceGetters(); err != nil {
return nil, err
}

combinedPrices := make(map[cciptypes.Address]*big.Int)
for offRamp, tokens := range tokensPerOffRamp {
// short circuit if there are no tokens to get prices for on a lane
if len(tokens) == 0 {
continue
}
priceGetter, exists := s.priceGetters[offRamp]
if !exists {
// Matt TODO
// missing offramp means the
continue
}

prices, err := priceGetter.TokenPricesUSD(ctx, tokens)
if err != nil {
return nil, err
}

maps.Copy(combinedPrices, prices)
}

return combinedPrices, nil
}