Skip to content

Commit

Permalink
[GOBBLIN-2044] Catch and log exceptions in HighLevelConsumer queue co…
Browse files Browse the repository at this point in the history
…nsumption (#3923)

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
umustafi and Urmi Mustafi committed Apr 16, 2024
1 parent 1fc5fb0 commit 99c5a76
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,10 @@ public QueueProcessor(BlockingQueue queue) {
@Override
public void run() {
log.info("Starting queue processing.. " + Thread.currentThread().getName());
KafkaConsumerRecord record = null;
try {
while (true) {
KafkaConsumerRecord record = queue.take();
record = queue.take();
messagesRead.inc();
HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
recordsProcessed.incrementAndGet();
Expand All @@ -349,9 +350,11 @@ public void run() {
}
}
} catch (InterruptedException e) {
log.warn("Encountered exception while processing queue ", e);
log.warn("Thread interrupted while processing queue ", e);
// TODO: evaluate whether we should interrupt the thread or continue processing
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Encountered exception while processing record so stopping queue processing. Record: {} Exception: {}", record, e);
}
}
}
Expand Down

0 comments on commit 99c5a76

Please sign in to comment.