Skip to content

Commit

Permalink
Merge pull request #13 from Comcast/feature/support_basic_auth
Browse files Browse the repository at this point in the history
Feature/support basic auth
  • Loading branch information
schmidtw committed Aug 10, 2017
2 parents 9dd1a48 + 0078863 commit b5a8f10
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
48 changes: 32 additions & 16 deletions src/talaria/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
Expand All @@ -28,12 +29,13 @@ type Dispatcher interface {

// dispatcher is the internal Dispatcher implementation
type dispatcher struct {
logger logging.Logger
urlFilter URLFilter
method string
timeout time.Duration
eventEndpoints map[string][]string
outbounds chan<- *outboundEnvelope
logger logging.Logger
urlFilter URLFilter
method string
timeout time.Duration
authorizationKeys []string
eventEndpoints map[string][]string
outbounds chan<- *outboundEnvelope
}

// NewDispatcher constructs a Dispatcher which sends envelopes via the returned channel.
Expand All @@ -49,12 +51,13 @@ func NewDispatcher(o *Outbounder, urlFilter URLFilter) (Dispatcher, <-chan *outb

outbounds := make(chan *outboundEnvelope, o.outboundQueueSize())
return &dispatcher{
logger: o.logger(),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
eventEndpoints: o.eventEndpoints(),
outbounds: outbounds,
logger: o.logger(),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
authorizationKeys: o.authKey(),
eventEndpoints: o.eventEndpoints(),
outbounds: outbounds,
}, outbounds, nil
}

Expand All @@ -77,6 +80,21 @@ func (d *dispatcher) send(request *http.Request) error {
}
}

// newRequest creates a basic HTTP request appropriate for this dispatcher
func (d *dispatcher) newRequest(url, contentType string, body io.Reader) (*http.Request, error) {
request, err := http.NewRequest(d.method, url, body)
if err == nil {
request.Header.Set("Content-Type", contentType)

// TODO: Need to work out how to handle authorization better, without basic auth
if len(d.authorizationKeys) > 0 {
request.Header.Set("Authorization", "Basic "+d.authorizationKeys[0])
}
}

return request, err
}

func (d *dispatcher) dispatchEvent(eventType, contentType string, contents []byte) error {
endpoints := d.eventEndpoints[eventType]
if len(endpoints) == 0 {
Expand All @@ -90,12 +108,11 @@ func (d *dispatcher) dispatchEvent(eventType, contentType string, contents []byt
}

for _, url := range endpoints {
request, err := http.NewRequest(d.method, url, bytes.NewReader(contents))
request, err := d.newRequest(url, contentType, bytes.NewReader(contents))
if err != nil {
return err
}

request.Header.Set("Content-Type", contentType)
if err := d.send(request); err != nil {
return err
}
Expand All @@ -110,12 +127,11 @@ func (d *dispatcher) dispatchTo(unfiltered string, contentType string, contents
return err
}

request, err := http.NewRequest(d.method, url, bytes.NewReader(contents))
request, err := d.newRequest(url, contentType, bytes.NewReader(contents))
if err != nil {
return err
}

request.Header.Set("Content-Type", contentType)
return d.send(request)
}

Expand Down
9 changes: 9 additions & 0 deletions src/talaria/outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Outbounder struct {
MaxIdleConns int `json:"maxIdleConns"`
MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"`
IdleConnTimeout time.Duration `json:"idleConnTimeout"`
AuthKey []string `json:"authKey"`
Logger logging.Logger `json:"-"`
}

Expand Down Expand Up @@ -171,6 +172,14 @@ func (o *Outbounder) maxIdleConnsPerHost() int {
return DefaultMaxIdleConnsPerHost
}

func (o *Outbounder) authKey() []string {
if o != nil {
return o.AuthKey
}

return nil
}

func (o *Outbounder) idleConnTimeout() time.Duration {
if o != nil && o.IdleConnTimeout > 0 {
return o.IdleConnTimeout
Expand Down

0 comments on commit b5a8f10

Please sign in to comment.