diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index a58d93720a..e4b115de9b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -471,13 +471,17 @@ public ServiceRpc create(SpannerOptions options) { private static final AtomicInteger DEFAULT_POOL_COUNT = new AtomicInteger(); /** {@link ExecutorProvider} that is used for {@link AsyncResultSet}. */ - interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable { + public interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable { /** Overridden to suppress the throws declaration of the super interface. */ @Override void close(); } - static class FixedCloseableExecutorProvider implements CloseableExecutorProvider { + /** + * Implementation of {@link CloseableExecutorProvider} that uses a fixed single {@link + * ScheduledExecutorService}. + */ + public static class FixedCloseableExecutorProvider implements CloseableExecutorProvider { private final ScheduledExecutorService executor; private FixedCloseableExecutorProvider(ScheduledExecutorService executor) { @@ -500,7 +504,7 @@ public boolean shouldAutoClose() { } /** Creates a FixedCloseableExecutorProvider. */ - static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) { + public static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) { return new FixedCloseableExecutorProvider(executor); } } @@ -516,8 +520,19 @@ static CloseableExecutorProvider createDefaultAsyncExecutorProvider() { return createAsyncExecutorProvider(8, 60L, TimeUnit.SECONDS); } - @VisibleForTesting - static CloseableExecutorProvider createAsyncExecutorProvider( + /** + * Creates a {@link CloseableExecutorProvider} that can be used as an {@link ExecutorProvider} for + * the async API. The {@link ExecutorProvider} will lazily create up to poolSize threads. The + * backing threads will automatically be shutdown if they have not been used during the keep-alive + * time. The backing threads are created as daemon threads. + * + * @param poolSize the maximum number of threads to create in the pool + * @param keepAliveTime the time that an unused thread in the pool should be kept alive + * @param unit the time unit used for the keepAliveTime + * @return a {@link CloseableExecutorProvider} that can be used for {@link + * SpannerOptions.Builder#setAsyncExecutorProvider(CloseableExecutorProvider)} + */ + public static CloseableExecutorProvider createAsyncExecutorProvider( int poolSize, long keepAliveTime, TimeUnit unit) { String format = String.format("spanner-async-pool-%d-thread-%%d", DEFAULT_POOL_COUNT.incrementAndGet()); @@ -1018,6 +1033,26 @@ public Builder setCompressorName(@Nullable String compressorName) { return this; } + /** + * Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor, + * such as fetching results for an {@link AsyncResultSet}. + * + *

Async methods will use a sensible default if no custom {@link ExecutorProvider} has been + * set. The default {@link ExecutorProvider} uses a cached thread pool containing a maximum of 8 + * threads. The pool is lazily initialized and will not create any threads if the user + * application does not use any async methods. It will also scale down the thread usage if the + * async load allows for that. + * + *

Call {@link SpannerOptions#createAsyncExecutorProvider(int, long, TimeUnit)} to create a + * provider with a custom pool size or call {@link + * FixedCloseableExecutorProvider#create(ScheduledExecutorService)} to create a {@link + * CloseableExecutorProvider} from a standard Java {@link ScheduledExecutorService}. + */ + public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) { + this.asyncExecutorProvider = provider; + return this; + } + /** * Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code * PartialResultSet} chunks for each read and query. The data size of each chunk depends on the @@ -1198,7 +1233,7 @@ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) { return options; } - CloseableExecutorProvider getAsyncExecutorProvider() { + public CloseableExecutorProvider getAsyncExecutorProvider() { return asyncExecutorProvider; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 3698d3379f..fc82954ad4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -19,7 +19,9 @@ import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.RetrySettings; @@ -29,6 +31,7 @@ import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.TransportOptions; +import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider; import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings; @@ -54,6 +57,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nonnull; import org.junit.Test; import org.junit.runner.RunWith; @@ -892,4 +896,16 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { .getTimeout()) .isEqualTo(Duration.ofSeconds(6L)); } + + @Test + public void testCustomAsyncExecutorProvider() { + ScheduledExecutorService service = mock(ScheduledExecutorService.class); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .setAsyncExecutorProvider(FixedCloseableExecutorProvider.create(service)) + .build(); + assertSame(service, options.getAsyncExecutorProvider().getExecutor()); + } }