Consumer committed offsets not tracking LSO with FixTxOffsets and rollbacked transactions #2590

Open
bgK opened this issue Feb 17, 2023 · 8 comments

@bgK
Contributor

bgK commented Feb 17, 2023

In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.8.11

Describe the bug
When a MessageListenerContainer has FixTxOffsets set to true and has read all the available messages, the committed offsets do not always match the last stable offset (LSO).

MessageListenerContainer commits the consumer position for a given partition only when poll() returns records for that partition. If a transaction is then rolled back on that partition and no other records are available, MessageListenerContainer does not commit the offsets consumed by the rolled-back transaction, resulting in observable "lag".

To Reproduce
Create and start a MessageListenerContainer with FixTxOffsets set to true, subscribed to a test topic with a single partition. Create a transactional producer, send a message, wait for the ack, and roll back the transaction. The committed offset will be 0, while the last stable offset will be 2.
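
The numbers in the steps above can be checked with a toy model of the partition log. This is only a sketch, not Kafka client code: the `AbortedTxLagDemo` class, its `Entry` type, and its method names are hypothetical, and it assumes that the rolled-back message occupies offset 0, that the abort marker occupies offset 1, and that "nothing committed yet" counts as a committed offset of 0.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition's log. NOT Kafka client code; every name here is hypothetical.
class AbortedTxLagDemo {

    /** One slot in the log: a data record or a transaction control marker. */
    static final class Entry {
        final long offset;
        final boolean controlMarker; // true for a commit/abort marker
        final boolean aborted;       // true if the entry belongs to a rolled-back transaction

        Entry(long offset, boolean controlMarker, boolean aborted) {
            this.offset = offset;
            this.controlMarker = controlMarker;
            this.aborted = aborted;
        }
    }

    /** Log contents after one transactional send followed by a rollback. */
    static List<Entry> logAfterAbortedSend() {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(0L, false, true)); // the message sent inside the transaction
        log.add(new Entry(1L, true, true));  // the abort marker that ends the transaction
        return log;
    }

    /** Assumption: with no transaction still open, the LSO equals the log end offset. */
    static long lastStableOffset(List<Entry> log) {
        return log.size();
    }

    /** Offset committed when commits happen only after poll() returns records. */
    static long committedOffset(List<Entry> log) {
        long committed = 0L; // assumption: nothing committed yet counts as 0
        for (Entry e : log) {
            if (!e.controlMarker && !e.aborted) {
                committed = e.offset + 1; // only delivered records advance the commit
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Entry> log = logAfterAbortedSend();
        long lso = lastStableOffset(log);
        long committed = committedOffset(log);
        System.out.println("LSO=" + lso + " committed=" + committed + " lag=" + (lso - committed));
    }
}
```

Under these assumptions, no record is ever delivered to the listener, so the committed offset stays at 0 while the LSO is 2, which is the lag described in this report.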

Expected behavior
Once all the messages have been consumed, a MessageListenerContainer with FixTxOffsets set to true should commit offsets that track the LSO, even when some transactions are rolled back.

@garyrussell
Contributor

There is nothing we can do about this; the consumer has no knowledge that there is a rolled back record in the log; the container can only "fix" the offset if the consumer reports the position properly and, obviously, in this case, it does not.

As I have stated many times in many different forums, this bogus lag problem needs to be fixed in Kafka itself. There are other corner cases where position() does not include the marker.

See https://issues.apache.org/jira/browse/KAFKA-10683

But the Kafka folks seem to have no interest in fixing it; I suggest you comment there and vote it up.

@garyrussell garyrussell closed this as not planned Feb 17, 2023
@bgK
Contributor Author

bgK commented Feb 17, 2023

Thanks for your answer.

It seems to me that in the case of this issue, the position is correctly reported by the Kafka consumer. When I patch Spring-Kafka to always commit the consumer position even when no records are returned by poll(), the committed offsets account for the rolled back transactions.

Hacky patch for v2.8.11:

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index a787e0f5..480ff2cc 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1363,13 +1363,13 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
 				resumePartitionsIfNecessary();
 			}
 
+			savePositionsIfNeeded();
 			invokeIfHaveRecords(records);
 		}
 
 		private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
 			if (records != null && records.count() > 0) {
 				this.receivedSome = true;
-				savePositionsIfNeeded(records);
 				notIdle();
 				notIdlePartitions(records.partitions());
 				invokeListener(records);
@@ -1441,29 +1441,31 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
 			}
 		}
 
-		private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
+		private void savePositionsIfNeeded() {
 			if (this.fixTxOffsets) {
 				this.savedPositions.clear();
-				records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
+				this.consumer.assignment().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
 			}
 		}
 
 		@SuppressWarnings("rawtypes")
 		private void fixTxOffsetsIfNeeded() {
 			if (this.fixTxOffsets) {
+
 				try {
 					Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
-					this.lastCommits.forEach((tp, oamd) -> {
+					this.savedPositions.forEach((tp, saved) -> {
 						long position = this.consumer.position(tp);
-						Long saved = this.savedPositions.get(tp);
+						OffsetAndMetadata comitted = this.lastCommits.get(tp);
 						if (saved != null && saved.longValue() != position) {
 							this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
 									+ "saved: " + this.savedPositions + ", "
-									+ "comitted: " + oamd + ", "
+									+ "comitted: " + comitted + ", "
 									+ "current: " + tp + "@" + position);
 							return;
 						}
-						if (position > oamd.offset()) {
+
+						if (comitted == null || position > comitted.offset()) {
 							toFix.put(tp, createOffsetAndMetadata(position));
 						}
 					});
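
The decision logic of the patched fixTxOffsetsIfNeeded() above can be exercised without a broker. The sketch below is a simplified restatement, not the Spring for Apache Kafka implementation: the `FixTxOffsetsSketch` class and `offsetsToFix` method are hypothetical, partitions are represented as plain strings, and offsets as `Long` values instead of `TopicPartition` and `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the patched correction logic. Hypothetical names; not Spring Kafka code.
class FixTxOffsetsSketch {

    /**
     * Returns the offsets that should be committed to catch up with the consumer position.
     *
     * @param savedPositions   positions saved after the last poll, for every assigned partition
     * @param currentPositions positions reported by the consumer now
     * @param lastCommits      last committed offsets; a partition may be absent
     */
    static Map<String, Long> offsetsToFix(Map<String, Long> savedPositions,
            Map<String, Long> currentPositions, Map<String, Long> lastCommits) {

        Map<String, Long> toFix = new HashMap<>();
        for (Map.Entry<String, Long> entry : savedPositions.entrySet()) {
            String tp = entry.getKey();
            long saved = entry.getValue();
            Long position = currentPositions.get(tp);
            if (position == null || saved != position) {
                // The position moved since it was saved (for example a seek): skip the correction.
                continue;
            }
            Long committed = lastCommits.get(tp);
            if (committed == null || position > committed) {
                // Nothing committed yet, or the commit lags behind the position: fix it.
                toFix.put(tp, position);
            }
        }
        return toFix;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new HashMap<>();
        positions.put("test-0", 2L); // position after skipping the rolled-back record and marker
        System.out.println(offsetsToFix(positions, positions, new HashMap<>()));
    }
}
```

The point of iterating over the saved positions rather than the last commits is that a partition with no commit at all still gets corrected, which is the case in the reproduction.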

@garyrussell
Contributor

Interesting; I didn't test it; I just assumed this was another case where the position is incorrectly reported (as commented on that JIRA).

@garyrussell
Contributor

Note that 2.8.x is no longer supported as OSS.

@garyrussell garyrussell added this to the 3.x Backlog milestone Feb 17, 2023
@thperchi

Hi there,

I work for a company that contributes to the open-source ecosystem, and I would like to help resolve this issue (#2590). However, despite the explanations provided in the issue, I am unable to reproduce the bug.

@bgK Would it be possible to provide a code snippet that reproduces the problem (and your configuration, if you think it may interact with the bug) so that I can start investigating the issue and hopefully find a solution?

Thanks!

@bgK
Contributor Author

bgK commented Apr 21, 2023

Hi @thperchi, this is a minimal reproduction case for the issue: spring-kafka-2590-repro.zip.

@thperchi

thperchi commented May 5, 2023

Hi @bgK!
Could we have a call to work out how I can fix the issue?
I'm not able to run the test. It may be my environment, which is why I'd like to have a call with you.
Contact me by mail.
Thanks in advance.

@alograg

alograg commented Jul 18, 2023

Discussion

@artembilan artembilan modified the milestones: 3.1 Backlog, Backlog Jan 16, 2024