Consumer

Basics

Kafka is quite different from traditional messaging systems such as RabbitMQ!

If you're new to Kafka, check out the introductory material in the Confluent documentation for more information about how consumption works in Kafka.

Misc Points

  • Consumers typically work as part of a group. At any given time, one and only one consumer in a group will be assigned to read from each partition of a subscribed-to topic (assuming the group is not currently rebalancing). You use the Subscribe method to join a group (see the sketch after this list).

  • You aren't required to use consumer groups. You can directly assign to specific partitions using the Assign method (and never call the Subscribe method).

  • You must specify a GroupId in your configuration, even if you don't use any group functionality. We will fix this at some point.

  • It's fine to call Consume with a timeout of 0; this is a non-blocking operation. You'll get a message if one is available, or null if not.

  • The AutoOffsetReset property dictates the behavior when a committed offset is not available for a partition or is invalid. If your consumer is not getting messages, try setting this to AutoOffsetReset.Earliest and see if that helps (common misunderstanding!).
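
Putting these points together, here's a minimal sketch of a subscribing consumer (the broker address, topic and group names are placeholders):

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",        // placeholder
    GroupId = "my-group",                       // required, even if you don't use group functionality
    AutoOffsetReset = AutoOffsetReset.Earliest  // start from the beginning if no committed offset exists
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic"); // join the group and get a partition assignment

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1)); // null if nothing arrives within the timeout
    if (result == null) continue;
    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
}
```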

High Level Architecture

The Consumer API sits at a much higher level of abstraction than the Kafka protocol, which is used to communicate with the cluster.

When you call Consume, you are pulling messages from a local in-memory queue - you are not directly sending requests to the cluster. Behind the scenes, the client orchestrates connecting to the required brokers, automatically recovering from leader changes, and so on.

Although your application consumes messages one by one, the client pulls messages from brokers in batches for efficiency. By default, caching of messages on the client is very aggressive (optimized for high throughput). You may want to reduce this in memory-constrained scenarios. The configuration property you want is QueuedMaxMessagesKbytes.
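
For example (the value below is illustrative only, not a recommendation; tune it to your memory budget):

```csharp
var config = new ConsumerConfig
{
    // ...
    QueuedMaxMessagesKbytes = 16384 // cap the internal fetch queue at ~16 MB (value is in kilobytes)
};
```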

Errors

There are two types of error in Kafka - retryable and non-retryable. Generally, the client will automatically recover from any retryable error without the application developer needing to do anything, or even being informed.

Errors exposed via the error callback or in the log generally represent retryable errors and are for informational purposes only.

However, if an error has the IsFatal flag set to true (this should generally never happen), the consumer is in an unrecoverable state and must be re-created.
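
A sketch of how you might check for this in a consume loop (the handling shown is an assumption, not something prescribed by the library):

```csharp
try
{
    var result = consumer.Consume(cancellationToken);
    // ... process result ...
}
catch (ConsumeException e)
{
    if (e.Error.IsFatal)
    {
        // Unrecoverable: dispose this consumer and create a new one.
        throw;
    }
    // Otherwise the error is retryable / informational; it's typically fine to continue.
}
```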

Partition Assignments

Unfortunately, the term 'assign' is used for more than one purpose. There are two related concepts:

  1. The assignment given to a particular consumer by the consumer group.
  2. The partitions assigned to be read from by the consumer.

With the Java client, these are always the same. The .NET client is more flexible - it allows you to override the assignment given to you by the group to be whatever you want. This is an advanced, uncommonly used feature. It might be useful, for example, to add a control message partition to be read by every consumer in addition to the assigned partitions.
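
One way to do this is via the partitions-assigned handler, which can return a modified set of partitions to consume from. A sketch (the control topic name is hypothetical):

```csharp
// requires: using System.Linq;
var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        // Start from the assignment provided by the group...
        var toConsume = partitions
            .Select(p => new TopicPartitionOffset(p, Offset.Unset))
            .ToList();
        // ...and additionally read a (hypothetical) control message partition.
        toConsume.Add(new TopicPartitionOffset("control-topic", 0, Offset.Unset));
        return toConsume;
    })
    .Build();
```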

Committing Offsets

The docs for the .NET client on the Confluent website go into a fair amount of detail about how to commit offsets:

https://docs.confluent.io/current/clients/dotnet.html#auto-offset-commit

TLDR: you should probably use StoreOffset, with EnableAutoOffsetStore set to false and EnableAutoCommit set to true (the default).
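
A sketch of that at-least-once pattern:

```csharp
var config = new ConsumerConfig
{
    // ...
    EnableAutoCommit = true,        // the default: commit stored offsets periodically in the background
    EnableAutoOffsetStore = false   // don't mark offsets as eligible for commit automatically
};

// in your consume loop:
var result = consumer.Consume(cancellationToken);
// ... process the message ...
consumer.StoreOffset(result); // only now is the offset eligible for the background commit
```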

Question: I want to synchronously commit offsets after each consumed message. It's very slow. How do I make it fast?

librdkafka uses the same broker connection both for commits and for fetching messages, so a commit may be backed up behind a long-poll blocking Fetch request. The long-poll behavior of fetch requests is configured with the fetch.wait.max.ms property, which defaults to 100ms. You can decrease that value to reduce offset commit latency, at the expense of a larger number of Fetch requests (depending on your traffic pattern). Also see https://github.com/edenhill/librdkafka/issues/1787
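
In the .NET client this maps to the FetchWaitMaxMs configuration property (the value below is illustrative):

```csharp
var config = new ConsumerConfig
{
    // ...
    FetchWaitMaxMs = 10 // lower than the 100ms default: lower commit latency, more Fetch requests
};
```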

todo: is this information still current?

At-most-once, at-least-once and exactly-once semantics

It's possible to write your consume loop such that your application is guaranteed to process each message at most once, or at least once. It's also possible to write code that provides neither guarantee - which is what you get by default! You probably want to change this.
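
For example, a sketch of at-most-once processing (commit before you process, so a crash mid-processing skips the message rather than re-processing it); the at-least-once pattern is the StoreOffset approach shown earlier:

```csharp
var result = consumer.Consume(cancellationToken);
consumer.Commit(result); // synchronously commit before processing: at-most-once
Process(result);         // hypothetical processing function
```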

When v1.4 is released, you'll also be able to write stream processing applications (Kafka -> Kafka) with exactly-once semantics.

Default configuration behavior

By default, EnableAutoCommit and EnableAutoOffsetStore are both set to true.

With auto commit enabled, the consumer periodically commits to Kafka, on a background thread, any offsets that have been marked as ready to commit. With auto offset store enabled, offsets are marked as ready to commit immediately before the corresponding message is delivered to the application via Consume.

With this setup:

  1. your application may fail to process a message if it crashes between when the Consume method marks the message's offset as ready to commit and when your application has finished processing the message. This will only happen if, additionally, a commit protocol request for that offset is successfully sent to the cluster by the background auto commit before the application crashes.

  2. your application may process messages more than once. This may occur because offsets are only committed periodically to the cluster - it's possible for your application to process a number of messages and crash before the corresponding background auto commit occurs.

For more information, refer to the Confluent docs.

MaxPollIntervalMs, SessionTimeoutMs and HeartbeatIntervalMs

The consumer automatically sends 'heartbeat' messages to the group coordinator every HeartbeatIntervalMs on a background thread. If the coordinator does not receive a heartbeat message within SessionTimeoutMs, it assumes the consumer has died and kicks it out of the consumer group. If the consumer has not in fact died, it will automatically ask to rejoin the group when connectivity is re-established.

MaxPollIntervalMs protects against the situation where the application's processing logic has become unresponsive, but the consumer's background thread is still diligently sending heartbeats to the group coordinator, so the consumer remains in the group. You must call Consume at least once every MaxPollIntervalMs, or the consumer will be kicked out of the group.
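
These all correspond to ConsumerConfig properties. A sketch with illustrative values:

```csharp
var config = new ConsumerConfig
{
    // ...
    SessionTimeoutMs = 10000,    // kicked from the group if no heartbeat arrives within 10s
    HeartbeatIntervalMs = 3000,  // background heartbeat every 3s
    MaxPollIntervalMs = 300000   // you must call Consume at least every 5 minutes
};
```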

Why is there no ConsumeAsync or CommitAsync?

We haven't implemented these yet (we want to take the time to do so properly). We do intend to add them, because they allow you to write more idiomatic C# code.

If you are trying to set up a HostedService, consider instead setting up a dedicated background thread (tied to the application lifetime) and running a standard synchronous consume loop on that. This is completely fine - just not idiomatic C# (everything is async these days). This approach will actually be measurably more performant than an async approach, because async comes with a fair bit of overhead (compared to the number of messages per second you can get out of the Kafka consumer!).
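
A sketch of that pattern, assuming the Microsoft.Extensions.Hosting package (the class, topic and group names are placeholders):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

public class KafkaConsumerService : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Run the blocking consume loop on its own dedicated thread.
        var thread = new Thread(() => ConsumeLoop(stoppingToken)) { IsBackground = true };
        thread.Start();
        return Task.CompletedTask;
    }

    private void ConsumeLoop(CancellationToken token)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder
            GroupId = "my-group"                 // placeholder
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my-topic");
        try
        {
            while (!token.IsCancellationRequested)
            {
                var result = consumer.Consume(token); // blocks; throws on cancellation
                // ... process result ...
            }
        }
        catch (OperationCanceledException)
        {
            // shutting down
        }
        finally
        {
            consumer.Close(); // leave the group cleanly
        }
    }
}
```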

Alternatively, you could fake an async consume method using await Task.Run(() => consumer.Consume(timeout)). That has more overhead than approach #1, but will allow you to use the standard hosted service pattern (you'll still get hundreds of thousands of messages a second out of it).
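
For example (a hypothetical wrapper, not part of the library):

```csharp
public static class ConsumerExtensions
{
    // Hypothetical helper - not part of Confluent.Kafka.
    public static Task<ConsumeResult<TKey, TValue>> ConsumeAsync<TKey, TValue>(
        this IConsumer<TKey, TValue> consumer, TimeSpan timeout)
        => Task.Run(() => consumer.Consume(timeout));
}
```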