Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add CredentialsProvider to Publisher and Subscriber settings #475

Merged
merged 10 commits into from Feb 1, 2021
15 changes: 7 additions & 8 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
@@ -1,15 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Removed not needed API -->
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you instead add a specific exclusion for what you are adding? Not all changes to Builders will be adding abstract methods. Will this trigger on removing methods?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specifically triggers on adding methods to nested classes named "Builder". I think thats specific enough. (code 7013 is added abstract method to abstract class)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this isn't specific enough and would require a major version bump if this API had reached 1.0.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree. Addition of a method to a Builder class, which is only overridden in an autogenerated AutoValue file, would not trigger a major version bump as it is not a backwards-incompatible API surface change.

<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
</difference>
<difference>
<differenceType>6011</differenceType>
<className>com/google/cloud/pubsublite/Constants</className>
<field>MAX_PUBLISH_MESSAGE_BYTES</field>
<differenceType>7013</differenceType>
<className>**/*$Builder</className>
<method>*</method>
</difference>
<!-- Blanket ignored files -->
<difference>
Expand All @@ -30,6 +26,7 @@
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
Expand Down Expand Up @@ -113,6 +110,7 @@
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
Expand Down Expand Up @@ -173,6 +171,7 @@
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
Expand Down
8 changes: 8 additions & 0 deletions google-cloud-pubsublite/pom.xml
Expand Up @@ -93,6 +93,14 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>

<!-- Add system backend as a default dependency. This can be excluded if a different backend is needed.-->
<dependency>
Expand Down
Expand Up @@ -16,21 +16,32 @@

package com.google.cloud.pubsublite.cloudpubsub;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -65,17 +76,26 @@ public abstract class PublisherSettings {
/** Batching settings for this publisher to use. Apply per-partition. */
abstract Optional<BatchingSettings> batchingSettings();

/** A supplier for new PublisherServiceClients. Should return a new client each time. */
/** A provider for credentials. */
abstract CredentialsProvider credentialsProvider();

/**
* A supplier for new PublisherServiceClients. Should return a new client each time. If present,
* ignores CredentialsProvider.
*/
abstract Optional<Supplier<PublisherServiceClient>> serviceClientSupplier();

/** The AdminClient to use, if provided. */
abstract Optional<AdminClient> adminClient();

// For testing.
abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder();

abstract Optional<PartitionCountWatcher.Factory> partitionCountWatcherFactory();

/** Get a new builder for a PublisherSettings. */
public static Builder newBuilder() {
return new AutoValue_PublisherSettings.Builder()
.setCredentialsProvider(
PublisherServiceSettings.defaultCredentialsProviderBuilder().build())
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
}

Expand All @@ -97,18 +117,60 @@ public abstract Builder setMessageTransformer(
/** Batching settings for this publisher to use. Apply per-partition. */
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);

/** A supplier for new PublisherServiceClients. Should return a new client each time. */
/** A provider for credentials. */
public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider);

/**
* A supplier for new PublisherServiceClients. Should return a new client each time. If present,
* ignores CredentialsProvider.
*/
public abstract Builder setServiceClientSupplier(Supplier<PublisherServiceClient> supplier);

/** The AdminClient to use, if provided. */
public abstract Builder setAdminClient(AdminClient adminClient);

// For testing.
@VisibleForTesting
abstract Builder setUnderlyingBuilder(
SinglePartitionPublisherBuilder.Builder underlyingBuilder);

abstract Builder setPartitionCountWatcherFactory(PartitionCountWatcher.Factory factory);

public abstract PublisherSettings build();
}

private PublisherServiceClient newServiceClient(Partition partition) throws ApiException {
if (serviceClientSupplier().isPresent()) return serviceClientSupplier().get().get();
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
settingsBuilder = settingsBuilder.setCredentialsProvider(credentialsProvider());
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
settingsBuilder);
try {
return PublisherServiceClient.create(
addDefaultSettings(topicPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private AdminClient getAdminClient() throws ApiException {
if (adminClient().isPresent()) return adminClient().get();
try {
return AdminClient.create(
AdminClientSettings.newBuilder()
.setServiceClient(
AdminServiceClient.create(
AdminServiceSettings.newBuilder()
.setCredentialsProvider(credentialsProvider())
.build()))
.setRegion(topicPath().location().region())
.build());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

@SuppressWarnings("CheckReturnValue")
Publisher instantiate() throws ApiException {
BatchingSettings batchingSettings = batchingSettings().orElse(DEFAULT_BATCHING_SETTINGS);
Expand All @@ -125,16 +187,12 @@ Publisher instantiate() throws ApiException {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
underlyingBuilder()
.setBatchingSettings(batchingSettings)
.setContext(PubsubContext.of(FRAMEWORK))
Copy link
Member

@suztomo suztomo Apr 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been moved to newServiceClient method. (Thanks @dpcollins-google !)

.setTopic(topicPath())
.setPartition(partition);
serviceClientSupplier()
.ifPresent(
supplier -> singlePartitionBuilder.setServiceClient(supplier.get()));
.setPartition(partition)
.setServiceClient(newServiceClient(partition));
return singlePartitionBuilder.build();
});
partitionCountWatcherFactory().ifPresent(publisherSettings::setConfigWatcherFactory);
return new WrappingPublisher(
new PartitionCountWatchingPublisher(publisherSettings.build()), messageTransformer);
})
.setAdminClient(getAdminClient());
return new WrappingPublisher(publisherSettings.build().instantiate(), messageTransformer);
}
}