Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-gcp-pubsub-lib
Browse files Browse the repository at this point in the history
  • Loading branch information
beiwei30 committed Nov 22, 2021
2 parents 6527d59 + 915180b commit 5134b0b
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 379 deletions.
6 changes: 3 additions & 3 deletions bindings/aws/kinesis/kinesis.go
Expand Up @@ -143,7 +143,7 @@ func (a *AWSKinesis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeRespon

func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
if a.metadata.KinesisConsumerMode == SharedThroughput {
a.worker = worker.NewWorker(a.recordProcessorFactory(handler), a.workerConfig, nil)
a.worker = worker.NewWorker(a.recordProcessorFactory(handler), a.workerConfig)
err := a.worker.Start()
if err != nil {
return err
Expand Down Expand Up @@ -346,8 +346,8 @@ func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput)
}

// checkpoint it after processing this batch
lastRecordSequenceNubmer := input.Records[len(input.Records)-1].SequenceNumber
input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)
lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
}

func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) {
Expand Down
29 changes: 21 additions & 8 deletions bindings/mqtt/mqtt.go
Expand Up @@ -186,14 +186,27 @@ func (m *MQTT) Operations() []bindings.OperationKind {
}

func (m *MQTT) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
m.logger.Debugf("mqtt publishing topic %s with data: %v", m.metadata.topic, req.Data)

token := m.producer.Publish(m.metadata.topic, m.metadata.qos, m.metadata.retain, req.Data)
if !token.WaitTimeout(defaultWait) || token.Error() != nil {
return nil, fmt.Errorf("mqtt error from publish: %v", token.Error())
}

return nil, nil
// MQTT client Publish() has an internal race condition in the default autoreconnect config.
// To mitigate sporadic failures on the Dapr side, this implementation retries 3 times at
// a fixed 200ms interval. This is not configurable to keep this as an implementation detail
// for this component, as the additional public config metadata required could be replaced
// by the more general Dapr APIs for resiliency moving forwards.
cbo := backoff.NewConstantBackOff(200 * time.Millisecond)
bo := backoff.WithMaxRetries(cbo, 3)
bo = backoff.WithContext(bo, m.ctx)

return nil, retry.NotifyRecover(func() error {
m.logger.Debugf("mqtt publishing topic %s with data: %v", m.metadata.topic, req.Data)
token := m.producer.Publish(m.metadata.topic, m.metadata.qos, m.metadata.retain, req.Data)
if !token.WaitTimeout(defaultWait) || token.Error() != nil {
return fmt.Errorf("mqtt error from publish: %v", token.Error())
}
return nil
}, bo, func(err error, _ time.Duration) {
m.logger.Debugf("Could not publish MQTT message. Retrying...: %v", err)
}, func() {
m.logger.Debug("Successfully published MQTT message after it previously failed")
})
}

func (m *MQTT) handleMessage(handler func(*bindings.ReadResponse) ([]byte, error), mqttMsg mqtt.Message) error {
Expand Down
11 changes: 4 additions & 7 deletions go.mod
Expand Up @@ -36,7 +36,7 @@ require (
github.com/apache/rocketmq-client-go/v2 v2.1.0
github.com/apache/thrift v0.14.0 // indirect
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/aws/aws-sdk-go v1.36.30
github.com/aws/aws-sdk-go v1.41.7
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/buger/jsonparser v1.1.1 // indirect
Expand Down Expand Up @@ -67,7 +67,7 @@ require (
github.com/gocql/gocql v0.0.0-20210515062232-b7ef815b4556
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/golang/mock v1.6.0
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/mux v1.8.0
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c
Expand Down Expand Up @@ -117,16 +117,13 @@ require (
github.com/tidwall/gjson v1.8.0 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/valyala/fasthttp v1.21.0
github.com/vmware/vmware-go-kcl v0.0.0-20191104173950-b6c74c3fe74e
github.com/vmware/vmware-go-kcl v1.5.0
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
github.com/yudai/gojsondiff v1.0.0 // indirect
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e // indirect
go.mongodb.org/mongo-driver v1.5.1
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.18.1 // indirect
goji.io v2.0.2+incompatible // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
Expand Down Expand Up @@ -156,4 +153,4 @@ require (

replace k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36

replace github.com/dapr/components-contrib => ../components-contrib
replace github.com/dapr/components-contrib => ../components-contrib

0 comments on commit 5134b0b

Please sign in to comment.