From 95aba77a2b76be8931796c7615ab3678cd168ac3 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 1 Mar 2021 17:44:00 -0500 Subject: [PATCH] fix: Add admin client in producer settings --- .../cloud/pubsublite/kafka/ProducerSettings.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java index 4baf0f45..073773ad 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java @@ -48,10 +48,16 @@ public abstract static class Builder { public abstract ProducerSettings build(); } + private AdminClient newAdminClient() { + return AdminClient.create( + AdminClientSettings.newBuilder().setRegion(topicPath().location().region()).build()); + } + public Producer instantiate() throws ApiException { PartitionCountWatchingPublisherSettings publisherSettings = PartitionCountWatchingPublisherSettings.newBuilder() .setTopic(topicPath()) + .setAdminClient(newAdminClient()) .setPublisherFactory( partition -> { try { @@ -72,12 +78,7 @@ public Producer instantiate() throws ApiException { } }) .build(); - SharedBehavior shared = - new SharedBehavior( - AdminClient.create( - AdminClientSettings.newBuilder() - .setRegion(topicPath().location().region()) - .build())); + SharedBehavior shared = new SharedBehavior(newAdminClient()); return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath()); } }