Skip to content

scblur869/mqtt-to-kinesis

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

mqtt-to-kinesis

The idea behind this service is to run it as a container on the edge, subscribe to a topic from MQTT and then batch those payloads up and send them to kinesis data stream --> firehose --> datalake

I have tested this with various free mqtt brokers and it seems to work pretty well.

this service written in Go is deployed via a container using parameterization. see the Dockerfile.example

it can also be ran from command line using an .env file below is an example of what that file could look like

 PORT=1883
 WEBSOCKET=8080
 BROKER=public.mqtthq.com
 USER=
 PASS=
 CLIENTID=client-123
 TOPIC=homeassistant/sensor/#
 QOS=0
 BACKLOG_COUNT=2000
 KIN_STREAM_NAME=kinesis-data-stream
 KIN_MAXCONN=10
 KIN_SESSION_NAME=mysession-1234
 KIN_PARTITION_KEY=sensor-data
 AWS_ACCESS_KEY_ID=nnnnnnnnnnnnnnnnnnnnn
 AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
 AWS_ASSUME_ROLE_ARN=arn:aws:iam::1111111111111:role/service-role/kinesis-endpoint-access
 AWS_REGION=us-east-1
 STDOUT=true

typical STDOUT looks like

key: homeassistant/sensor/eisbar_temperatura_batteria_motore/state
value: 21.1
key: homeassistant/sensor/eisbar_uptime_human_readable/state
value: 42s
key: homeassistant/sensor/eisbar_uptime_sensor/state
value: 42
key: homeassistant/sensor/eisbar_uptime_human_readable/state
value: 42s
key: homeassistant/sensor/eisbar_tensione_batteria_servizi/state
value: 11.98
key: homeassistant/sensor/eisbar_wifi/state
value: -48
key: homeassistant/sensor/eisbar_uptime_human_readable/state
value: 44s
key: homeassistant/sensor/eisbar_uptime_sensor/state
value: 44
key: homeassistant/sensor/eisbar_tensione_pannelli_solari/state
value: 1.50
key: homeassistant/sensor/eisbar_tensione_batteria_motore/state
value: 1.50
key: homeassistant/sensor/eisbar_temperatura/state
value: 21.4
key: homeassistant/sensor/eisbar_umidit/state
value: 68.9
key: homeassistant/sensor/eisbar_uptime_human_readable/state
value: 46s
key: homeassistant/sensor/eisbar_uptime_sensor/state
value: 46
2023/05/11 07:46:19 flushing records reason=interval,  records=1
2023/05/11 07:46:19 Result[0] ShardId=shardId-000000000001,  SequenceNumber=49639767612638592089930656974715882307650445660919955474

Kinesis Data Stream view Kinesis Data Steam Data Viewer

And you can use AWS Kinesis Firehose to send this to S3 and transform the format if you like S3

About

subscribes to a MQTT Topic and routes that data to kinesis in 5mb batches

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages