Skip to content

Commit

Permalink
made some edits to be able to run tests locally
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Apr 24, 2024
1 parent dc8f97a commit aa5b651
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 11 deletions.
4 changes: 3 additions & 1 deletion config.go
Expand Up @@ -233,6 +233,8 @@ var defaultConfig = Config{
ApplicationName: applicationName,
},
Sender: sink.Config{
Linger: 180,
Linger: 180,
CutOffPeriod: 10,
NumWorkersPerSender: 5000,
},
}
8 changes: 5 additions & 3 deletions internal/client/client.go
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
package client

import "net/http"
import (
"net/http"
)

// Client is the interface used to requests messages to the desired location.
// The Client can be either and HTTP Client or a Kafka Producer.
Expand All @@ -15,8 +17,8 @@ func NopClient(next Client) Client {
}

// DoerFunc implements Client
type doerFunc func(*http.Request) (*http.Response, error)
type DoerFunc func(*http.Request) (*http.Response, error)

func (d doerFunc) Do(req *http.Request) (*http.Response, error) {
func (d DoerFunc) Do(req *http.Request) (*http.Response, error) {
return d(req)
}
2 changes: 1 addition & 1 deletion internal/client/httpClient.go
Expand Up @@ -38,7 +38,7 @@ func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec,
}

func (m *metricWrapper) RoundTripper(next Client) Client {
return doerFunc(func(req *http.Request) (*http.Response, error) {
return DoerFunc(func(req *http.Request) (*http.Response, error) {
startTime := m.now()
resp, err := next.Do(req)
endTime := m.now()
Expand Down
1 change: 1 addition & 0 deletions internal/sink/sink.go
Expand Up @@ -44,6 +44,7 @@ func NewWebhookV1(s *sender) {
id: s.id,
deliveryInterval: s.deliveryInterval,
deliveryRetries: s.deliveryRetries,
logger: s.logger,
}
s.sink = v1
}
Expand Down
3 changes: 3 additions & 0 deletions internal/sink/sinkSender.go
Expand Up @@ -119,6 +119,7 @@ func NewSender(w *wrapper, l Listener) (s *sender, err error) {
listener: l,
queueSize: w.config.QueueSizePerSender,
deliverUntil: l.GetUntil(),
logger: w.logger,
// dropUntil: where is this being set in old caduceus?,
cutOffPeriod: w.config.CutOffPeriod,
deliveryRetries: w.config.DeliveryRetries,
Expand Down Expand Up @@ -159,11 +160,13 @@ func (s *sender) Update(l Listener) (err error) {
switch v := l.(type) {
case *ListenerV1:
m := &MatcherV1{}
m.logger = s.logger
if err = m.Update(*v); err != nil {
return
}
s.matcher = m
NewWebhookV1(s)

default:
err = fmt.Errorf("invalid listner")
}
Expand Down
15 changes: 9 additions & 6 deletions internal/sink/sinkWrapper.go
Expand Up @@ -34,7 +34,7 @@ type WrapperIn struct {

// SinkWrapper interface is needed for unit testing.
type Wrapper interface {
// Update([]ancla.InternalWebhook)
Update([]Listener)
Queue(*wrp.Message)
Shutdown(bool)
}
Expand Down Expand Up @@ -98,6 +98,12 @@ func NewWrapper(in WrapperIn) (wr Wrapper, err error) {
metrics: in.Metrics,
}

tr := newRoundTripper(in.Config, in.Tracing)
w.client = client.DoerFunc((&http.Client{
Transport: tr,
Timeout: in.Config.ClientTimeout,
}).Do)

if in.Config.Linger <= 0 {
linger := fmt.Sprintf("linger not positive: %v", in.Config.Linger)
err = errors.New(linger)
Expand All @@ -115,7 +121,7 @@ func NewWrapper(in WrapperIn) (wr Wrapper, err error) {
}

// no longer being initialized at start up - needs to be initialized by the creation of the outbound sender
func NewRoundTripper(config Config, tracing candlelight.Tracing) (tr http.RoundTripper) {
func newRoundTripper(config Config, tracing candlelight.Tracing) (tr http.RoundTripper) {
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation},
MaxIdleConnsPerHost: config.NumWorkersPerSender,
Expand Down Expand Up @@ -165,13 +171,10 @@ func (w *wrapper) Update(list []Listener) {
ss, err = NewSender(w, listener)
w.clientMiddleware = metricWrapper.RoundTripper

// {
// ss, err = newSinkSender(sw, r1)
// }

if err == nil {
w.senders[inValue.ID] = ss
}

continue
}
fmt.Println(sender)
Expand Down

0 comments on commit aa5b651

Please sign in to comment.