New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add CredentialsProvider to Publisher and Subscriber settings #475
Changes from 8 commits
038cda4
6dd21e0
b6fda04
056cb28
cbd2673
45ae1fb
0f7be6c
8144b32
997a352
c1b304c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,21 +16,32 @@ | |
|
||
package com.google.cloud.pubsublite.cloudpubsub; | ||
|
||
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; | ||
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; | ||
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; | ||
|
||
import com.google.api.gax.batching.BatchingSettings; | ||
import com.google.api.gax.core.CredentialsProvider; | ||
import com.google.api.gax.rpc.ApiException; | ||
import com.google.auto.value.AutoValue; | ||
import com.google.cloud.pubsublite.AdminClient; | ||
import com.google.cloud.pubsublite.AdminClientSettings; | ||
import com.google.cloud.pubsublite.Constants; | ||
import com.google.cloud.pubsublite.Message; | ||
import com.google.cloud.pubsublite.MessageTransformer; | ||
import com.google.cloud.pubsublite.Partition; | ||
import com.google.cloud.pubsublite.TopicPath; | ||
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher; | ||
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher; | ||
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher; | ||
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; | ||
import com.google.cloud.pubsublite.internal.wire.PubsubContext; | ||
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; | ||
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; | ||
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; | ||
import com.google.cloud.pubsublite.v1.AdminServiceClient; | ||
import com.google.cloud.pubsublite.v1.AdminServiceSettings; | ||
import com.google.cloud.pubsublite.v1.PublisherServiceClient; | ||
import com.google.cloud.pubsublite.v1.PublisherServiceSettings; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.pubsub.v1.PubsubMessage; | ||
import java.util.Optional; | ||
import java.util.function.Supplier; | ||
|
@@ -65,17 +76,26 @@ public abstract class PublisherSettings { | |
/** Batching settings for this publisher to use. Apply per-partition. */ | ||
abstract Optional<BatchingSettings> batchingSettings(); | ||
|
||
/** A supplier for new PublisherServiceClients. Should return a new client each time. */ | ||
/** A provider for credentials. */ | ||
abstract CredentialsProvider credentialsProvider(); | ||
|
||
/** | ||
* A supplier for new PublisherServiceClients. Should return a new client each time. If present, | ||
* ignores CredentialsProvider. | ||
*/ | ||
abstract Optional<Supplier<PublisherServiceClient>> serviceClientSupplier(); | ||
|
||
/** The AdminClient to use, if provided. */ | ||
abstract Optional<AdminClient> adminClient(); | ||
|
||
// For testing. | ||
abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder(); | ||
|
||
abstract Optional<PartitionCountWatcher.Factory> partitionCountWatcherFactory(); | ||
|
||
/** Get a new builder for a PublisherSettings. */ | ||
public static Builder newBuilder() { | ||
return new AutoValue_PublisherSettings.Builder() | ||
.setCredentialsProvider( | ||
PublisherServiceSettings.defaultCredentialsProviderBuilder().build()) | ||
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder()); | ||
} | ||
|
||
|
@@ -97,18 +117,60 @@ public abstract Builder setMessageTransformer( | |
/** Batching settings for this publisher to use. Apply per-partition. */ | ||
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings); | ||
|
||
/** A supplier for new PublisherServiceClients. Should return a new client each time. */ | ||
/** A provider for credentials. */ | ||
public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider); | ||
|
||
/** | ||
* A supplier for new PublisherServiceClients. Should return a new client each time. If present, | ||
* ignores CredentialsProvider. | ||
*/ | ||
public abstract Builder setServiceClientSupplier(Supplier<PublisherServiceClient> supplier); | ||
|
||
/** The AdminClient to use, if provided. */ | ||
public abstract Builder setAdminClient(AdminClient adminClient); | ||
|
||
// For testing. | ||
@VisibleForTesting | ||
abstract Builder setUnderlyingBuilder( | ||
SinglePartitionPublisherBuilder.Builder underlyingBuilder); | ||
|
||
abstract Builder setPartitionCountWatcherFactory(PartitionCountWatcher.Factory factory); | ||
|
||
public abstract PublisherSettings build(); | ||
} | ||
|
||
private PublisherServiceClient newServiceClient(Partition partition) throws ApiException { | ||
if (serviceClientSupplier().isPresent()) return serviceClientSupplier().get().get(); | ||
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); | ||
settingsBuilder = settingsBuilder.setCredentialsProvider(credentialsProvider()); | ||
settingsBuilder = | ||
addDefaultMetadata( | ||
PubsubContext.of(FRAMEWORK), | ||
RoutingMetadata.of(topicPath(), partition), | ||
settingsBuilder); | ||
try { | ||
return PublisherServiceClient.create( | ||
addDefaultSettings(topicPath().location().region(), settingsBuilder)); | ||
} catch (Throwable t) { | ||
throw toCanonical(t).underlying; | ||
} | ||
} | ||
|
||
private AdminClient getAdminClient() throws ApiException { | ||
if (adminClient().isPresent()) return adminClient().get(); | ||
try { | ||
return AdminClient.create( | ||
AdminClientSettings.newBuilder() | ||
.setServiceClient( | ||
AdminServiceClient.create( | ||
AdminServiceSettings.newBuilder() | ||
.setCredentialsProvider(credentialsProvider()) | ||
.build())) | ||
.setRegion(topicPath().location().region()) | ||
.build()); | ||
} catch (Throwable t) { | ||
throw toCanonical(t).underlying; | ||
} | ||
} | ||
|
||
@SuppressWarnings("CheckReturnValue") | ||
Publisher instantiate() throws ApiException { | ||
BatchingSettings batchingSettings = batchingSettings().orElse(DEFAULT_BATCHING_SETTINGS); | ||
|
@@ -125,16 +187,12 @@ Publisher instantiate() throws ApiException { | |
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder = | ||
underlyingBuilder() | ||
.setBatchingSettings(batchingSettings) | ||
.setContext(PubsubContext.of(FRAMEWORK)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Beam also touches this removed setContext method. Should it just remove the usage? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has been moved to newServiceClient method. (Thanks @dpcollins-google !) |
||
.setTopic(topicPath()) | ||
.setPartition(partition); | ||
serviceClientSupplier() | ||
.ifPresent( | ||
supplier -> singlePartitionBuilder.setServiceClient(supplier.get())); | ||
.setPartition(partition) | ||
.setServiceClient(newServiceClient(partition)); | ||
return singlePartitionBuilder.build(); | ||
}); | ||
partitionCountWatcherFactory().ifPresent(publisherSettings::setConfigWatcherFactory); | ||
return new WrappingPublisher( | ||
new PartitionCountWatchingPublisher(publisherSettings.build()), messageTransformer); | ||
}) | ||
.setAdminClient(getAdminClient()); | ||
return new WrappingPublisher(publisherSettings.build().instantiate(), messageTransformer); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you instead add a specific exclusion for what you are adding? Not all changes to Builders will be adding abstract methods. Will this trigger on removing methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This specifically triggers on adding methods to nested classes named "Builder". I think thats specific enough. (code 7013 is added abstract method to abstract class)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this isn't specific enough and would require a major version bump if this API had reached 1.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I disagree. Addition of a method to a Builder class, which is only overridden in an autogenerated AutoValue file, would not trigger a major version bump as it is not a backwards-incompatible API surface change.