mtlpy/kinesis_producer

Kinesis Producer

Python producer for AWS Kinesis Stream with record aggregation.

It uses Boto 3 and is tested on Python 2.7 and 3.4/3.5.

Install

pip install 'kinesis_producer < 1'

Note: Kinesis Producer uses semver. Always pin the major version, since a new major version may break the API.

Usage

Send records aggregated up to about 100 KB or 200 ms, joined with '\n':

from kinesis_producer import KinesisProducer

config = dict(
    aws_region='us-east-1',
    buffer_size_limit=100000,
    buffer_time_limit=0.2,
    kinesis_concurrency=1,
    kinesis_max_retries=10,
    record_delimiter='\n',
    stream_name='KINESIS_STREAM_NAME',
    )

k = KinesisProducer(config=config)

# `records` is any iterable of records to send (not defined in this snippet)
for record in records:
    k.send(record)

k.close()
k.join()
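
To make "record aggregation" more concrete, here is a minimal sketch of size-based aggregation with a delimiter. It is not the library's implementation: the `aggregate` function, the choice to append the delimiter after every record, and the use of byte strings are all assumptions made for illustration. The time limit (`buffer_time_limit`) is left out to keep the example short.

```python
def aggregate(records, size_limit, delimiter=b'\n'):
    """Group byte records into delimiter-terminated payloads.

    Hypothetical helper: a payload is flushed as soon as its size reaches
    or exceeds size_limit, so the limit is approximate, not strict.
    """
    payloads = []
    buf = []
    buf_size = 0
    for record in records:
        buf.append(record + delimiter)
        buf_size += len(record) + len(delimiter)
        if buf_size >= size_limit:
            payloads.append(b''.join(buf))
            buf, buf_size = [], 0
    if buf:
        # Flush whatever is left once the input is exhausted.
        payloads.append(b''.join(buf))
    return payloads


batches = aggregate([b'aaaa', b'bbbb', b'cccc'], size_limit=10)
```

With these inputs, the first two records fill one payload and the third ends up in a second one. A real producer would presumably also flush a partially filled buffer once the time limit expires.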

Config

aws_region

AWS region for Kinesis calls (like us-east-1)

buffer_size_limit

Approximate size limit for record aggregation (in bytes)

buffer_time_limit

Approximate time limit for record aggregation (in seconds)

kinesis_concurrency

Concurrency level for Kinesis calls. Set to 1 for no concurrency. Set to 2 or more to use a thread pool.

kinesis_max_retries

Number of Kinesis put_records call attempts before giving up. This number should be between 4 and 10 if you want to handle temporary ProvisionedThroughputExceeded errors.

record_delimiter

Delimiter for record aggregation

stream_name

Name of the Kinesis Stream
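
As a quick sanity check before creating a producer, a config dict can be compared against the keys documented above. The `check_config` helper below is hypothetical and not part of kinesis_producer. Whether the library requires every key or provides defaults is not stated here, so treat the "missing key" check as an assumption.

```python
# Configuration keys documented in this README.
EXPECTED_KEYS = {
    'aws_region',
    'buffer_size_limit',
    'buffer_time_limit',
    'kinesis_concurrency',
    'kinesis_max_retries',
    'record_delimiter',
    'stream_name',
}


def check_config(config):
    """Hypothetical helper: return a list of problems found in a config dict."""
    problems = []
    for key in sorted(EXPECTED_KEYS - set(config)):
        problems.append('missing key: %s' % key)
    for key in sorted(set(config) - EXPECTED_KEYS):
        problems.append('unknown key: %s' % key)
    if config.get('kinesis_concurrency', 1) < 1:
        problems.append('kinesis_concurrency should be 1 or more')
    retries = config.get('kinesis_max_retries')
    if retries is not None and not 4 <= retries <= 10:
        # The README recommends 4 to 10 to ride out temporary throttling.
        problems.append('kinesis_max_retries is outside the recommended 4-10 range')
    return problems


config = dict(
    aws_region='us-east-1',
    buffer_size_limit=100000,
    buffer_time_limit=0.2,
    kinesis_concurrency=4,  # 2 or more: Kinesis calls go through a thread pool
    kinesis_max_retries=8,
    record_delimiter='\n',
    stream_name='KINESIS_STREAM_NAME',
)
problems = check_config(config)
```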

Kinesis retries

Kinesis calls are retried only for ProvisionedThroughputExceeded errors. Retries use exponential backoff (0.1s, 0.2s, 0.4s, 0.8s, 1.60s, 3.20s, 6.40s, 12.80s, 25.60s, 51.20s, 102.40s...)
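
The delay sequence above doubles on every retry, starting at 0.1s. The sketch below reproduces it and sums the delays, which gives a rough idea of how long a record may wait before the producer gives up. The `backoff_delays` function is a hypothetical name used for illustration. How the library maps `kinesis_max_retries` onto the number of sleeps is not documented here, so the total is only an estimate.

```python
def backoff_delays(retries, base=0.1, factor=2):
    """Return the delay in seconds before each retry, doubling every time."""
    return [base * factor ** n for n in range(retries)]


delays = backoff_delays(11)    # 0.1, 0.2, 0.4, ... up to 102.4 seconds
total_wait = sum(backoff_delays(10))  # cumulative delay over 10 retries
```

Because the delays grow geometrically, the last retry dominates the total: ten retries add up to a little under two minutes of waiting, which is worth keeping in mind when choosing `kinesis_max_retries`.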

Released under the MIT license.
