diff --git a/pom.xml b/pom.xml
index 66bf2330..e9503306 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-pubsublite-parent</artifactId>
-  <version>0.16.0</version>
+  <version>0.17.0</version>
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.google.cloud</groupId>
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
index aaca0ccb..f3635528 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
@@ -136,6 +136,7 @@ MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
.instantiate());
}
+ @SuppressWarnings("CheckReturnValue")
PartitionSubscriberFactory getSubscriberFactory() {
return (partition, offset, consumer) -> {
PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
index cf336a35..066a1eae 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
@@ -117,6 +117,7 @@ private static void extractVal(
}
}
+ @SuppressWarnings("CheckReturnValue")
public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
Message.Builder builder = Message.builder();
extractVal(
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java
index 711a241a..f4ae31ec 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/CachedPublishers.java
@@ -34,7 +34,7 @@ public class CachedPublishers {
private final Executor listenerExecutor = Executors.newSingleThreadExecutor();
@GuardedBy("this")
- private static final Map<PslWriteDataSourceOptions, Publisher<MessageMetadata>> publishers =
+ private final Map<PslWriteDataSourceOptions, Publisher<MessageMetadata>> publishers =
new HashMap<>();
public synchronized Publisher<MessageMetadata> getOrCreate(
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java
index 4c221f1d..b4362944 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImpl.java
@@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.spark.PslSourceOffset;
@@ -115,30 +116,29 @@ private synchronized void cleanUpCommitterMap() {
@Override
public synchronized void commit(PslSourceOffset offset) {
updateCommitterMap(offset);
- offset
- .partitionOffsetMap()
- .forEach(
- (p, o) -> {
- // Note we don't need to worry about commit offset disorder here since Committer
- // guarantees the ordering. Once commitOffset() returns, it's either already
- // sent to stream, or waiting for next connection to open to be sent in order.
- ApiFuture<Void> future = committerMap.get(p).commitOffset(o);
- ApiFutures.addCallback(
- future,
- new ApiFutureCallback<Void>() {
- @Override
- public void onFailure(Throwable t) {
- if (!future.isCancelled()) {
- log.atWarning().log("Failed to commit %s,%s.", p.value(), o.value(), t);
- }
- }
+ for (Map.Entry<Partition, Offset> entry : offset.partitionOffsetMap().entrySet()) {
+ // Note we don't need to worry about commit offset disorder here since Committer
+ // guarantees the ordering. Once commitOffset() returns, it's either already
+ // sent to stream, or waiting for next connection to open to be sent in order.
+ ApiFuture<Void> future = committerMap.get(entry.getKey()).commitOffset(entry.getValue());
+ ApiFutures.addCallback(
+ future,
+ new ApiFutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable t) {
+ if (!future.isCancelled()) {
+ log.atWarning().withCause(t).log(
+ "Failed to commit %s,%s.", entry.getKey().value(), entry.getValue().value());
+ }
+ }
- @Override
- public void onSuccess(Void result) {
- log.atInfo().log("Committed %s,%s.", p.value(), o.value());
- }
- },
- MoreExecutors.directExecutor());
- });
+ @Override
+ public void onSuccess(Void result) {
+ log.atInfo().log(
+ "Committed %s,%s.", entry.getKey().value(), entry.getValue().value());
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
}
}