Access Kafka consumer #1365
Hi, is there any way we can access the Kafka consumer backing a handler? I see
Something like:

    @broker.subscriber("input-topic", auto_commit=False)
    def handle(body: Foo, msg: KafkaMessage):
        try:
            send_to_external_system(body)
            msg.ack()
        except ExternalSystemNotAvailableError:
            # message can be retried
            msg.nack()
            get_consumer().pause()
            wait_for_five_minutes_and_resume()  # or ping the external dependency until it's up again and then resume
        except Exception as err:
            # message failed for other reasons
            handle_error(err)  # do something else: log, send to DLQ, ..
            msg.ack()

(or use a decorator)

Right now (*) the only thing we can do is retry the message without pausing the consumer, but this means that either we reprocess a message continuously or, if we add a

Thanks,
lorenzo

(*) Actually, with version 0.47.0 nack-ing a message still results in the offset being committed, see my comment here
Replies: 1 comment 3 replies
Indeed, KafkaMessage requires ConsumerProtocol, but in fact it is an AIOKafkaConsumer object, so you can use it the way you want.
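
For illustration, here is a minimal sketch of the pause-and-resume step, assuming the consumer object exposes `assignment()`, `pause(*partitions)` and `resume(*partitions)` as AIOKafkaConsumer does. The names `PausableConsumer`, `pause_then_resume` and `FakeConsumer` are hypothetical and are not part of FastStream or aiokafka; the fake consumer exists only so the snippet runs without a broker.

```python
import asyncio
from typing import Any, Protocol, Set


class PausableConsumer(Protocol):
    """Subset of the consumer interface this sketch relies on (assumed)."""

    def assignment(self) -> Set[Any]: ...
    def pause(self, *partitions: Any) -> None: ...
    def resume(self, *partitions: Any) -> None: ...


async def pause_then_resume(consumer: PausableConsumer, delay: float) -> None:
    """Pause every currently assigned partition, wait, then resume them."""
    partitions = consumer.assignment()
    consumer.pause(*partitions)
    try:
        await asyncio.sleep(delay)
    finally:
        # Resume even if the wait is cancelled, so nothing stays paused forever.
        consumer.resume(*partitions)


class FakeConsumer:
    """Hypothetical in-memory stand-in for a real Kafka consumer."""

    def __init__(self, partitions):
        self._assigned = set(partitions)
        self.paused_now = set()
        self.calls = []

    def assignment(self):
        return set(self._assigned)

    def pause(self, *partitions):
        self.paused_now |= set(partitions)
        self.calls.append("pause")

    def resume(self, *partitions):
        self.paused_now -= set(partitions)
        self.calls.append("resume")


if __name__ == "__main__":
    fake = FakeConsumer(["input-topic-0", "input-topic-1"])
    asyncio.run(pause_then_resume(fake, 0.01))
    print(fake.calls)
```

In a handler, the same helper could be given the consumer carried by the message, for example `pause_then_resume(msg.consumer, 300)`, where the attribute name `msg.consumer` is an assumption not confirmed in this thread. Partitions assigned by a rebalance during the wait would not be paused by this sketch.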