Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support setting an async executor provider #1263

Merged
merged 2 commits into from Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -471,13 +471,17 @@ public ServiceRpc create(SpannerOptions options) {
private static final AtomicInteger DEFAULT_POOL_COUNT = new AtomicInteger();

/** {@link ExecutorProvider} that is used for {@link AsyncResultSet}. */
interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {
public interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {
/** Overridden to suppress the throws declaration of the super interface. */
@Override
void close();
}

static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {
/**
* Implementation of {@link CloseableExecutorProvider} that uses a fixed single {@link
* ScheduledExecutorService}.
*/
public static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {
private final ScheduledExecutorService executor;

private FixedCloseableExecutorProvider(ScheduledExecutorService executor) {
Expand All @@ -500,7 +504,7 @@ public boolean shouldAutoClose() {
}

/** Creates a FixedCloseableExecutorProvider. */
static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) {
public static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) {
return new FixedCloseableExecutorProvider(executor);
}
}
Expand All @@ -516,8 +520,19 @@ static CloseableExecutorProvider createDefaultAsyncExecutorProvider() {
return createAsyncExecutorProvider(8, 60L, TimeUnit.SECONDS);
}

@VisibleForTesting
static CloseableExecutorProvider createAsyncExecutorProvider(
/**
* Creates a {@link CloseableExecutorProvider} that can be used as an {@link ExecutorProvider} for
* the async API. The {@link ExecutorProvider} will lazily create up to poolSize threads. The
* backing threads will automatically be shutdown if they have not been used during the keep-alive
* time. The backing threads are created as daemon threads.
*
* @param poolSize the maximum number of threads to create in the pool
* @param keepAliveTime the time that an unused thread in the pool should be kept alive
* @param unit the time unit used for the keepAliveTime
* @return a {@link CloseableExecutorProvider} that can be used for {@link
* SpannerOptions.Builder#setAsyncExecutorProvider(CloseableExecutorProvider)}
*/
public static CloseableExecutorProvider createAsyncExecutorProvider(
int poolSize, long keepAliveTime, TimeUnit unit) {
String format =
String.format("spanner-async-pool-%d-thread-%%d", DEFAULT_POOL_COUNT.incrementAndGet());
Expand Down Expand Up @@ -1018,6 +1033,26 @@ public Builder setCompressorName(@Nullable String compressorName) {
return this;
}

/**
* Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor,
* such as fetching results for an {@link AsyncResultSet}.
*
* <p>Async methods will use a sensible default if no custom {@link ExecutorProvider} has been
* set. The default {@link ExecutorProvider} uses a cached thread pool containing a maximum of 8
* threads. The pool is lazily initialized and will not create any threads if the user
* application does not use any async methods. It will also scale down the thread usage if the
* async load allows for that.
*
* <p>Call {@link SpannerOptions#createAsyncExecutorProvider(int, long, TimeUnit)} to create a
* provider with a custom pool size or call {@link
* FixedCloseableExecutorProvider#create(ScheduledExecutorService)} to create a {@link
* CloseableExecutorProvider} from a standard Java {@link ScheduledExecutorService}.
*/
public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) {
this.asyncExecutorProvider = provider;
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -1198,7 +1233,7 @@ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
return options;
}

CloseableExecutorProvider getAsyncExecutorProvider() {
public CloseableExecutorProvider getAsyncExecutorProvider() {
return asyncExecutorProvider;
}

Expand Down
Expand Up @@ -19,7 +19,9 @@
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
Expand All @@ -29,6 +31,7 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
Expand All @@ -54,6 +57,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -892,4 +896,16 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() {
.getTimeout())
.isEqualTo(Duration.ofSeconds(6L));
}

@Test
public void testCustomAsyncExecutorProvider() {
ScheduledExecutorService service = mock(ScheduledExecutorService.class);
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.setAsyncExecutorProvider(FixedCloseableExecutorProvider.create(service))
.build();
assertSame(service, options.getAsyncExecutorProvider().getExecutor());
}
}