diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java index 217bc2fd0..3c1f97435 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java @@ -80,16 +80,16 @@ public ProcessContinuation processElement( .lastClaimed() .ifPresent( lastClaimedOffset -> { - Committer committer = committerFactory.apply(subscriptionPartition); - committer.startAsync().awaitRunning(); - // Commit the next-to-deliver offset. - try { - committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); - } catch (Exception e) { - throw ExtractStatus.toCanonical(e).underlying; - } - committer.stopAsync().awaitTerminated(); - }); + Committer committer = committerFactory.apply(subscriptionPartition); + committer.startAsync().awaitRunning(); + // Commit the next-to-deliver offset. + try { + committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); + } catch (Exception e) { + throw ExtractStatus.toCanonical(e).underlying; + } + committer.stopAsync().awaitTerminated(); + }); return result; } } diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java index a596eba57..c0980e466 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java @@ -143,8 +143,7 @@ public void process() throws Exception { }); doReturn(Optional.of(example(Offset.class))).when(processor).lastClaimed(); when(committer.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null)); - assertEquals( - ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output)); + assertEquals(ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output)); verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output)); InOrder order = inOrder(processor); order.verify(processor).start();