Skip to content

Commit

Permalink
Merge pull request #235 from denopink/denopink/feature/qos-ack-implem…
Browse files Browse the repository at this point in the history
…entation

I realized I was using my fork and the not clone. swapped out `main` with a new branch. will create a new pr
  • Loading branch information
denopink committed Jul 20, 2022
2 parents 38c8d36 + 4dd66f6 commit 51a3d9c
Show file tree
Hide file tree
Showing 8 changed files with 979 additions and 132 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ coverage.txt
#Goland
.idea/

# VSCode
*.code-workspace
.vscode/*

# helm
deploy/k8s/talaria/rendered.*

Expand Down
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- https://nvd.nist.gov/vuln/detail/CVE-2021-31525
- https://nvd.nist.gov/vuln/detail/CVE-2021-44716

## [v0.6.6-dev.1]
- QOS implementation [#228](https://github.com/xmidt-org/talaria/pull/228) [#235](https://github.com/xmidt-org/talaria/pull/235)

## [v0.6.5]
- Bumped webpa-common to v2.0.6, fixing panic on send endpoint. [#229](https://github.com/xmidt-org/talaria/pull/229)

Expand Down Expand Up @@ -146,7 +149,8 @@ Switching to new build process
## [v0.1.1] Tue Mar 28 2017 Weston Schmidt - 0.1.1
- initial creation

[Unreleased]: https://github.com/xmidt-org/talaria/compare/v0.6.5...HEAD
[Unreleased]: https://github.com/xmidt-org/talaria/compare/v0.6.6-dev.1...HEAD
[v0.6.6-dev.1]: https://github.com/xmidt-org/talaria/compare/v0.6.5...v0.6.6-dev.1
[v0.6.5]: https://github.com/xmidt-org/talaria/compare/v0.6.4...v0.6.5
[v0.6.4]: https://github.com/xmidt-org/talaria/compare/v0.6.3...v0.6.4
[v0.6.3]: https://github.com/xmidt-org/talaria/compare/v0.6.2...v0.6.3
Expand Down
231 changes: 168 additions & 63 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,7 @@ import (
"github.com/xmidt-org/wrp-go/v3"
)

var ErrOutboundQueueFull = errors.New("Outbound message queue full")

// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request
type outboundEnvelope struct {
request *http.Request
cancel func()
}

// eventTypeContextKey is the internal key type for storing the event type
type eventTypeContextKey struct{}
var ErrOutboundQueueFull = errors.New("outbound message queue full")

// Dispatcher handles the creation and routing of HTTP requests in response to device events.
// A Dispatcher represents the send side for enqueuing HTTP requests.
Expand All @@ -54,18 +45,31 @@ type Dispatcher interface {
OnDeviceEvent(*device.Event)
}

// dispatcher is the internal Dispatcher implementation
// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request.
type outboundEnvelope struct {
request *http.Request
cancel func()
}

// eventTypeContextKey is the internal key type for storing the event type
type eventTypeContextKey struct{}

// dispatcher is the internal Dispatcher implementation.
type dispatcher struct {
errorLog log.Logger
urlFilter URLFilter
method string
timeout time.Duration
authorizationKey string
source string
eventMap event.MultiMap
queueSize metrics.Gauge
droppedMessages metrics.Counter
outbounds chan<- outboundEnvelope
errorLog log.Logger
urlFilter URLFilter
method string
timeout time.Duration
authorizationKey string
source string
eventMap event.MultiMap
queueSize metrics.Gauge
droppedMessages metrics.Counter
qosAckSuccess metrics.Counter
qosAckFailure metrics.Counter
qosAckSuccessLatency metrics.Histogram
qosAckFailureLatency metrics.Histogram
outbounds chan<- outboundEnvelope
}

// NewDispatcher constructs a Dispatcher which sends envelopes via the returned channel.
Expand All @@ -89,19 +93,65 @@ func NewDispatcher(om OutboundMeasures, o *Outbounder, urlFilter URLFilter) (Dis
logger.Log(level.Key(), level.InfoValue(), "eventMap", eventMap)

return &dispatcher{
errorLog: logging.Error(logger),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
authorizationKey: o.authKey(),
eventMap: eventMap,
queueSize: om.QueueSize,
source: o.source(),
droppedMessages: om.DroppedMessages,
outbounds: outbounds,
errorLog: logging.Error(logger),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
authorizationKey: o.authKey(),
eventMap: eventMap,
queueSize: om.QueueSize,
source: o.source(),
droppedMessages: om.DroppedMessages,
qosAckSuccess: om.QOSAckSuccess,
qosAckFailure: om.QOSAckFailure,
qosAckSuccessLatency: om.QOSAckSuccessLatency,
qosAckFailureLatency: om.QOSAckFailureLatency,
outbounds: outbounds,
}, outbounds, nil
}

// OnDeviceEvent is the device.Listener function that processes outbound events.
func (d *dispatcher) OnDeviceEvent(event *device.Event) {
if event == nil {
d.errorLog.Log(logging.MessageKey(), "Error nil event")
return
}

d.qosAck(event)
switch event.Type {
case device.Connect:
eventType, message := newOnlineMessage(d.source, event.Device)
if err := d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching online event", "eventType", eventType, "destination", message.Destination, logging.ErrorKey(), err)
}

case device.Disconnect:
eventType, message := newOfflineMessage(d.source, event.Device)
if err := d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching offline event", "eventType", eventType, "destination", message.Destination, logging.ErrorKey(), err)
}

case device.MessageReceived:
if routable, ok := event.Message.(wrp.Routable); ok {
destination := routable.To()
contentType := event.Format.ContentType()
if strings.HasPrefix(destination, EventPrefix) {
eventType := destination[len(EventPrefix):]
if err := d.dispatchEvent(eventType, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching event", "eventType", eventType, "destination", destination, logging.ErrorKey(), err)
}
} else if strings.HasPrefix(destination, DNSPrefix) {
unfilteredURL := destination[len(DNSPrefix):]
if err := d.dispatchTo(unfilteredURL, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching to endpoint", "destination", destination, logging.ErrorKey(), err)
}
} else {
d.errorLog.Log(logging.MessageKey(), "Unroutable destination", "destination", destination)
}
}
}
}

// send wraps the given request in an outboundEnvelope together with a cancellable context,
// then asynchronously sends that request to the outbounds channel. This method will
// block on the outbound channel only as long as the context is not cancelled, i.e. does not time out.
Expand All @@ -123,7 +173,7 @@ func (d *dispatcher) send(parent context.Context, request *http.Request) error {
}
}

// newRequest creates a basic HTTP request appropriate for this dispatcher
// newRequest creates a basic HTTP request appropriate for this dispatcher.
func (d *dispatcher) newRequest(url, contentType string, body io.Reader) (*http.Request, error) {
request, err := http.NewRequest(d.method, url, body)
if err == nil {
Expand All @@ -143,7 +193,7 @@ func (d *dispatcher) dispatchEvent(eventType, contentType string, contents []byt
if !ok {
// allow no endpoints, but log an error since this means that we're dropping
// traffic explicitly because of configuration
return fmt.Errorf("No endpoints configured for event: %s", eventType)
return fmt.Errorf("no endpoints configured for event: %s", eventType)
}

ctx := context.WithValue(
Expand Down Expand Up @@ -199,40 +249,95 @@ func (d *dispatcher) dispatchTo(unfiltered string, contentType string, contents
)
}

func (d *dispatcher) OnDeviceEvent(event *device.Event) {
switch event.Type {
case device.Connect:
eventType, message := newOnlineMessage(d.source, event.Device)
if err := d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching online event", "eventType", eventType, "destination", message.Destination, logging.ErrorKey(), err)
}
func (d *dispatcher) recordQOSAckLatency(s time.Time, f bool, l ...string) {
switch {
case f:
d.qosAckFailureLatency.With(l...).Observe(time.Since(s).Seconds())
default:
d.qosAckSuccessLatency.With(l...).Observe(time.Since(s).Seconds())
}
}

case device.Disconnect:
eventType, message := newOfflineMessage(d.source, event.Device)
if err := d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching offline event", "eventType", eventType, "destination", message.Destination, logging.ErrorKey(), err)
}
// qosAck takes events determines whether or not a QOS ack to the source device is required.
// Response ack message SHALL be a msg_type=4, SimpleEventMessageType.
func (d *dispatcher) qosAck(event *device.Event) {
var r *device.Request

if event == nil {
d.errorLog.Log(logging.MessageKey(), "Error nil event")
return
} else if event.Device == nil {
d.errorLog.Log(logging.MessageKey(), "Error nil device")
return
}

m, ok := event.Message.(*wrp.Message)
if !ok {
return
} else if len(m.PartnerIDs) != 1 {
// will remove this comment once wrp validation is integrated
d.errorLog.Log(logging.MessageKey(), "Error invalid number of partnerIDs", "partnerIDLen", len(m.PartnerIDs))
return
} else if !m.IsQOSAckPart() {
return
}

switch event.Type {
// Atm, only supporting QOS acks for received device messages
case device.MessageReceived:
if routable, ok := event.Message.(wrp.Routable); ok {
var (
destination = routable.To()
contentType = event.Format.ContentType()
)

if strings.HasPrefix(destination, EventPrefix) {
eventType := destination[len(EventPrefix):]
if err := d.dispatchEvent(eventType, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching event", "eventType", eventType, "destination", destination, logging.ErrorKey(), err)
}
} else if strings.HasPrefix(destination, DNSPrefix) {
unfilteredURL := destination[len(DNSPrefix):]
if err := d.dispatchTo(unfilteredURL, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching to endpoint", "destination", destination, logging.ErrorKey(), err)
}
} else {
d.errorLog.Log(logging.MessageKey(), "Unroutable destination", "destination", destination)
}
// https://xmidt.io/docs/wrp/simple-messages/#qos-details
r = &device.Request{
Message: &wrp.Message{
// When a qos field is specified that requires an ack, the response ack message SHALL be a msg_type=4.
Type: wrp.SimpleEventMessageType,
// The `source` SHALL be the component that cannot process the event further.
Source: "",
// The `dest` SHALL be the original requesting `source` address.
Destination: m.Source,
// The `content_type` and `payload` SHALL be omitted & set to empty, or may set to `application/text` and text to help describe the result. **DO NOT** process this text beyond for logging/debugging.
ContentType: "",
Payload: []byte{},
// The `partner_ids` SHALL be the same as the original message.
PartnerIDs: m.PartnerIDs,
// The `headers` SHOULD generally be the same as the original message, except where updating their values is correct.
Headers: m.Headers,
// The `metadata` map SHALL be populated with the original data or set to empty.
Metadata: m.Metadata,
// The `session_id` MAY be added by the cloud.
SessionID: m.SessionID,
// The `qos` SHALL be the same as the original message.
QualityOfService: m.QualityOfService,
// The `transaction_uuid` SHALL be the same as the original message.
TransactionUUID: m.TransactionUUID,
// The `rdr` SHALL be present and represent the outcome of the handling of the message.
RequestDeliveryResponse: m.RequestDeliveryResponse,
},
Format: event.Format,
}
default:
return
}

l := m.QualityOfService.Level()
p := m.PartnerIDs[0]
t := m.Type.FriendlyName()
ls := []string{qosLevelLabel, l.String(), partnerIDLabel, p, messageType, t}
ctx, cancel := context.WithTimeout(context.Background(), d.timeout)
defer cancel()

// Observe the latency of sending a qos ack to the source device
ackFailure := false
defer func(s time.Time) {
d.recordQOSAckLatency(s, ackFailure, ls...)
}(time.Now())

if _, err := event.Device.Send(r.WithContext(ctx)); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching QOS ack", "qosLevel", l, "partnerID", p, "messageType", t, logging.ErrorKey(), err)
d.qosAckFailure.With(ls...).Add(1)
ackFailure = true
return
}

d.qosAckSuccess.With(ls...).Add(1)
}

0 comments on commit 51a3d9c

Please sign in to comment.