Skip to content

Commit

Permalink
pkg/loop: plugins report health to host
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Dec 22, 2023
1 parent b074599 commit e443f8f
Show file tree
Hide file tree
Showing 31 changed files with 491 additions and 306 deletions.
7 changes: 7 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ func With(l Logger, keyvals ...interface{}) Logger {

// Named returns a logger with name 'n', if 'l' has a method `Named(string) L`, where L implements Logger, otherwise it returns l.
func Named(l Logger, n string) Logger {
l = named(l, n)
if testing.Testing() {
l.Debugf("New logger: %s", n)
}
return l
}
func named(l Logger, n string) Logger {
switch t := l.(type) {
case *logger:
return t.named(n)
Expand Down
5 changes: 5 additions & 0 deletions pkg/loop/internal/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type pluginMedianServer struct {
}

func RegisterPluginMedianServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.PluginMedian) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterPluginMedianServer(server, newPluginMedianServer(&brokerExt{broker, brokerCfg}, impl))
return nil
}
Expand Down Expand Up @@ -153,6 +154,10 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N
m.closeAll(dsRes, juelsRes, providerRes, errorLogRes)
return nil, err
}
if err = factory.Start(ctx); err != nil {
m.closeAll(dsRes, juelsRes, providerRes, errorLogRes)
return nil, err
}

id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: factory})
Expand Down
28 changes: 21 additions & 7 deletions pkg/loop/internal/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,30 @@ type PluginService[P grpcPlugin, S services.Service] struct {
client *plugin.Client
clientProtocol plugin.ClientProtocol

newService func(context.Context, any) (S, error)
newService func(context.Context, any) (S, services.HealthReporter, error)

serviceCh chan struct{} // closed when service is available
Service S
Health services.HealthReporter //TODO may or may not be the same as Service?

testInterrupt chan func(*PluginService[P, S]) // tests only (via TestHook) to enable access to internals without racing
}

func (s *PluginService[P, S]) Init(pluginName string, p P, newService func(context.Context, any) (S, error), lggr logger.Logger, cmd func() *exec.Cmd, stopCh chan struct{}) {
type NewService[S any] func(context.Context, any) (S, services.HealthReporter, error)

func (s *PluginService[P, S]) Init(
pluginName string,
grpcPlug P,
newService NewService[S],
lggr logger.Logger,
cmd func() *exec.Cmd,
stopCh chan struct{},
) {
s.pluginName = pluginName
s.lggr = lggr
s.cmd = cmd
s.stopCh = stopCh
s.grpcPlug = p
s.grpcPlug = grpcPlug
s.newService = newService
s.serviceCh = make(chan struct{})
}
Expand Down Expand Up @@ -140,7 +150,7 @@ func (s *PluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, e
case <-s.serviceCh:
// s.service already set
default:
s.Service, err = s.newService(ctx, i)
s.Service, s.Health, err = s.newService(ctx, i)
if err != nil {
abort()
return nil, nil, fmt.Errorf("failed to create service: %w", err)
Expand Down Expand Up @@ -173,7 +183,7 @@ func (s *PluginService[P, S]) HealthReport() map[string]error {
select {
case <-s.serviceCh:
hr := map[string]error{s.Name(): s.Healthy()}
services.CopyHealth(hr, s.Service.HealthReport())
services.CopyHealth(hr, s.Health.HealthReport())
return hr
default:
return map[string]error{s.Name(): ErrPluginUnavailable}
Expand All @@ -187,7 +197,7 @@ func (s *PluginService[P, S]) Close() error {

select {
case <-s.serviceCh:
if cerr := s.Service.Close(); !errors.Is(cerr, context.Canceled) && status.Code(cerr) != codes.Canceled {
if cerr := s.Service.Close(); !isCanceled(cerr) {
err = errors.Join(err, cerr)
}
default:
Expand All @@ -199,7 +209,7 @@ func (s *PluginService[P, S]) Close() error {

func (s *PluginService[P, S]) closeClient() (err error) {
if s.clientProtocol != nil {
if cerr := s.clientProtocol.Close(); !errors.Is(cerr, context.Canceled) {
if cerr := s.clientProtocol.Close(); !isCanceled(cerr) {
err = cerr
}
}
Expand Down Expand Up @@ -256,3 +266,7 @@ func (ch TestPluginService[P, S]) Reset() {
}
<-done
}

func isCanceled(err error) bool {
return errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled
}
14 changes: 8 additions & 6 deletions pkg/loop/internal/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ var _ PluginRelayer = (*PluginRelayerClient)(nil)

type PluginRelayerClient struct {
*pluginClient
*serviceClient

grpc pb.PluginRelayerClient
pluginRelayer pb.PluginRelayerClient
}

func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.ClientConn) *PluginRelayerClient {
brokerCfg.Logger = logger.Named(brokerCfg.Logger, "PluginRelayerClient")
pc := newPluginClient(broker, brokerCfg, conn)
return &PluginRelayerClient{pluginClient: pc, grpc: pb.NewPluginRelayerClient(pc)}
return &PluginRelayerClient{pluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)}
}

func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) {
Expand All @@ -42,7 +43,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key
}
deps.Add(ksRes)

reply, err := p.grpc.NewRelayer(ctx, &pb.NewRelayerRequest{
reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{
Config: config,
KeystoreID: id,
})
Expand All @@ -63,6 +64,7 @@ type pluginRelayerServer struct {
}

func RegisterPluginRelayerServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl PluginRelayer) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterPluginRelayerServer(server, newPluginRelayerServer(broker, brokerCfg, impl))
return nil
}
Expand Down Expand Up @@ -163,7 +165,7 @@ type relayerClient struct {
}

func newRelayerClient(b *brokerExt, conn grpc.ClientConnInterface) *relayerClient {
b = b.withName("ChainRelayerClient")
b = b.withName("RelayerClient")
return &relayerClient{b, newServiceClient(b, conn), pb.NewRelayerClient(conn)}
}

Expand Down Expand Up @@ -439,14 +441,14 @@ func (r *relayerServer) Transact(ctx context.Context, request *pb.TransactionReq
return &emptypb.Empty{}, r.impl.Transact(ctx, request.From, request.To, request.Amount.Int(), request.BalanceCheck)
}

func healthReport(s map[string]string) (hr map[string]error) {
func healthReport(prefix string, s map[string]string) (hr map[string]error) {
hr = make(map[string]error, len(s))
for n, e := range s {
var err error
if e != "" {
err = errors.New(e)
}
hr[n] = err
hr[prefix+"."+n] = err
}
return hr
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/loop/internal/reporting_plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type reportingPluginServiceServer struct {
}

func RegisterReportingPluginServiceServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.ReportingPluginClient) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&brokerExt{broker, brokerCfg}, impl))
return nil
}
Expand Down Expand Up @@ -148,6 +149,10 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con
m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes)
return nil, err
}
if err = factory.Start(ctx); err != nil {
m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes)
return nil, err
}

id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: factory})
Expand Down
6 changes: 4 additions & 2 deletions pkg/loop/internal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newServiceClient(b *brokerExt, cc grpc.ClientConnInterface) *serviceClient
}

func (s *serviceClient) Start(ctx context.Context) error {
//TODO reconsider?
return nil // no-op: server side starts automatically
}

Expand Down Expand Up @@ -56,11 +57,12 @@ func (s *serviceClient) HealthReport() map[string]error {
ctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()

name := s.Name()
reply, err := s.grpc.HealthReport(ctx, &emptypb.Empty{})
if err != nil {
return map[string]error{s.b.Logger.Name(): err}
return map[string]error{name: err}
}
hr := healthReport(reply.HealthReport)
hr := healthReport(name, reply.HealthReport)
hr[s.b.Logger.Name()] = nil
return hr
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/loop/internal/test/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginRelayerHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
},
GRPCServer: grpcServer,
})
Expand All @@ -68,7 +68,7 @@ func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginMedianHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
},
GRPCServer: grpcServer,
})
Expand All @@ -89,7 +89,7 @@ func main() {
HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.PluginProvider]{
PluginServer: test.StaticReportingPluginWithPluginProvider{},
PluginServer: test.NewStaticReportingPluginWithPluginProvider(lggr),
BrokerConfig: loop.BrokerConfig{
Logger: lggr,
StopCh: stopCh,
Expand All @@ -105,7 +105,7 @@ func main() {
HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.MedianProvider]{
PluginServer: test.StaticReportingPluginWithMedianProvider{},
PluginServer: test.NewStaticReportingPluginWithMedianProvider(lggr),
BrokerConfig: loop.BrokerConfig{
Logger: lggr,
StopCh: stopCh,
Expand Down
18 changes: 7 additions & 11 deletions pkg/loop/internal/test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@ import (
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

type staticConfigProvider struct{}

// TODO validate start/Close calls?
func (s staticConfigProvider) Start(ctx context.Context) error { return nil }

func (s staticConfigProvider) Close() error { return nil }

func (s staticConfigProvider) Ready() error { panic("unimplemented") }

func (s staticConfigProvider) Name() string { panic("unimplemented") }
type staticConfigProvider struct {
staticService
}

func (s staticConfigProvider) HealthReport() map[string]error { panic("unimplemented") }
func newStaticConfigProvider(lggr logger.Logger) staticConfigProvider {
return staticConfigProvider{staticService{lggr: logger.Named(lggr, "staticConfigProvider")}}
}

func (s staticConfigProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
Expand Down

0 comments on commit e443f8f

Please sign in to comment.