Skip to content

Commit

Permalink
feat: brokers can be added to event lineage graph (#7731)
Browse files Browse the repository at this point in the history
* feat: brokers can be added to event lineage graph

Signed-off-by: Calum Murray <cmurray@redhat.com>

* cleanup: fix imports

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Mar 5, 2024
1 parent 6e7728e commit 95996a0
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 3 deletions.
57 changes: 57 additions & 0 deletions pkg/graph/constructor.go
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package graph

import (
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func (g *Graph) AddBroker(broker eventingv1.Broker) {
ref := &duckv1.KReference{
Name: broker.Name,
Namespace: broker.Namespace,
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
}
dest := &duckv1.Destination{Ref: ref}

// check if this vertex already exists
v, ok := g.vertices[makeComparableDestination(dest)]
if !ok {
v = &Vertex{
self: dest,
}
g.vertices[makeComparableDestination(dest)] = v
}

if broker.Spec.Delivery == nil || broker.Spec.Delivery.DeadLetterSink == nil {
// no DLS, we are done
return
}

// broker has a DLS, we need to add an edge to that
to, ok := g.vertices[makeComparableDestination(broker.Spec.Delivery.DeadLetterSink)]
if !ok {
to = &Vertex{
self: broker.Spec.Delivery.DeadLetterSink,
}
g.vertices[makeComparableDestination(broker.Spec.Delivery.DeadLetterSink)] = to
}

v.AddEdge(to, dest, NoTransform)
}
159 changes: 159 additions & 0 deletions pkg/graph/constructor_test.go
@@ -0,0 +1,159 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package graph

import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

var (
sampleUri, _ = apis.ParseURL("https://knative.dev")
)

func TestAddBroker(t *testing.T) {
brokerWithEdge := &Vertex{
self: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
},
}
destinationWithEdge := &Vertex{
self: &duckv1.Destination{
URI: sampleUri,
},
}
brokerWithEdge.AddEdge(destinationWithEdge, &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
}, NoTransform)
tests := []struct {
name string
broker eventingv1.Broker
expected map[comparableDestination]*Vertex
}{
{
name: "no DLS",
broker: eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "my-broker",
Namespace: "default",
},
},
expected: map[comparableDestination]*Vertex{
{
Ref: duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
}: {
self: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
},
},
},
}, {
name: "DLS",
broker: eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "my-broker",
Namespace: "default",
},
Spec: eventingv1.BrokerSpec{
Delivery: &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sampleUri,
},
},
},
},
expected: map[comparableDestination]*Vertex{
{
Ref: duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
}: brokerWithEdge,
{
URI: *sampleUri,
}: destinationWithEdge,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
g := NewGraph()
g.AddBroker(test.broker)
assert.Len(t, g.vertices, len(test.expected))
for k, expected := range test.expected {
actual, ok := g.vertices[k]
assert.True(t, ok)
// assert.Equal can't do equality on function values, which edges have, so we need to do a more complicated check
assert.Equal(t, actual.self, expected.self)
assert.Len(t, actual.inEdges, len(expected.inEdges))
assert.Subset(t, makeComparableEdges(actual.inEdges), makeComparableEdges(expected.inEdges))
assert.Len(t, actual.outEdges, len(expected.outEdges))
assert.Subset(t, makeComparableEdges(actual.outEdges), makeComparableEdges(expected.outEdges))
}
})
}
}

func makeComparableEdges(edges []*Edge) []comparableEdge {
res := make([]comparableEdge, len(edges))

for _, e := range edges {
res = append(res, comparableEdge{
self: makeComparableDestination(e.self),
to: makeComparableDestination(e.to.self),
from: makeComparableDestination(e.from.self),
})
}

return res
}

type comparableEdge struct {
self comparableDestination
to comparableDestination
from comparableDestination
}
41 changes: 38 additions & 3 deletions pkg/graph/types.go
Expand Up @@ -18,11 +18,12 @@ package graph

import (
eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type Graph struct {
vertices []*Vertex
vertices map[comparableDestination]*Vertex
}

type Vertex struct {
Expand All @@ -39,15 +40,36 @@ type Edge struct {
to *Vertex
}

// comparableDestination is a modified version of duckv1.Destination that is comparable (no pointers).
// It also omits some fields not needed for event lineage.
type comparableDestination struct {
// Ref points to an Addressable.
// +optional
Ref duckv1.KReference `json:"ref,omitempty"`

// URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
// +optional
URI apis.URL `json:"uri,omitempty"`
}
type TransformFunction func(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext)

// TODO(cali0707): flesh this out more, know we need it, not sure what needs to be in it yet
type TransformFunctionContext struct{}

func (t TransformFunctionContext) DeepCopy() TransformFunctionContext { return t } // TODO(cali0707) implement this once we have fleshed out the transform function context struct

func NewGraph() *Graph {
return &Graph{
vertices: make(map[comparableDestination]*Vertex),
}
}

func (g *Graph) Vertices() []*Vertex {
return g.vertices
vertices := make([]*Vertex, len(g.vertices))
for _, v := range g.vertices {
vertices = append(vertices, v)
}
return vertices
}

func (g *Graph) UnvisitAll() {
Expand Down Expand Up @@ -95,7 +117,9 @@ func (v *Vertex) NewWithSameRef() *Vertex {
}

func (v *Vertex) AddEdge(to *Vertex, edgeRef *duckv1.Destination, transform TransformFunction) {
v.outEdges = append(v.outEdges, &Edge{from: v, to: to, transform: transform, self: edgeRef})
edge := &Edge{from: v, to: to, transform: transform, self: edgeRef}
v.outEdges = append(v.outEdges, edge)
to.inEdges = append(to.inEdges, edge)

}

Expand All @@ -122,3 +146,14 @@ func (e *Edge) Reference() *duckv1.Destination {
func NoTransform(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext) {
return et, tfc
}

func makeComparableDestination(dest *duckv1.Destination) comparableDestination {
res := comparableDestination{}
if dest.Ref != nil {
res.Ref = *dest.Ref
}
if dest.URI != nil {
res.URI = *dest.URI
}
return res
}

0 comments on commit 95996a0

Please sign in to comment.