Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add setTopicPathOverride to consumer settings #174

Merged
merged 2 commits into from Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions clirr-ignored-differences.xml
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
<className>**/*$Builder</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -47,6 +47,7 @@
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;

@AutoValue
Expand All @@ -61,29 +62,55 @@ public abstract class ConsumerSettings {
// Optional parameters.
abstract boolean autocommit();

abstract Optional<TopicPath> topicPathOverride();

public static Builder newBuilder() {
return (new AutoValue_ConsumerSettings.Builder()).setAutocommit(false);
}

@AutoValue.Builder
public abstract static class Builder {
// Required parameters.
/**
* The subscription path to use. Only the topic corresponding to this subscription can be
* subscribed to.
*/
public abstract Builder setSubscriptionPath(SubscriptionPath path);

/** The per-partition flow control settings. */
public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings);

// Optional parameters.
/** The autocommit mode. */
public abstract Builder setAutocommit(boolean autocommit);

/**
* An override for the TopicPath used by this consumer.
*
* <p>When this is set, the topic path of the subscription will not be fetched: instead, the
* topic used in methods will be compared with the provided TopicPath object.
*
* <p>This is useful if you do not have the pubsublite.subscriptions.get permission for the
* subscription.
*/
public abstract Builder setTopicPathOverride(TopicPath topicPath);

public abstract ConsumerSettings build();
}

public Consumer<byte[], byte[]> instantiate() throws ApiException {
CloudRegion region = subscriptionPath().location().extractRegion();
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
try {
CloudRegion region = subscriptionPath().location().extractRegion();
TopicPath topic;
if (topicPathOverride().isPresent()) {
topic = topicPathOverride().get();
} else {
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
topic = TopicPath.parse(subscription.getTopic());
}
}
AssignerFactory assignerFactory =
receiver -> {
try {
Expand Down