Skip to content

Commit

Permalink
feat: add 'kn func emit' command (#332)
Browse files Browse the repository at this point in the history
This commit adds an Emitter to be used by the CLI commands
for sending CloudEvents to functions, either locally, on
the cluster, or at a specified endpoint.

Signed-off-by: Lance Ball <lball@redhat.com>
  • Loading branch information
lance committed May 13, 2021
1 parent a74e3dd commit 49594d9
Show file tree
Hide file tree
Showing 8 changed files with 455 additions and 0 deletions.
24 changes: 24 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Client struct {
templates string // path to extensible templates
registry string // default registry for OCI image tags
progressListener ProgressListener // progress listener
emitter Emitter // Emits CloudEvents to functions
}

// ErrNotBuilt indicates the Function has not yet been built.
Expand Down Expand Up @@ -137,6 +138,11 @@ type DNSProvider interface {
Provide(Function) error
}

// Emit CloudEvents to functions
type Emitter interface {
Emit(ctx context.Context, endpoint string) error
}

// New client for Function management.
func New(options ...Option) *Client {
// Instantiate client with static defaults.
Expand All @@ -149,6 +155,7 @@ func New(options ...Option) *Client {
lister: &noopLister{output: os.Stdout},
dnsProvider: &noopDNSProvider{output: os.Stdout},
progressListener: &noopProgressListener{},
emitter: &noopEmitter{},
}

// Apply passed options, which take ultimate precidence.
Expand Down Expand Up @@ -254,6 +261,14 @@ func WithRegistry(registry string) Option {
}
}

// WithEmitter sets a CloudEvent emitter on the client which is capable of sending
// a CloudEvent to an arbitrary function endpoint
func WithEmitter(e Emitter) Option {
return func(c *Client) {
c.emitter = e
}
}

// New Function.
// Use Create, Build and Deploy independently for lower level control.
func (c *Client) New(ctx context.Context, cfg Function) (err error) {
Expand Down Expand Up @@ -529,6 +544,11 @@ func (c *Client) Remove(ctx context.Context, cfg Function) error {
return c.remover.Remove(ctx, f.Name)
}

// Emit a CloudEvent to a function endpoint
func (c *Client) Emit(ctx context.Context, endpoint string) error {
return c.emitter.Emit(ctx, endpoint)
}

// Manual implementations (noops) of required interfaces.
// In practice, the user of this client package (for example the CLI) will
// provide a concrete implementation for all of the interfaces. For testing or
Expand Down Expand Up @@ -573,3 +593,7 @@ func (p *noopProgressListener) SetTotal(i int) {}
func (p *noopProgressListener) Increment(m string) {}
func (p *noopProgressListener) Complete(m string) {}
func (p *noopProgressListener) Done() {}

type noopEmitter struct{}

func (p *noopEmitter) Emit(ctx context.Context, endpoint string) error { return nil }
24 changes: 24 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,30 @@ func TestDeployUnbuilt(t *testing.T) {
}
}

func TestEmit(t *testing.T) {
sink := "http://testy.mctestface.com"
emitter := mock.NewEmitter()

// Ensure sink passthrough from client
emitter.EmitFn = func(s string) error {
if s != sink {
t.Fatalf("Unexpected sink %v\n", s)
}
return nil
}

// Instantiate in the current working directory, with no name.
client := bosonFunc.New(bosonFunc.WithEmitter(emitter))

if err := client.Emit(context.Background(), sink); err != nil {
t.Fatal(err)
}
if !emitter.EmitInvoked {
t.Fatal("Client did not invoke emitter.Emit()")
}

}

// TODO: The tests which confirm an error is generated do not currently test
// that the expected error is received; just that any error is generated.
// This should be replaced with typed errors or at a minimum code prefixes
Expand Down
66 changes: 66 additions & 0 deletions cloudevents/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cloudevents

import (
"context"
"fmt"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"
)

const (
DefaultSource = "/boson/fn"
DefaultType = "boson.fn"
)

type Emitter struct {
Endpoint string
Source string
Type string
Id string
Data string
ContentType string
}

func NewEmitter() *Emitter {
return &Emitter{
Source: DefaultSource,
Type: DefaultType,
Id: uuid.NewString(),
Data: "",
ContentType: event.TextPlain,
}
}

func (e *Emitter) Emit(ctx context.Context, endpoint string) (err error) {
c, err := newClient(endpoint)
if err != nil {
return
}
evt := event.Event{
Context: event.EventContextV1{
Type: e.Type,
Source: *types.ParseURIRef(e.Source),
ID: e.Id,
}.AsV1(),
}
if err = evt.SetData(e.ContentType, e.Data); err != nil {
return
}
if result := c.Send(ctx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf(result.Error())
}
return nil
}

func newClient(target string) (c client.Client, err error) {
p, err := http.New(http.WithTarget(target))
if err != nil {
return
}
return client.New(p)
}
140 changes: 140 additions & 0 deletions cloudevents/emitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package cloudevents

import (
"context"
"fmt"
"testing"
"time"

"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-cmp/cmp"
)

func makeClient(t *testing.T) (c client.Client, p *http.Protocol) {
p, err := http.New()
if err != nil {
t.Fatal(err)
}
c, err = client.New(p)
if err != nil {
t.Errorf("failed to make client %s", err.Error())
}
return
}

func receiveEvents(t *testing.T, ctx context.Context, events chan<- event.Event) (p *http.Protocol) {
c, p := makeClient(t)
go func() {
err := c.StartReceiver(ctx, func(ctx context.Context, event event.Event) error {
go func() {
events <- event
}()
return nil
})
if err != nil {
t.Errorf("failed to start receiver %s", err.Error())
}
}()
time.Sleep(1 * time.Second) // let the server start
return
}

func TestEmitterDefaults(t *testing.T) {
events := make(chan event.Event)
ctx, cancel := context.WithCancel(context.Background())

// start a cloudevent client that receives events
// and sends them to a channel
p := receiveEvents(t, ctx, events)

emitter := NewEmitter()
if err := emitter.Emit(ctx, fmt.Sprintf("http://localhost:%v", p.GetListeningPort())); err != nil {
t.Fatalf("Error emitting event: %v\n", err)
}

// received event
got := <-events

cancel() // stop the client
time.Sleep(1 * time.Second) // let the server stop

if got.Source() != "/boson/fn" {
t.Fatal("Expected /boson/fn as default source")
}
if got.Type() != "boson.fn" {
t.Fatal("Expected boson.fn as default type")
}
}

func TestEmitter(t *testing.T) {
testCases := map[string]struct {
cesource string
cetype string
ceid string
cedata string
}{
"with-source": {
cesource: "/my/source",
},
"with-type": {
cetype: "my.type",
},
"with-id": {
ceid: "11223344",
},
"with-data": {
cedata: "Some event data",
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
events := make(chan event.Event)
ctx, cancel := context.WithCancel(context.Background())

// start a cloudevent client that receives events
// and sends them to a channel
p := receiveEvents(t, ctx, events)

emitter := NewEmitter()

if tc.cesource != "" {
emitter.Source = tc.cesource
}
if tc.cetype != "" {
emitter.Type = tc.cetype
}
if tc.ceid != "" {
emitter.Id = tc.ceid
}
if tc.cedata != "" {
emitter.Data = tc.cedata
}
if err := emitter.Emit(ctx, fmt.Sprintf("http://localhost:%v", p.GetListeningPort())); err != nil {
t.Fatalf("Error emitting event: %v\n", err)
}

// received event
got := <-events

cancel() // stop the client
time.Sleep(100 * time.Millisecond) // let the server stop

if tc.cesource != "" && got.Source() != tc.cesource {
t.Fatalf("%s: Expected %s as source, got %s", n, tc.cesource, got.Source())
}
if tc.cetype != "" && got.Type() != tc.cetype {
t.Fatalf("%s: Expected %s as type, got %s", n, tc.cetype, got.Type())
}
if tc.ceid != "" && got.ID() != tc.ceid {
t.Fatalf("%s: Expected %s as id, got %s", n, tc.ceid, got.ID())
}
if tc.cedata != "" {
if diff := cmp.Diff(tc.cedata, string(got.Data())); diff != "" {
t.Errorf("Unexpected difference (-want, +got): %v", diff)
}
}
})
}
}

0 comments on commit 49594d9

Please sign in to comment.