From aa5b6516d0e98db95e1a569c463f9fe28f05ab0e Mon Sep 17 00:00:00 2001 From: maura fortino Date: Wed, 24 Apr 2024 13:21:10 -0400 Subject: [PATCH] made some edits to be able to run tests locally --- config.go | 4 +++- internal/client/client.go | 8 +++++--- internal/client/httpClient.go | 2 +- internal/sink/sink.go | 1 + internal/sink/sinkSender.go | 3 +++ internal/sink/sinkWrapper.go | 15 +++++++++------ 6 files changed, 22 insertions(+), 11 deletions(-) diff --git a/config.go b/config.go index c0c2314..367666c 100644 --- a/config.go +++ b/config.go @@ -233,6 +233,8 @@ var defaultConfig = Config{ ApplicationName: applicationName, }, Sender: sink.Config{ - Linger: 180, + Linger: 180, + CutOffPeriod: 10, + NumWorkersPerSender: 5000, }, } diff --git a/internal/client/client.go b/internal/client/client.go index ac9b03d..5b5e814 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 package client -import "net/http" +import ( + "net/http" +) // Client is the interface used to requests messages to the desired location. // The Client can be either and HTTP Client or a Kafka Producer. @@ -15,8 +17,8 @@ func NopClient(next Client) Client { } // DoerFunc implements Client -type doerFunc func(*http.Request) (*http.Response, error) +type DoerFunc func(*http.Request) (*http.Response, error) -func (d doerFunc) Do(req *http.Request) (*http.Response, error) { +func (d DoerFunc) Do(req *http.Request) (*http.Response, error) { return d(req) } diff --git a/internal/client/httpClient.go b/internal/client/httpClient.go index 9f2d927..fe40388 100644 --- a/internal/client/httpClient.go +++ b/internal/client/httpClient.go @@ -38,7 +38,7 @@ func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, } func (m *metricWrapper) RoundTripper(next Client) Client { - return doerFunc(func(req *http.Request) (*http.Response, error) { + return DoerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) endTime := m.now() diff --git a/internal/sink/sink.go b/internal/sink/sink.go index c77a252..54091ec 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -44,6 +44,7 @@ func NewWebhookV1(s *sender) { id: s.id, deliveryInterval: s.deliveryInterval, deliveryRetries: s.deliveryRetries, + logger: s.logger, } s.sink = v1 } diff --git a/internal/sink/sinkSender.go b/internal/sink/sinkSender.go index 62da978..05e9a88 100644 --- a/internal/sink/sinkSender.go +++ b/internal/sink/sinkSender.go @@ -119,6 +119,7 @@ func NewSender(w *wrapper, l Listener) (s *sender, err error) { listener: l, queueSize: w.config.QueueSizePerSender, deliverUntil: l.GetUntil(), + logger: w.logger, // dropUntil: where is this being set in old caduceus?, cutOffPeriod: w.config.CutOffPeriod, deliveryRetries: w.config.DeliveryRetries, @@ -159,11 +160,13 @@ func (s *sender) Update(l Listener) (err error) { switch v := l.(type) { case *ListenerV1: m := &MatcherV1{} + m.logger = s.logger if err = m.Update(*v); err != nil { return } s.matcher = m NewWebhookV1(s) + default: err = fmt.Errorf("invalid listner") } diff --git a/internal/sink/sinkWrapper.go b/internal/sink/sinkWrapper.go index 131e6b9..2017bdf 100644 --- a/internal/sink/sinkWrapper.go +++ b/internal/sink/sinkWrapper.go @@ -34,7 +34,7 @@ type WrapperIn struct { // SinkWrapper interface is needed for unit testing. type Wrapper interface { - // Update([]ancla.InternalWebhook) + Update([]Listener) Queue(*wrp.Message) Shutdown(bool) } @@ -98,6 +98,12 @@ func NewWrapper(in WrapperIn) (wr Wrapper, err error) { metrics: in.Metrics, } + tr := newRoundTripper(in.Config, in.Tracing) + w.client = client.DoerFunc((&http.Client{ + Transport: tr, + Timeout: in.Config.ClientTimeout, + }).Do) + if in.Config.Linger <= 0 { linger := fmt.Sprintf("linger not positive: %v", in.Config.Linger) err = errors.New(linger) @@ -115,7 +121,7 @@ func NewWrapper(in WrapperIn) (wr Wrapper, err error) { } // no longer being initialized at start up - needs to be initialized by the creation of the outbound sender -func NewRoundTripper(config Config, tracing candlelight.Tracing) (tr http.RoundTripper) { +func newRoundTripper(config Config, tracing candlelight.Tracing) (tr http.RoundTripper) { tr = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, MaxIdleConnsPerHost: config.NumWorkersPerSender, @@ -165,13 +171,10 @@ func (w *wrapper) Update(list []Listener) { ss, err = NewSender(w, listener) w.clientMiddleware = metricWrapper.RoundTripper - // { - // ss, err = newSinkSender(sw, r1) - // } - if err == nil { w.senders[inValue.ID] = ss } + continue } fmt.Println(sender)