diff --git a/clirr-ignored-differences.xml b/clirr-ignored-differences.xml new file mode 100644 index 00000000..5c65da62 --- /dev/null +++ b/clirr-ignored-differences.xml @@ -0,0 +1,10 @@ + + + + + + 7013 + **/*$Builder + * + + \ No newline at end of file diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 05e667c4..1e7df616 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -47,6 +47,7 @@ import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; +import java.util.Optional; import org.apache.kafka.clients.consumer.Consumer; @AutoValue @@ -61,6 +62,8 @@ public abstract class ConsumerSettings { // Optional parameters. abstract boolean autocommit(); + abstract Optional topicPathOverride(); + public static Builder newBuilder() { return (new AutoValue_ConsumerSettings.Builder()).setAutocommit(false); } @@ -68,22 +71,46 @@ public static Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { // Required parameters. + /** + * The subscription path to use. Only the topic corresponding to this subscription can be + * subscribed to. + */ public abstract Builder setSubscriptionPath(SubscriptionPath path); + /** The per-partition flow control settings. */ public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings); // Optional parameters. + /** The autocommit mode. */ public abstract Builder setAutocommit(boolean autocommit); + /** + * An override for the TopicPath used by this consumer. + * + *

When this is set, the topic path of the subscription will not be fetched: instead, the + * topic used in methods will be compared with the provided TopicPath object. + * + *

This is useful if you do not have the pubsublite.subscriptions.get permission for the + * subscription. + */ + public abstract Builder setTopicPathOverride(TopicPath topicPath); + public abstract ConsumerSettings build(); } public Consumer instantiate() throws ApiException { - CloudRegion region = subscriptionPath().location().extractRegion(); - try (AdminClient adminClient = - AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) { - Subscription subscription = adminClient.getSubscription(subscriptionPath()).get(); - TopicPath topic = TopicPath.parse(subscription.getTopic()); + try { + CloudRegion region = subscriptionPath().location().extractRegion(); + TopicPath topic; + if (topicPathOverride().isPresent()) { + topic = topicPathOverride().get(); + } else { + try (AdminClient adminClient = + AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) { + Subscription subscription = adminClient.getSubscription(subscriptionPath()).get(); + topic = TopicPath.parse(subscription.getTopic()); + } + } AssignerFactory assignerFactory = receiver -> { try {