Skip to content

Commit

Permalink
fix: bundle finalizers
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Feb 23, 2021
1 parent cbd06f2 commit 2edcdbf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
Expand Up @@ -80,16 +80,16 @@ public ProcessContinuation processElement(
.lastClaimed()
.ifPresent(
lastClaimedOffset -> {
Committer committer = committerFactory.apply(subscriptionPartition);
committer.startAsync().awaitRunning();
// Commit the next-to-deliver offset.
try {
committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
}
committer.stopAsync().awaitTerminated();
});
Committer committer = committerFactory.apply(subscriptionPartition);
committer.startAsync().awaitRunning();
// Commit the next-to-deliver offset.
try {
committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
}
committer.stopAsync().awaitTerminated();
});
return result;
}
}
Expand Down
Expand Up @@ -143,8 +143,7 @@ public void process() throws Exception {
});
doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed();
when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
assertEquals(
ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output));
assertEquals(ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output));
verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output));
InOrder order = inOrder(processor);
order.verify(processor).start();
Expand Down

0 comments on commit 2edcdbf

Please sign in to comment.