This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

krux/tcp-stream-kafka-producer

TCP Stream Kafka Producer

The Krux TCP Kafka Producer does a very simple job. It takes a configurable list of TCP port/Kafka topic pairs on the command line. For each pair, it opens the TCP port, splits the incoming stream on newlines, and publishes each resulting line as a message to the configured Kafka topic(s). By default, if the configured Kafka cluster becomes unavailable, the producer automatically closes its TCP port(s) and reopens them once the cluster is available again. This behavior can easily be overridden. Like all of Krux's open-sourced Java libraries, it is built atop the Krux Standard Java Library and emits a bevy of useful usage statistics via StatsD.
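Here is a minimal Java sketch of the splitting step on a single connection. It is not the project's code. The stream is read line by line, and every line is handed to each topic mapped to that port. `LineFanOut`, `forward`, and the `send` callback are hypothetical names. In a real deployment, `send` would wrap a Kafka producer call. The sketch also assumes UTF-8 input.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sketch: split one TCP stream on line endings and fan each line out to its topics. */
final class LineFanOut {
    /**
     * @param in     the connection's input stream (e.g. socket.getInputStream())
     * @param topics the Kafka topics mapped to this port
     * @param send   hypothetical stand-in for a producer send call, invoked as (topic, message)
     * @return the number of lines read from the stream
     */
    static long forward(InputStream in, List<String> topics, BiConsumer<String, String> send) throws IOException {
        long lines = 0;
        // Assumes UTF-8; closing the reader also closes the underlying stream (and, for a socket, the connection).
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() strips the terminator; a final line without one is still returned at EOF.
            while ((line = reader.readLine()) != null) {
                for (String topic : topics) {
                    send.accept(topic, line);
                }
                lines++;
            }
        }
        return lines;
    }
}
```

`BufferedReader.readLine()` also treats `\r` and `\r\n` as terminators, so Windows-style line endings are stripped too. The description above does not say whether the actual producer does the same.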

Use

The TCP Kafka Producer is a stand-alone jar intended to be run from the command line. TCP port -> topic mappings are specified with one or more --port.topic options, and nearly all Kafka producer settings can also be set on the command line.
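The `--port.topic` values (for example `1543:topic1,topic2,topic3`) could be parsed into a port-to-topics map along these lines. This is a hypothetical sketch. The class name `PortTopicMappings`, the validation rules, and the choice to merge topics when the same port is given twice are all assumptions, not documented behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser for --port.topic values of the form PORT:TOPIC[,TOPIC...]. */
final class PortTopicMappings {
    static Map<Integer, List<String>> parse(List<String> values) {
        Map<Integer, List<String>> mappings = new LinkedHashMap<>();
        for (String value : values) {
            int colon = value.indexOf(':');
            if (colon <= 0 || colon == value.length() - 1) {
                throw new IllegalArgumentException("expected PORT:TOPIC[,TOPIC...], got: " + value);
            }
            // NumberFormatException (a subclass of IllegalArgumentException) signals a non-numeric port.
            int port = Integer.parseInt(value.substring(0, colon).trim());
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + port);
            }
            List<String> topics = new ArrayList<>();
            for (String topic : value.substring(colon + 1).split(",")) {
                if (!topic.isBlank()) {
                    topics.add(topic.trim());
                }
            }
            if (topics.isEmpty()) {
                throw new IllegalArgumentException("no topics given for port " + port);
            }
            // Assumption: a repeated port merges its topics instead of replacing the earlier mapping.
            mappings.computeIfAbsent(port, p -> new ArrayList<>()).addAll(topics);
        }
        return mappings;
    }
}
```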

Optionally, an embedded web server can be started at application bootstrap to report the application's performance and health over HTTP. To enable the HTTP status endpoint, pass --http-port on the command line. Once the process is running, request "/__status" on that port to get detailed per-topic message processing rates. For example, with --http-port 9080, statistics are available via:

curl localhost:9080/__status | jq .
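A monitoring tool written in Java could make the same request with the JDK 11+ `java.net.http.HttpClient`. `StatusClient` is a hypothetical helper. The JSON layout of `/__status` is not documented here, so the helper only checks for HTTP 200 and returns the raw body.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical health-check helper that fetches the raw /__status response body. */
final class StatusClient {
    static String fetchStatus(String host, int httpPort) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)   // plain HTTP/1.1, no h2c upgrade attempt
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://" + host + ":" + httpPort + "/__status"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("unexpected HTTP status " + response.statusCode());
        }
        // The response schema isn't documented, so the body is returned unparsed.
        return response.body();
    }
}
```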

Also optionally, a --heartbeat-topic option may be passed. This topic is used for heartbeat checks of the Kafka cluster. When the topic cannot be written to, all open TCP listening ports are closed until the Kafka cluster is available again, which lets upstream handlers route around this listener.
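The heartbeat behavior amounts to a small state machine that touches the listening ports only when Kafka's writability changes. The sketch below is one assumed way to structure it, not the project's implementation. `HeartbeatGate` and `PortListener` are hypothetical names, and the caller decides how often the heartbeat is written.

```java
import java.util.List;

/** Hypothetical sketch: close all listening ports when heartbeat writes fail, reopen them when writes succeed. */
final class HeartbeatGate {
    /** Hypothetical stand-in for whatever owns one TCP listening socket. */
    interface PortListener {
        void open();   // e.g. bind a new ServerSocket on the configured port
        void close();  // e.g. close the ServerSocket so upstream clients fail over
    }

    private final List<PortListener> listeners;
    private boolean portsOpen;

    HeartbeatGate(List<PortListener> listeners, boolean portsOpenInitially) {
        this.listeners = List.copyOf(listeners);
        this.portsOpen = portsOpenInitially;
    }

    /** Call after each heartbeat attempt; ports are only touched when the state actually changes. */
    synchronized void onHeartbeat(boolean heartbeatWriteSucceeded) {
        if (heartbeatWriteSucceeded && !portsOpen) {
            listeners.forEach(PortListener::open);
            portsOpen = true;
        } else if (!heartbeatWriteSucceeded && portsOpen) {
            listeners.forEach(PortListener::close);
            portsOpen = false;
        }
    }

    synchronized boolean isOpen() {
        return portsOpen;
    }
}
```

Acting only on transitions keeps repeated failed heartbeats from repeatedly closing already-closed sockets.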

Example command line:

java -jar krux-tcp-stream-kafka-producer-1.3.0-full.jar --http-port 9080 --port.topic 1543:topic1,topic2,topic3 --port.topic 32344:topic4 --stats --env prod --heartbeat-topic TEST_CONN

Documentation for all command-line options is available by passing -h or --help to the jar:

~/$  java -jar krux-tcp-stream-kafka-producer-1.3.0-full.jar --help

Krux Kafka Stream Listener
**************************
Will pass incoming eol-delimited messages on TCP streams to mapped Kafka topics.

Option                                       Description                                              
------                                       -----------                                              
--app-name                                   Application identifier, used for statsd namespaces, log  
                                               file names, etc. If not supplied, will use this app's  
                                               entry point class name. (default:                       
                                               TCPStreamListenerServer)                               
--base-dir                                   Base directory for app needs. (default: /tmp)            
--batch.num.messages [Integer]               The number of messages to send in one batch when using   
                                               async mode. The producer will wait until either this   
                                               number of messages are ready to send or queue.buffer.  
                                               max.ms is reached. (default: 200)                      
--client.id                                  The client id is a user-specified string sent in each    
                                               request to help trace calls. It should logically       
                                               identify the application making the request. (default: 
                                               )                                                      
--compression.codec                          This parameter allows you to specify the compression     
                                               codec for all data generated by this producer. Valid   
                                               values are "none", "gzip" and "snappy". (default: none)
--env                                        Operating environment (default: dev)                     
-h, --help                                   Prints this helpful message                              
--heap-stats-interval-ms [Integer]           Interval (ms) for used heap statsd gauge (default: 1000) 
--heartbeat-topic                            The name of a topic to be used for general connection    
                                               checking, kafka aliveness, etc. (default: )            
--http-port [Integer]                        Accept http connections on this port (0 = web server     
                                               will not start) (default: 0)                           
--log-level                                  Default log4j log level. Valid values: DEBUG, INFO,      
                                               WARN, ERROR, FATAL (default: WARN)                     
--message.send.max.retries [Integer]         This property will cause the producer to automatically   
                                               retry a failed send request. This property specifies   
                                               the number of retries when such failures occur. Note   
                                               that setting a non-zero value here can lead to         
                                               duplicates in the case of network errors that cause a  
                                               message to be sent but the acknowledgement to be lost. 
                                               (default: 3)                                           
--metadata.broker.list                       This is for bootstrapping and the producer will only use 
                                               it for getting metadata (topics, partitions and        
                                               replicas). The socket connections for sending the      
                                               actual data will be established based on the broker    
                                               information returned in the metadata. The format is    
                                               host1:port1,host2:port2, and the list can be a subset  
                                               of brokers or a VIP pointing to a subset of brokers.   
                                               (default: localhost:9092)                              
--port.topic                                 The port->topic mappings (ex: 1234:topic1[,topic2])      
                                               Specify multiple mappings with multiple cl options.    
                                               e.g.: --port.topic 1234:topic1[,topic2] --port.topic   
                                               4567:topic3[,topic4]                                   
--producer.type                              'sync' or 'async' (default: async)                       
--queue.buffering.max.messages [Integer]     The maximum number of unsent messages that can be queued 
                                               up the producer when using async mode before either    
                                               the producer must be blocked or data must be dropped.  
                                               (default: 10000)                                       
--queue.buffering.max.ms [Integer]           Maximum time to buffer data when using async mode. For   
                                               example a setting of 100 will try to batch together    
                                               100ms of messages to send at once. This will improve   
                                               throughput but adds message delivery latency due to    
                                               the buffering. (default: 5000)                         
--queue.enqueue.timeout.ms [Integer]         The amount of time to block before dropping messages     
                                               when running in async mode and the buffer has reached  
                                               queue.buffering.max.messages. If set to 0 events will  
                                               be enqueued immediately or dropped if the queue is     
                                               full (the producer send call will never block). If set 
                                               to -1 the producer will block indefinitely and never   
                                               willingly drop a send. (default: -1)                   
--request.required.acks [Integer]            The type of ack the broker will return to the client.    
                                               0, which means that the producer never waits for an    
                                               acknowledgement                                        
                                               1, which means that the producer gets an               
                                               acknowledgement after the leader replica has received  
                                               the data.                                              
                                               -1, which means that the producer gets an              
                                               acknowledgement after all in-sync replicas have        
                                               received the data.                                     
                                             See https://kafka.apache.org/documentation.              
                                               html#producerconfigs (default: 1)                      
--request.timeout.ms [Integer]               The amount of time the broker will wait trying to meet   
                                               the request.required.acks requirement before sending   
                                               back an error to the client. (default: 10000)          
--retry.backoff.ms [Integer]                 Before each retry, the producer refreshes the metadata   
                                               of relevant topics to see if a new leader has been     
                                               elected. Since leader election takes a bit of time,    
                                               this property specifies the amount of time that the    
                                               producer waits before refreshing the metadata.         
                                               (default: 100)                                         
--send.buffer.bytes [Integer]                Socket write buffer size (default: 102400)               
--stats                                      Enable/disable statsd broadcast                          
--stats-environment                          Stats environment (dictates statsd prefix) (default: dev)
--stats-host                                 Listening statsd host (default: localhost)               
--stats-port [Integer]                       Listening statsd port (default: 8125) 
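The three documented modes of `--queue.enqueue.timeout.ms` map directly onto the `java.util.concurrent.BlockingQueue` operations `offer`, `put`, and `offer` with a timeout. This sketch models the async producer's buffer as a bounded queue holding up to `queue.buffering.max.messages` entries. `EnqueuePolicy` is a hypothetical name, and the sketch illustrates the semantics described in the help text above. It is not Kafka's own code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of the queue.enqueue.timeout.ms semantics on a bounded queue.
 * Returns true if the message was enqueued, false if it was dropped.
 */
final class EnqueuePolicy {
    static <T> boolean enqueue(BlockingQueue<T> queue, T message, long enqueueTimeoutMs) {
        try {
            if (enqueueTimeoutMs == 0) {
                return queue.offer(message);            // never block: enqueue now or drop
            } else if (enqueueTimeoutMs < 0) {
                queue.put(message);                     // block until space frees up; never drop
                return true;
            } else {
                // block for at most enqueueTimeoutMs, then drop
                return queue.offer(message, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();         // preserve the interrupt flag; report the message as dropped
            return false;
        }
    }
}
```

With the default of -1, a full buffer makes the sender wait instead of dropping messages, trading latency for no data loss.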

Releases

The latest stable binary releases can be downloaded from the repository's Releases page.

About

Splits newline-delimited TCP streams into Kafka messages

License

Apache-2.0 (LICENSE). The repository also includes a LICENSE.txt whose license was not identified.
