Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Typed capability API #462

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/hashicorp/consul/sdk v0.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.6.0
github.com/invopop/jsonschema v0.7.0
github.com/jmoiron/sqlx v1.3.5
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
Expand All @@ -23,6 +24,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/riferrei/srclient v0.5.4
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -55,6 +57,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand All @@ -65,7 +68,6 @@ require (
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE
github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 h1:i462o439ZjprVSFSZLZxcsoAe592sZB1rci2Z8j4wdk=
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/invopop/jsonschema v0.7.0 h1:2vgQcBz1n256N+FpX3Jq7Y17AjYt46Ig3zIWyy770So=
github.com/invopop/jsonschema v0.7.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
Expand Down Expand Up @@ -288,6 +292,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
164 changes: 164 additions & 0 deletions pkg/capabilities/v2/capability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package v2

import (
"context"
"encoding/json"

"github.com/invopop/jsonschema"
jsonvalidate "github.com/santhosh-tekuri/jsonschema/v5"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

type CapabilityResponse[O any] struct {
Value O
Err error
}

type CapabilityRequest[I, C any] struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do those work in a remote setting? Does the receiver always know what type to expect or is there some type info encoded too?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd imagine it's the same, capabilities should have a input/config/output schema they abide to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bolekk This is purely a client-side feature (i.e. inside the capability), anything accessing the capability (either via the Remote API or via the workflow engine would continue to do so via the lower-level, values-based API.

Metadata capabilities.RequestMetadata
Config C
Inputs I
}

type RegisterToWorkflowRequest[C any] struct {
Metadata capabilities.RegistrationMetadata
Config C
}

type UnregisterFromWorkflowRequest[C any] struct {
Metadata capabilities.RegistrationMetadata
Config C
}

type Capability[I, O, C any] interface {
RegisterToWorkflow(ctx context.Context, req RegisterToWorkflowRequest[C]) error
UnregisterFromWorkflow(ctx context.Context, req UnregisterFromWorkflowRequest[C]) error
Execute(ctx context.Context, callback chan<- CapabilityResponse[O], request CapabilityRequest[I, C]) error
}

type capability[I, O, C any] struct {
inner Capability[I, O, C]
}

func (c *capability[I, O, C]) RegisterToWorkflow(ctx context.Context, req capabilities.RegisterToWorkflowRequest) error {
var conf C
err := c.validate(&conf, req.Config)
if err != nil {
return err
}

err = req.Config.UnwrapTo(&conf)
if err != nil {
return err
}
regReq := RegisterToWorkflowRequest[C]{
Metadata: req.Metadata,
Config: conf,
}
return c.inner.RegisterToWorkflow(ctx, regReq)
}

func (c *capability[I, O, C]) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error {
var conf C
err := c.validate(&conf, req.Config)
if err != nil {
return err
}

err = req.Config.UnwrapTo(&conf)
if err != nil {
return err
}
regReq := UnregisterFromWorkflowRequest[C]{
Metadata: req.Metadata,
Config: conf,
}
return c.inner.UnregisterFromWorkflow(ctx, regReq)
}

func (c *capability[I, O, C]) validate(str any, m *values.Map) error {
sch := jsonschema.Reflect(str)
schemab, err := json.Marshal(sch)
if err != nil {
return err
}

mapping, err := values.Unwrap(m)
if err != nil {
return err
}

schema, err := jsonvalidate.CompileString("<uriPrefix>", string(schemab))
if err != nil {
return err
}
return schema.Validate(mapping)
}

func (c *capability[I, O, C]) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, request capabilities.CapabilityRequest) error {
tcb := make(chan CapabilityResponse[O])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does tcb mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean obviously it stands for Typed Capability Response :P

go c.forwardResponses(ctx, callback, tcb)

var conf C
err := c.validate(&conf, request.Config)
if err != nil {
return err
}

err = request.Config.UnwrapTo(&conf)
if err != nil {
return err
}

var inp I
err = c.validate(&inp, request.Inputs)
if err != nil {
return err
}

err = request.Inputs.UnwrapTo(&inp)
if err != nil {
return err
}

treq := CapabilityRequest[I, C]{
Metadata: request.Metadata,
Config: conf,
Inputs: inp,
}

return c.inner.Execute(ctx, tcb, treq)
}

func (c *capability[I, O, C]) forwardResponses(ctx context.Context, callback chan<- capabilities.CapabilityResponse, typedCallback chan CapabilityResponse[O]) {
for {
select {
case <-ctx.Done():
return
case resp, isOpen := <-typedCallback:
if !isOpen {
close(callback)
return
}

v, err := values.Wrap(resp.Value)
if err != nil {
callback <- capabilities.CapabilityResponse{
Err: err,
}
}

callback <- capabilities.CapabilityResponse{
Value: v,
}
}
}
}

func NewCapability[I, O, C any](cap Capability[I, O, C]) capabilities.CallbackExecutable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great, and is the direction I wanted to take the Validation API efforts to :)
What we're building represents a RPC framework of sorts.

I'd imagine there's a lot of value to be had by allowing the user to inject middlewares that are called during request/response lifecycle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agreed :) We could use this for observability and logging for example :)

return &capability[I, O, C]{
inner: cap,
}
}
64 changes: 64 additions & 0 deletions pkg/capabilities/v2/capability_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package v2

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

type typedCapability struct {
Capability[AnInput, AnOutput, AConfig]
inp AnInput
conf AConfig
}

type AnInput struct {
Foo string `json:"foo"`
}

type AConfig struct {
Bar string `json:"bar"`
}

type AnOutput struct {
Baz string `json:"baz"`
}

func (t *typedCapability) Execute(ctx context.Context, callback chan<- CapabilityResponse[AnOutput], req CapabilityRequest[AnInput, AConfig]) error {
t.inp = req.Inputs
t.conf = req.Config
return nil
}

func TestCapabilityV2_Execute(t *testing.T) {
c := &typedCapability{}
cap := NewCapability(c)

cb := make(chan capabilities.CapabilityResponse)

conf, err := values.NewMap(map[string]any{
"bar": "config-string",
})
require.NoError(t, err)

inp, err := values.NewMap(map[string]any{
"foo": "input-string",
})
require.NoError(t, err)

req := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{},
Config: conf,
Inputs: inp,
}
err = cap.Execute(context.Background(), cb, req)
require.NoError(t, err)

assert.Equal(t, AnInput{Foo: "input-string"}, c.inp)
assert.Equal(t, AConfig{Bar: "config-string"}, c.conf)
}