Skip to content

Commit

Permalink
feat: Add setTopicPathOverride to consumer settings (#174)
Browse files Browse the repository at this point in the history
* feat: Add setTopicPathOverride to consumer settings

* fix: Clirr
  • Loading branch information
dpcollins-google committed Aug 6, 2021
1 parent 665890e commit 7106862
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
10 changes: 10 additions & 0 deletions clirr-ignored-differences.xml
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
<className>**/*$Builder</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -47,6 +47,7 @@
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;

@AutoValue
Expand All @@ -61,29 +62,55 @@ public abstract class ConsumerSettings {
// Optional parameters.
abstract boolean autocommit();

abstract Optional<TopicPath> topicPathOverride();

public static Builder newBuilder() {
return (new AutoValue_ConsumerSettings.Builder()).setAutocommit(false);
}

@AutoValue.Builder
public abstract static class Builder {
// Required parameters.
/**
* The subscription path to use. Only the topic corresponding to this subscription can be
* subscribed to.
*/
public abstract Builder setSubscriptionPath(SubscriptionPath path);

/** The per-partition flow control settings. */
public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings);

// Optional parameters.
/** The autocommit mode. */
public abstract Builder setAutocommit(boolean autocommit);

/**
* An override for the TopicPath used by this consumer.
*
* <p>When this is set, the topic path of the subscription will not be fetched: instead, the
* topic used in methods will be compared with the provided TopicPath object.
*
* <p>This is useful if you do not have the pubsublite.subscriptions.get permission for the
* subscription.
*/
public abstract Builder setTopicPathOverride(TopicPath topicPath);

public abstract ConsumerSettings build();
}

public Consumer<byte[], byte[]> instantiate() throws ApiException {
CloudRegion region = subscriptionPath().location().extractRegion();
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
try {
CloudRegion region = subscriptionPath().location().extractRegion();
TopicPath topic;
if (topicPathOverride().isPresent()) {
topic = topicPathOverride().get();
} else {
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
topic = TopicPath.parse(subscription.getTopic());
}
}
AssignerFactory assignerFactory =
receiver -> {
try {
Expand Down

0 comments on commit 7106862

Please sign in to comment.