Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: brokers can be added to event lineage graph #7731

Merged
merged 2 commits into from Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 57 additions & 0 deletions pkg/graph/constructor.go
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package graph

import (
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

func (g *Graph) AddBroker(broker eventingv1.Broker) {
ref := &duckv1.KReference{
Name: broker.Name,
Namespace: broker.Namespace,
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
}
dest := &duckv1.Destination{Ref: ref}

// check if this vertex already exists
v, ok := g.vertices[makeComparableDestination(dest)]
if !ok {
v = &Vertex{
self: dest,
}
g.vertices[makeComparableDestination(dest)] = v
}

if broker.Spec.Delivery == nil || broker.Spec.Delivery.DeadLetterSink == nil {
// no DLS, we are done
return
}

// broker has a DLS, we need to add an edge to that
to, ok := g.vertices[makeComparableDestination(broker.Spec.Delivery.DeadLetterSink)]
if !ok {
to = &Vertex{
self: broker.Spec.Delivery.DeadLetterSink,
}
g.vertices[makeComparableDestination(broker.Spec.Delivery.DeadLetterSink)] = to
}

v.AddEdge(to, dest, NoTransform)
}
159 changes: 159 additions & 0 deletions pkg/graph/constructor_test.go
@@ -0,0 +1,159 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go imports problem

Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package graph

import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

var (
sampleUri, _ = apis.ParseURL("https://knative.dev")
Cali0707 marked this conversation as resolved.
Show resolved Hide resolved
)

func TestAddBroker(t *testing.T) {
brokerWithEdge := &Vertex{
self: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
},
}
destinationWithEdge := &Vertex{
self: &duckv1.Destination{
URI: sampleUri,
},
}
brokerWithEdge.AddEdge(destinationWithEdge, &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
}, NoTransform)
tests := []struct {
name string
broker eventingv1.Broker
expected map[comparableDestination]*Vertex
}{
{
name: "no DLS",
broker: eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "my-broker",
Namespace: "default",
},
},
expected: map[comparableDestination]*Vertex{
{
Ref: duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
}: {
self: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
},
},
},
}, {
name: "DLS",
broker: eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "my-broker",
Namespace: "default",
},
Spec: eventingv1.BrokerSpec{
Delivery: &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sampleUri,
},
},
},
},
expected: map[comparableDestination]*Vertex{
{
Ref: duckv1.KReference{
Name: "my-broker",
Namespace: "default",
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
},
}: brokerWithEdge,
{
URI: *sampleUri,
}: destinationWithEdge,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
g := NewGraph()
g.AddBroker(test.broker)
assert.Len(t, g.vertices, len(test.expected))
for k, expected := range test.expected {
actual, ok := g.vertices[k]
assert.True(t, ok)
// assert.Equal can't do equality on function values, which edges have, so we need to do a more complicated check
assert.Equal(t, actual.self, expected.self)
assert.Len(t, actual.inEdges, len(expected.inEdges))
assert.Subset(t, makeComparableEdges(actual.inEdges), makeComparableEdges(expected.inEdges))
assert.Len(t, actual.outEdges, len(expected.outEdges))
assert.Subset(t, makeComparableEdges(actual.outEdges), makeComparableEdges(expected.outEdges))
}
})
}
}

func makeComparableEdges(edges []*Edge) []comparableEdge {
res := make([]comparableEdge, len(edges))

for _, e := range edges {
res = append(res, comparableEdge{
self: makeComparableDestination(e.self),
to: makeComparableDestination(e.to.self),
from: makeComparableDestination(e.from.self),
})
}

return res
}

type comparableEdge struct {
self comparableDestination
to comparableDestination
from comparableDestination
}
41 changes: 38 additions & 3 deletions pkg/graph/types.go
Expand Up @@ -18,11 +18,12 @@ package graph

import (
eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type Graph struct {
vertices []*Vertex
vertices map[comparableDestination]*Vertex
Cali0707 marked this conversation as resolved.
Show resolved Hide resolved
}

type Vertex struct {
Expand All @@ -39,15 +40,36 @@ type Edge struct {
to *Vertex
}

// comparableDestination is a modified version of duckv1.Destination that is comparable (no pointers).
// It also omits some fields not needed for event lineage.
type comparableDestination struct {
// Ref points to an Addressable.
// +optional
Ref duckv1.KReference `json:"ref,omitempty"`

// URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
// +optional
URI apis.URL `json:"uri,omitempty"`
}
type TransformFunction func(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext)

// TODO(cali0707): flesh this out more, know we need it, not sure what needs to be in it yet
type TransformFunctionContext struct{}

func (t TransformFunctionContext) DeepCopy() TransformFunctionContext { return t } // TODO(cali0707) implement this once we have fleshed out the transform function context struct

func NewGraph() *Graph {
return &Graph{
vertices: make(map[comparableDestination]*Vertex),
}
}

func (g *Graph) Vertices() []*Vertex {
return g.vertices
vertices := make([]*Vertex, len(g.vertices))
for _, v := range g.vertices {
vertices = append(vertices, v)
}
return vertices
Cali0707 marked this conversation as resolved.
Show resolved Hide resolved
}

func (g *Graph) UnvisitAll() {
Expand Down Expand Up @@ -95,7 +117,9 @@ func (v *Vertex) NewWithSameRef() *Vertex {
}

func (v *Vertex) AddEdge(to *Vertex, edgeRef *duckv1.Destination, transform TransformFunction) {
v.outEdges = append(v.outEdges, &Edge{from: v, to: to, transform: transform, self: edgeRef})
edge := &Edge{from: v, to: to, transform: transform, self: edgeRef}
v.outEdges = append(v.outEdges, edge)
to.inEdges = append(to.inEdges, edge)

}

Expand All @@ -122,3 +146,14 @@ func (e *Edge) Reference() *duckv1.Destination {
func NoTransform(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext) {
return et, tfc
}

func makeComparableDestination(dest *duckv1.Destination) comparableDestination {
res := comparableDestination{}
if dest.Ref != nil {
res.Ref = *dest.Ref
}
if dest.URI != nil {
res.URI = *dest.URI
}
return res
}