From 5c5cf26b1604008256a56645c2c810c5cd561a57 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 27 Feb 2024 15:47:29 -0500 Subject: [PATCH 1/2] feat: brokers can be added to event lineage graph Signed-off-by: Calum Murray --- pkg/graph/constructor.go | 57 ++++++++++++ pkg/graph/constructor_test.go | 159 ++++++++++++++++++++++++++++++++++ pkg/graph/types.go | 41 ++++++++- 3 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 pkg/graph/constructor.go create mode 100644 pkg/graph/constructor_test.go diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go new file mode 100644 index 00000000000..57f3f5be161 --- /dev/null +++ b/pkg/graph/constructor.go @@ -0,0 +1,57 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graph + +import ( + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func (g *Graph) AddBroker(broker eventingv1.Broker) { + ref := &duckv1.KReference{ + Name: broker.Name, + Namespace: broker.Namespace, + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + } + dest := &duckv1.Destination{Ref: ref} + + // check if this vertex already exists + v, ok := g.vertices[makeComparableDestination(dest)] + if !ok { + v = &Vertex{ + self: dest, + } + g.vertices[makeComparableDestination(dest)] = v + } + + if broker.Spec.Delivery == nil || broker.Spec.Delivery.DeadLetterSink == nil { + // no DLS, we are done + return + } + + // broker has a DLS, we need to add an edge to that + to, ok := g.vertices[makeComparableDestination(broker.Spec.Delivery.DeadLetterSink)] + if !ok { + to = &Vertex{ + self: broker.Spec.Delivery.DeadLetterSink, + } + g.vertices[makeComparableDestination(broker.Spec.Delivery.DeadLetterSink)] = to + } + + v.AddEdge(to, dest, NoTransform) +} diff --git a/pkg/graph/constructor_test.go b/pkg/graph/constructor_test.go new file mode 100644 index 00000000000..48f36eb8e92 --- /dev/null +++ b/pkg/graph/constructor_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graph + +import ( + "github.com/stretchr/testify/assert" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +var ( + sampleUri, _ = apis.ParseURL("https://knative.dev") +) + +func TestAddBroker(t *testing.T) { + brokerWithEdge := &Vertex{ + self: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: "my-broker", + Namespace: "default", + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + }, + } + destinationWithEdge := &Vertex{ + self: &duckv1.Destination{ + URI: sampleUri, + }, + } + brokerWithEdge.AddEdge(destinationWithEdge, &duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: "my-broker", + Namespace: "default", + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + }, NoTransform) + tests := []struct { + name string + broker eventingv1.Broker + expected map[comparableDestination]*Vertex + }{ + { + name: "no DLS", + broker: eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "default", + }, + }, + expected: map[comparableDestination]*Vertex{ + { + Ref: duckv1.KReference{ + Name: "my-broker", + Namespace: "default", + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + }: { + self: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: "my-broker", + Namespace: "default", + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + }, + }, + }, + }, { + name: "DLS", + broker: eventingv1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "default", + }, + Spec: eventingv1.BrokerSpec{ + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: sampleUri, + }, + }, + }, + }, + expected: map[comparableDestination]*Vertex{ + { + Ref: duckv1.KReference{ + Name: "my-broker", + Namespace: "default", + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + }, + }: brokerWithEdge, + { + URI: *sampleUri, + }: destinationWithEdge, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + g := NewGraph() + g.AddBroker(test.broker) + assert.Len(t, g.vertices, len(test.expected)) + for k, expected := range test.expected { + actual, ok := g.vertices[k] + assert.True(t, ok) + // assert.Equal can't do equality on function values, which edges have, so we need to do a more complicated check + assert.Equal(t, actual.self, expected.self) + assert.Len(t, actual.inEdges, len(expected.inEdges)) + assert.Subset(t, makeComparableEdges(actual.inEdges), makeComparableEdges(expected.inEdges)) + assert.Len(t, actual.outEdges, len(expected.outEdges)) + assert.Subset(t, makeComparableEdges(actual.outEdges), makeComparableEdges(expected.outEdges)) + } + }) + } +} + +func makeComparableEdges(edges []*Edge) []comparableEdge { + res := make([]comparableEdge, len(edges)) + + for _, e := range edges { + res = append(res, comparableEdge{ + self: makeComparableDestination(e.self), + to: makeComparableDestination(e.to.self), + from: makeComparableDestination(e.from.self), + }) + } + + return res +} + +type comparableEdge struct { + self comparableDestination + to comparableDestination + from comparableDestination +} diff --git a/pkg/graph/types.go b/pkg/graph/types.go index 8e284da822b..d7dd93df039 100644 --- a/pkg/graph/types.go +++ b/pkg/graph/types.go @@ -18,11 +18,12 @@ package graph import ( eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) type Graph struct { - vertices []*Vertex + vertices map[comparableDestination]*Vertex } type Vertex struct { @@ -39,6 +40,17 @@ type Edge struct { to *Vertex } +// comparableDestination is a modified version of duckv1.Destination that is comparable (no pointers). +// It also omits some fields not needed for event lineage. +type comparableDestination struct { + // Ref points to an Addressable. + // +optional + Ref duckv1.KReference `json:"ref,omitempty"` + + // URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. + // +optional + URI apis.URL `json:"uri,omitempty"` +} type TransformFunction func(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext) // TODO(cali0707): flesh this out more, know we need it, not sure what needs to be in it yet @@ -46,8 +58,18 @@ type TransformFunctionContext struct{} func (t TransformFunctionContext) DeepCopy() TransformFunctionContext { return t } // TODO(cali0707) implement this once we have fleshed out the transform function context struct +func NewGraph() *Graph { + return &Graph{ + vertices: make(map[comparableDestination]*Vertex), + } +} + func (g *Graph) Vertices() []*Vertex { - return g.vertices + vertices := make([]*Vertex, len(g.vertices)) + for _, v := range g.vertices { + vertices = append(vertices, v) + } + return vertices } func (g *Graph) UnvisitAll() { @@ -95,7 +117,9 @@ func (v *Vertex) NewWithSameRef() *Vertex { } func (v *Vertex) AddEdge(to *Vertex, edgeRef *duckv1.Destination, transform TransformFunction) { - v.outEdges = append(v.outEdges, &Edge{from: v, to: to, transform: transform, self: edgeRef}) + edge := &Edge{from: v, to: to, transform: transform, self: edgeRef} + v.outEdges = append(v.outEdges, edge) + to.inEdges = append(to.inEdges, edge) } @@ -122,3 +146,14 @@ func (e *Edge) Reference() *duckv1.Destination { func NoTransform(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext) { return et, tfc } + +func makeComparableDestination(dest *duckv1.Destination) comparableDestination { + res := comparableDestination{} + if dest.Ref != nil { + res.Ref = *dest.Ref + } + if dest.URI != nil { + res.URI = *dest.URI + } + return res +} From 4317b0d92555a45f6e661c73ee758094d640c081 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 5 Mar 2024 12:38:49 -0500 Subject: [PATCH 2/2] cleanup: fix imports Signed-off-by: Calum Murray --- pkg/graph/constructor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/graph/constructor_test.go b/pkg/graph/constructor_test.go index 48f36eb8e92..a0d799b0c25 100644 --- a/pkg/graph/constructor_test.go +++ b/pkg/graph/constructor_test.go @@ -17,9 +17,9 @@ limitations under the License. package graph import ( - "github.com/stretchr/testify/assert" "testing" + "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"