Skip to content

Commit

Permalink
feat: Add CredentialsProvider to Publisher and Subscriber settings (#475
Browse files Browse the repository at this point in the history
)

* feat: Add CredentialsProvider to Publisher and Subscriber settings

These objects require more than one client and are annoying to construct correctly. Add a commonly needed setting to the top level here.

Fixes #472

* feat: Add CredentialsProvider to Publisher and Subscriber settings

These objects require more than one client and are annoying to construct correctly. Add a commonly needed setting to the top level here.

Fixes #472

* feat: Add CredentialsProvider to Publisher and Subscriber settings

These objects require more than one client and are annoying to construct correctly. Add a commonly needed setting to the top level here.

Fixes #472

* fix: clirr

* fix: flaky test

* fix: deps

* fix: scopes

* fix: format

* fix: dependencies again

* fix: admin endpoint
  • Loading branch information
dpcollins-google committed Feb 1, 2021
1 parent cd3c41a commit ba16af8
Show file tree
Hide file tree
Showing 22 changed files with 499 additions and 434 deletions.
15 changes: 7 additions & 8 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
@@ -1,15 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Removed not needed API -->
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
</difference>
<difference>
<differenceType>6011</differenceType>
<className>com/google/cloud/pubsublite/Constants</className>
<field>MAX_PUBLISH_MESSAGE_BYTES</field>
<differenceType>7013</differenceType>
<className>**/*$Builder</className>
<method>*</method>
</difference>
<!-- Blanket ignored files -->
<difference>
Expand All @@ -30,6 +26,7 @@
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
Expand Down Expand Up @@ -113,6 +110,7 @@
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/internal/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
Expand Down Expand Up @@ -173,6 +171,7 @@
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/pubsublite/v1/**</className>
<field>*</field>
<method>*</method>
</difference>
<difference>
Expand Down
Expand Up @@ -16,21 +16,32 @@

package com.google.cloud.pubsublite.cloudpubsub;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -65,17 +76,26 @@ public abstract class PublisherSettings {
/** Batching settings for this publisher to use. Apply per-partition. */
abstract Optional<BatchingSettings> batchingSettings();

/** A supplier for new PublisherServiceClients. Should return a new client each time. */
/** A provider for credentials. */
abstract CredentialsProvider credentialsProvider();

/**
* A supplier for new PublisherServiceClients. Should return a new client each time. If present,
* ignores CredentialsProvider.
*/
abstract Optional<Supplier<PublisherServiceClient>> serviceClientSupplier();

/** The AdminClient to use, if provided. */
abstract Optional<AdminClient> adminClient();

// For testing.
abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder();

abstract Optional<PartitionCountWatcher.Factory> partitionCountWatcherFactory();

/** Get a new builder for a PublisherSettings. */
public static Builder newBuilder() {
return new AutoValue_PublisherSettings.Builder()
.setCredentialsProvider(
PublisherServiceSettings.defaultCredentialsProviderBuilder().build())
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
}

Expand All @@ -97,18 +117,61 @@ public abstract Builder setMessageTransformer(
/** Batching settings for this publisher to use. Apply per-partition. */
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);

/** A supplier for new PublisherServiceClients. Should return a new client each time. */
/** A provider for credentials. */
public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider);

/**
* A supplier for new PublisherServiceClients. Should return a new client each time. If present,
* ignores CredentialsProvider.
*/
public abstract Builder setServiceClientSupplier(Supplier<PublisherServiceClient> supplier);

/** The AdminClient to use, if provided. */
public abstract Builder setAdminClient(AdminClient adminClient);

// For testing.
@VisibleForTesting
abstract Builder setUnderlyingBuilder(
SinglePartitionPublisherBuilder.Builder underlyingBuilder);

abstract Builder setPartitionCountWatcherFactory(PartitionCountWatcher.Factory factory);

public abstract PublisherSettings build();
}

private PublisherServiceClient newServiceClient(Partition partition) throws ApiException {
if (serviceClientSupplier().isPresent()) return serviceClientSupplier().get().get();
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
settingsBuilder = settingsBuilder.setCredentialsProvider(credentialsProvider());
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
settingsBuilder);
try {
return PublisherServiceClient.create(
addDefaultSettings(topicPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private AdminClient getAdminClient() throws ApiException {
if (adminClient().isPresent()) return adminClient().get();
try {
return AdminClient.create(
AdminClientSettings.newBuilder()
.setServiceClient(
AdminServiceClient.create(
addDefaultSettings(
topicPath().location().region(),
AdminServiceSettings.newBuilder()
.setCredentialsProvider(credentialsProvider()))))
.setRegion(topicPath().location().region())
.build());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

@SuppressWarnings("CheckReturnValue")
Publisher instantiate() throws ApiException {
BatchingSettings batchingSettings = batchingSettings().orElse(DEFAULT_BATCHING_SETTINGS);
Expand All @@ -125,16 +188,12 @@ Publisher instantiate() throws ApiException {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
underlyingBuilder()
.setBatchingSettings(batchingSettings)
.setContext(PubsubContext.of(FRAMEWORK))
.setTopic(topicPath())
.setPartition(partition);
serviceClientSupplier()
.ifPresent(
supplier -> singlePartitionBuilder.setServiceClient(supplier.get()));
.setPartition(partition)
.setServiceClient(newServiceClient(partition));
return singlePartitionBuilder.build();
});
partitionCountWatcherFactory().ifPresent(publisherSettings::setConfigWatcherFactory);
return new WrappingPublisher(
new PartitionCountWatchingPublisher(publisherSettings.build()), messageTransformer);
})
.setAdminClient(getAdminClient());
return new WrappingPublisher(publisherSettings.build().instantiate(), messageTransformer);
}
}

0 comments on commit ba16af8

Please sign in to comment.