{"payload":{"feedbackUrl":"https://github.com/orgs/community/discussions/53140","repo":{"id":55092426,"defaultBranch":"master","name":"kafka-monitor","ownerLogin":"linkedin","currentUserCanPush":false,"isFork":false,"isEmpty":false,"createdAt":"2016-03-30T19:42:49.000Z","ownerAvatar":"https://avatars.githubusercontent.com/u/357098?v=4","public":true,"private":false,"isOrgOwned":true},"refInfo":{"name":"","listCacheKey":"v0:1679522245.0","currentOid":""},"activityList":{"items":[{"before":"552edc8f5bd4c5a2097cf913cba5a542c4310bb6","after":null,"ref":"refs/tags/2.5.15.1","pushedAt":"2023-03-22T21:57:25.000Z","pushType":"branch_deletion","commitsCount":0,"pusher":{"login":"mhratson","name":"Maryan Hratson","path":"/mhratson","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/25061204?s=80&v=4"}},{"before":"0da972fbe672b9d6a68bdfdb61f5da3027d001f4","after":null,"ref":"refs/heads/LIKAFKA-34677-fix-close-error","pushedAt":"2023-03-22T21:19:08.000Z","pushType":"branch_deletion","commitsCount":0,"pusher":{"login":"mhratson","name":"Maryan Hratson","path":"/mhratson","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/25061204?s=80&v=4"}},{"before":"c4a9c71b4e65b0a0f0cca7764d7068e01f2e0c98","after":"043db6419d1638ea9f8f0b8262950fba576c2806","ref":"refs/heads/master","pushedAt":"2023-03-22T21:19:07.000Z","pushType":"pr_merge","commitsCount":1,"pusher":{"login":"mhratson","name":"Maryan Hratson","path":"/mhratson","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/25061204?s=80&v=4"},"commit":{"message":"ConsumeService: fix client closing causing `ConcurrentModificationException`\n\n## Problem\n\n- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid\n- as docummented in _kafka consumer docs_[^1]\n\n> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.\n\nThe exception thrown\n```\n2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.\njava.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service\n        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.utils.CloseableLock.(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]\n```\n\n## Solution\n\nThe recommended solution[^1] is\n- to use `consumer.wakeup();` method\n- but the method is not yet adopted by the `KMBaseConsumer` interface\n- so for now `_baseConsumer.close()` is moved into the thread\n- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits\n\n[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)\n\n## Testing Done\n\nIncreased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread.\n\n`- ./gradlew test`","shortMessageHtmlLink":"ConsumeService: fix client closing causing `ConcurrentModificationExc…"}},{"before":"c9ffcb1bb479ba6d53b883f94dc4b8a512a8248d","after":"0da972fbe672b9d6a68bdfdb61f5da3027d001f4","ref":"refs/heads/LIKAFKA-34677-fix-close-error","pushedAt":"2023-03-22T02:24:38.000Z","pushType":"force_push","commitsCount":0,"pusher":{"login":"mhratson","name":"Maryan Hratson","path":"/mhratson","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/25061204?s=80&v=4"},"commit":{"message":"ConsumeService: fix client closing causing `ConcurrentModificationException`\n\n## Problem\n\n- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid\n- as docummented in _kafka consumer docs_[^1]\n\n> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.\n\nThe exception thrown\n```\n2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.\njava.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service\n        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.utils.CloseableLock.(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]\n```\n\n## Solution\n\nThe recommended solution[^1] is\n- to use `consumer.wakeup();` method\n- but the method is not yet adopted by the `KMBaseConsumer` interface\n- so for now `_baseConsumer.close()` is moved into the thread\n- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits\n\n[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)\n\n## Testing Done\n\nIncreased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread.\n\n`- ./gradlew test`","shortMessageHtmlLink":"ConsumeService: fix client closing causing `ConcurrentModificationExc…"}},{"before":"fddc465da513ef7578113405618d7f61235623a6","after":"c9ffcb1bb479ba6d53b883f94dc4b8a512a8248d","ref":"refs/heads/LIKAFKA-34677-fix-close-error","pushedAt":"2023-03-22T02:18:53.000Z","pushType":"force_push","commitsCount":0,"pusher":{"login":"mhratson","name":"Maryan Hratson","path":"/mhratson","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/25061204?s=80&v=4"},"commit":{"message":"ConsumeService: fix client closing causing `ConcurrentModificationException`\n\n## Problem\n\n- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid\n- as docummented in _kafka consumer docs_[^1]\n\n> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.\n\nThe exception thrown\n```\n2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.\njava.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service\n        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.utils.CloseableLock.(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]\n```\n\n## Solution\n\nThe recommended solution[^1] is\n- to use `consumer.wakeup();` method\n- but the method is not yet adopted by the `KMBaseConsumer` interface\n- so for now `_baseConsumer.close()` is moved into the thread\n- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits\n\n[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)\n\n## Testing Done\n\nIncreased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread.\n\n`- ./gradlew test`","shortMessageHtmlLink":"ConsumeService: fix client closing causing `ConcurrentModificationExc…"}},{"before":null,"after":"fddc465da513ef7578113405618d7f61235623a6","ref":"refs/heads/LIKAFKA-34677-fix-close-error","pushedAt":"2023-03-22T01:44:55.000Z","pushType":"branch_creation","commitsCount":0,"pusher":{"login":"mhratson","name":"Maryan Hratson","path":"/mhratson","primaryAvatarUrl":"https://avatars.githubusercontent.com/u/25061204?s=80&v=4"},"commit":{"message":"ConsumeService: fix client closing causing `ConcurrentModificationException`\n\n## Summary\n\n- calling `_baseConsumer.close()`, outside of the thread consumer is running in, is invalid\n- as docummented in (kafka consumer docs)[^1]\n\n> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.\n\nThe recommended solution is\n- to use `consumer.wakeup();` method\n- but the method is not yet adopted by the `KMBaseConsumer` interface\n- so for now `_baseConsumer.close()` is moved into the thread\n\n### Details\n\nThe exception\n```\n2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.\njava.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service\n        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.utils.CloseableLock.(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]\n        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]\n```\n\n[^1]:https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502\n\n## Testing Done\n\nIncreased `thread.join(5000)` timeout as this implementation is slower to\nstop due to not interrupting the consumer thread.\n\n- ./gradlew test","shortMessageHtmlLink":"ConsumeService: fix client closing causing `ConcurrentModificationExc…"}}],"hasNextPage":false,"hasPreviousPage":false,"activityType":"all","actor":null,"timePeriod":"all","sort":"DESC","perPage":30,"cursor":"djE6ks8AAAADCTtJgwA","startCursor":null,"endCursor":null}},"title":"Activity · linkedin/kafka-monitor"}