diff --git a/pom.xml b/pom.xml index 42bcbb3b..06a8bb2e 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.google.cloud google-cloud-pubsublite-parent - 0.16.0 + 0.17.0 4.0.0 com.google.cloud diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java index aaca0ccb..f3635528 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java @@ -136,6 +136,7 @@ MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) { .instantiate()); } + @SuppressWarnings("CheckReturnValue") PartitionSubscriberFactory getSubscriberFactory() { return (partition, offset, consumer) -> { PubsubContext context = PubsubContext.of(Constants.FRAMEWORK); diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java index cf336a35..066a1eae 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java @@ -117,6 +117,7 @@ private static void extractVal( } } + @SuppressWarnings("CheckReturnValue") public static Message toPubSubMessage(StructType inputSchema, InternalRow row) { Message.Builder builder = Message.builder(); extractVal( diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java index 711a241a..f4ae31ec 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java @@ -34,7 +34,7 @@ public class CachedPublishers { private final Executor listenerExecutor = Executors.newSingleThreadExecutor(); @GuardedBy("this") - private static final Map> publishers = + private final Map> publishers = new HashMap<>(); public synchronized Publisher getOrCreate( diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java index 4c221f1d..b4362944 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java @@ -19,6 +19,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.spark.PslSourceOffset; @@ -115,30 +116,29 @@ private synchronized void cleanUpCommitterMap() { @Override public synchronized void commit(PslSourceOffset offset) { updateCommitterMap(offset); - offset - .partitionOffsetMap() - .forEach( - (p, o) -> { - // Note we don't need to worry about commit offset disorder here since Committer - // guarantees the ordering. Once commitOffset() returns, it's either already - // sent to stream, or waiting for next connection to open to be sent in order. - ApiFuture future = committerMap.get(p).commitOffset(o); - ApiFutures.addCallback( - future, - new ApiFutureCallback() { - @Override - public void onFailure(Throwable t) { - if (!future.isCancelled()) { - log.atWarning().log("Failed to commit %s,%s.", p.value(), o.value(), t); - } - } + for (Map.Entry entry : offset.partitionOffsetMap().entrySet()) { + // Note we don't need to worry about commit offset disorder here since Committer + // guarantees the ordering. Once commitOffset() returns, it's either already + // sent to stream, or waiting for next connection to open to be sent in order. + ApiFuture future = committerMap.get(entry.getKey()).commitOffset(entry.getValue()); + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + if (!future.isCancelled()) { + log.atWarning().withCause(t).log( + "Failed to commit %s,%s.", entry.getKey().value(), entry.getValue().value()); + } + } - @Override - public void onSuccess(Void result) { - log.atInfo().log("Committed %s,%s.", p.value(), o.value()); - } - }, - MoreExecutors.directExecutor()); - }); + @Override + public void onSuccess(Void result) { + log.atInfo().log( + "Committed %s,%s.", entry.getKey().value(), entry.getValue().value()); + } + }, + MoreExecutors.directExecutor()); + } } }