Skip to content

Commit

Permalink
deps: update dependency com.google.cloud:google-cloud-pubsublite-pare…
Browse files Browse the repository at this point in the history
…nt to v0.16.1 (#215)

* update

* update

* version bump

Co-authored-by: Tianzi Cai <tianzi@google.com>
  • Loading branch information
jiangmichaellll and anguillanneuf committed Jul 28, 2021
1 parent 349dc13 commit 1c1aae9
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>0.16.0</version>
<version>0.17.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
Expand Down
Expand Up @@ -136,6 +136,7 @@ MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
.instantiate());
}

@SuppressWarnings("CheckReturnValue")
PartitionSubscriberFactory getSubscriberFactory() {
return (partition, offset, consumer) -> {
PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
Expand Down
Expand Up @@ -117,6 +117,7 @@ private static <T> void extractVal(
}
}

@SuppressWarnings("CheckReturnValue")
public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
Message.Builder builder = Message.builder();
extractVal(
Expand Down
Expand Up @@ -34,7 +34,7 @@ public class CachedPublishers {
private final Executor listenerExecutor = Executors.newSingleThreadExecutor();

@GuardedBy("this")
private static final Map<PslWriteDataSourceOptions, Publisher<MessageMetadata>> publishers =
private final Map<PslWriteDataSourceOptions, Publisher<MessageMetadata>> publishers =
new HashMap<>();

public synchronized Publisher<MessageMetadata> getOrCreate(
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.spark.PslSourceOffset;
Expand Down Expand Up @@ -115,30 +116,29 @@ private synchronized void cleanUpCommitterMap() {
@Override
public synchronized void commit(PslSourceOffset offset) {
updateCommitterMap(offset);
offset
.partitionOffsetMap()
.forEach(
(p, o) -> {
// Note we don't need to worry about commit offset disorder here since Committer
// guarantees the ordering. Once commitOffset() returns, it's either already
// sent to stream, or waiting for next connection to open to be sent in order.
ApiFuture<Void> future = committerMap.get(p).commitOffset(o);
ApiFutures.addCallback(
future,
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
if (!future.isCancelled()) {
log.atWarning().log("Failed to commit %s,%s.", p.value(), o.value(), t);
}
}
for (Map.Entry<Partition, Offset> entry : offset.partitionOffsetMap().entrySet()) {
// Note we don't need to worry about commit offset disorder here since Committer
// guarantees the ordering. Once commitOffset() returns, it's either already
// sent to stream, or waiting for next connection to open to be sent in order.
ApiFuture<Void> future = committerMap.get(entry.getKey()).commitOffset(entry.getValue());
ApiFutures.addCallback(
future,
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
if (!future.isCancelled()) {
log.atWarning().withCause(t).log(
"Failed to commit %s,%s.", entry.getKey().value(), entry.getValue().value());
}
}

@Override
public void onSuccess(Void result) {
log.atInfo().log("Committed %s,%s.", p.value(), o.value());
}
},
MoreExecutors.directExecutor());
});
@Override
public void onSuccess(Void result) {
log.atInfo().log(
"Committed %s,%s.", entry.getKey().value(), entry.getValue().value());
}
},
MoreExecutors.directExecutor());
}
}
}

0 comments on commit 1c1aae9

Please sign in to comment.