Skip to content

Commit

Permalink
deps: version bump underlying pubsub lite library (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Aug 4, 2021
1 parent b541514 commit 665890e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 22 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>0.16.0</version>
<version>0.18.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
Expand All @@ -17,12 +17,12 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.16.1</version>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.16.1</version>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Expand Up @@ -22,7 +22,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
Expand Down Expand Up @@ -79,9 +79,9 @@ public abstract static class Builder {
}

public Consumer<byte[], byte[]> instantiate() throws ApiException {
CloudZone zone = subscriptionPath().location();
CloudRegion region = subscriptionPath().location().extractRegion();
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
AssignerFactory assignerFactory =
Expand All @@ -93,8 +93,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
.setServiceClient(
PartitionAssignmentServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
PartitionAssignmentServiceSettings.newBuilder())))
region, PartitionAssignmentServiceSettings.newBuilder())))
.build()
.instantiate();
} catch (Throwable t) {
Expand All @@ -113,7 +112,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
.setServiceClient(
SubscriberServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
region,
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition),
Expand All @@ -136,8 +135,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
.setServiceClient(
CursorServiceClient.create(
ServiceClients.addDefaultSettings(
subscriptionPath().location().region(),
CursorServiceSettings.newBuilder())))
region, CursorServiceSettings.newBuilder())))
.build()
.instantiate();
} catch (Throwable t) {
Expand All @@ -150,14 +148,12 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
topic, autocommit(), pullSubscriberFactory, committerFactory);

CursorClient cursorClient =
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());
CursorClient.create(CursorClientSettings.newBuilder().setRegion(region).build());
TopicStatsClient topicStatsClient =
TopicStatsClient.create(
TopicStatsClientSettings.newBuilder().setRegion(zone.region()).build());
TopicStatsClient.create(TopicStatsClientSettings.newBuilder().setRegion(region).build());
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build()));
return new PubsubLiteConsumer(
subscriptionPath(),
topic,
Expand Down
Expand Up @@ -51,7 +51,7 @@ public abstract static class Builder {

private AdminClient newAdminClient() {
return AdminClient.create(
AdminClientSettings.newBuilder().setRegion(topicPath().location().region()).build());
AdminClientSettings.newBuilder().setRegion(topicPath().location().extractRegion()).build());
}

public Producer<byte[], byte[]> instantiate() throws ApiException {
Expand All @@ -66,7 +66,7 @@ public Producer<byte[], byte[]> instantiate() throws ApiException {
.setServiceClient(
PublisherServiceClient.create(
ServiceClients.addDefaultSettings(
topicPath().location().region(),
topicPath().location().extractRegion(),
ServiceClients.addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
Expand Down
Expand Up @@ -38,7 +38,7 @@ static Message toMessage(ProducerRecord<byte[], byte[]> record) {
.setKey(ByteString.copyFrom(record.key()))
.setData(ByteString.copyFrom(record.value()));
if (record.timestamp() != null) {
builder.setEventTime(Timestamps.fromMillis(record.timestamp()));
builder = builder.setEventTime(Timestamps.fromMillis(record.timestamp()));
}
ImmutableListMultimap.Builder<String, ByteString> attributes = ImmutableListMultimap.builder();
record
Expand Down
Expand Up @@ -98,12 +98,14 @@ static class SubscriberState {
}

@Override
@SuppressWarnings("GuardedBy")
public void setAssignment(Set<Partition> assignment) {
try (CloseableMonitor.Hold h = monitor.enter()) {

List<SubscriberState> unassigned =
ImmutableSet.copyOf(partitions.keySet()).stream()
.filter(p -> !assignment.contains(p))
.map(p -> partitions.remove(p))
.map(partitions::remove)
.collect(Collectors.toList());
for (SubscriberState state : unassigned) {
state.subscriber.close();
Expand Down Expand Up @@ -241,6 +243,7 @@ public ApiFuture<Map<Partition, Offset>> commitAll() {
}

@Override
@SuppressWarnings("GuardedBy")
public ApiFuture<Void> commit(Map<Partition, Offset> commitOffsets) {
try (CloseableMonitor.Hold h = monitor.enter()) {
ImmutableList.Builder<ApiFuture<?>> commitFutures = ImmutableList.builder();
Expand Down Expand Up @@ -282,8 +285,10 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti

@Override
public Optional<Long> position(Partition partition) {
if (!partitions.containsKey(partition)) return Optional.empty();
return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1);
try (CloseableMonitor.Hold h = monitor.enter()) {
if (!partitions.containsKey(partition)) return Optional.empty();
return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1);
}
}

@Override
Expand Down

0 comments on commit 665890e

Please sign in to comment.