diff --git a/pom.xml b/pom.xml index a15d5ef6..373b6036 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.google.cloud google-cloud-pubsublite-parent - 0.16.0 + 0.18.0 4.0.0 com.google.cloud @@ -17,12 +17,12 @@ com.google.api.grpc proto-google-cloud-pubsublite-v1 - 0.16.1 + ${project.parent.version} com.google.cloud google-cloud-pubsublite - 0.16.1 + ${project.parent.version} org.apache.kafka diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 8fdd3c4a..05e667c4 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -22,7 +22,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; -import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; @@ -79,9 +79,9 @@ public abstract static class Builder { } public Consumer instantiate() throws ApiException { - CloudZone zone = subscriptionPath().location(); + CloudRegion region = subscriptionPath().location().extractRegion(); try (AdminClient adminClient = - AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) { + AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) { Subscription subscription = adminClient.getSubscription(subscriptionPath()).get(); TopicPath topic = TopicPath.parse(subscription.getTopic()); AssignerFactory assignerFactory = @@ -93,8 +93,7 @@ public Consumer instantiate() throws ApiException { .setServiceClient( PartitionAssignmentServiceClient.create( ServiceClients.addDefaultSettings( - subscriptionPath().location().region(), - PartitionAssignmentServiceSettings.newBuilder()))) + region, PartitionAssignmentServiceSettings.newBuilder()))) .build() .instantiate(); } catch (Throwable t) { @@ -113,7 +112,7 @@ public Consumer instantiate() throws ApiException { .setServiceClient( SubscriberServiceClient.create( ServiceClients.addDefaultSettings( - subscriptionPath().location().region(), + region, ServiceClients.addDefaultMetadata( PubsubContext.of(FRAMEWORK), RoutingMetadata.of(subscriptionPath(), partition), @@ -136,8 +135,7 @@ public Consumer instantiate() throws ApiException { .setServiceClient( CursorServiceClient.create( ServiceClients.addDefaultSettings( - subscriptionPath().location().region(), - CursorServiceSettings.newBuilder()))) + region, CursorServiceSettings.newBuilder()))) .build() .instantiate(); } catch (Throwable t) { @@ -150,14 +148,12 @@ public Consumer instantiate() throws ApiException { topic, autocommit(), pullSubscriberFactory, committerFactory); CursorClient cursorClient = - CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build()); + CursorClient.create(CursorClientSettings.newBuilder().setRegion(region).build()); TopicStatsClient topicStatsClient = - TopicStatsClient.create( - TopicStatsClientSettings.newBuilder().setRegion(zone.region()).build()); + TopicStatsClient.create(TopicStatsClientSettings.newBuilder().setRegion(region).build()); SharedBehavior shared = new SharedBehavior( - AdminClient.create( - AdminClientSettings.newBuilder().setRegion(topic.location().region()).build())); + AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())); return new PubsubLiteConsumer( subscriptionPath(), topic, diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java index 2a2d525c..b1572e43 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java @@ -51,7 +51,7 @@ public abstract static class Builder { private AdminClient newAdminClient() { return AdminClient.create( - AdminClientSettings.newBuilder().setRegion(topicPath().location().region()).build()); + AdminClientSettings.newBuilder().setRegion(topicPath().location().extractRegion()).build()); } public Producer instantiate() throws ApiException { @@ -66,7 +66,7 @@ public Producer instantiate() throws ApiException { .setServiceClient( PublisherServiceClient.create( ServiceClients.addDefaultSettings( - topicPath().location().region(), + topicPath().location().extractRegion(), ServiceClients.addDefaultMetadata( PubsubContext.of(FRAMEWORK), RoutingMetadata.of(topicPath(), partition), diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java b/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java index dddbcd22..61839d3c 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java @@ -38,7 +38,7 @@ static Message toMessage(ProducerRecord record) { .setKey(ByteString.copyFrom(record.key())) .setData(ByteString.copyFrom(record.value())); if (record.timestamp() != null) { - builder.setEventTime(Timestamps.fromMillis(record.timestamp())); + builder = builder.setEventTime(Timestamps.fromMillis(record.timestamp())); } ImmutableListMultimap.Builder attributes = ImmutableListMultimap.builder(); record diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java index 5758f63a..5a5adca9 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java @@ -98,12 +98,14 @@ static class SubscriberState { } @Override + @SuppressWarnings("GuardedBy") public void setAssignment(Set assignment) { try (CloseableMonitor.Hold h = monitor.enter()) { + List unassigned = ImmutableSet.copyOf(partitions.keySet()).stream() .filter(p -> !assignment.contains(p)) - .map(p -> partitions.remove(p)) + .map(partitions::remove) .collect(Collectors.toList()); for (SubscriberState state : unassigned) { state.subscriber.close(); @@ -241,6 +243,7 @@ public ApiFuture> commitAll() { } @Override + @SuppressWarnings("GuardedBy") public ApiFuture commit(Map commitOffsets) { try (CloseableMonitor.Hold h = monitor.enter()) { ImmutableList.Builder> commitFutures = ImmutableList.builder(); @@ -282,8 +285,10 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti @Override public Optional position(Partition partition) { - if (!partitions.containsKey(partition)) return Optional.empty(); - return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1); + try (CloseableMonitor.Hold h = monitor.enter()) { + if (!partitions.containsKey(partition)) return Optional.empty(); + return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1); + } } @Override