Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove custom thread pools #365

Merged
merged 2 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
Expand All @@ -45,6 +44,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Converts gRPC/Cloud Spanner client library asyncronous abstractions into reactive ones.
Expand All @@ -54,6 +54,9 @@ class DatabaseClientReactiveAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseClientReactiveAdapter.class);

public static final Executor REACTOR_EXECUTOR =
runnable -> Schedulers.parallel().schedule(runnable);

// used for DDL operations
private final SpannerConnectionConfiguration config;

Expand All @@ -63,12 +66,12 @@ class DatabaseClientReactiveAdapter {

private final DatabaseAdminClient dbAdminClient;

private final ExecutorService executorService;

private DatabaseClientTransactionManager txnManager;

private boolean autoCommit = true;

private boolean active = true;

private QueryOptions queryOptions;

/**
Expand All @@ -84,9 +87,8 @@ class DatabaseClientReactiveAdapter {
this.dbClient = spannerClient.getDatabaseClient(
DatabaseId.of(config.getProjectId(), config.getInstanceName(), config.getDatabaseName()));
this.dbAdminClient = spannerClient.getDatabaseAdminClient();
this.executorService = Executors.newFixedThreadPool(config.getThreadPoolSize());
this.config = config;
this.txnManager = new DatabaseClientTransactionManager(this.dbClient, this.executorService);
this.txnManager = new DatabaseClientTransactionManager(this.dbClient);

QueryOptions.Builder builder = QueryOptions.newBuilder();
if (config.getOptimizerVersion() != null) {
Expand Down Expand Up @@ -148,12 +150,14 @@ Publisher<Void> rollback() {
* @return reactive pipeline for closing the connection.
*/
Mono<Void> close() {
// TODO: if txn is committed/rolled back and then connection closed, clearTransactionManager
// will run twice, causing trace span to be closed twice. Introduce `closed` field.
return Mono.fromRunnable(() -> {
this.txnManager.clearTransactionManager();
this.executorService.shutdown();
return Mono.defer(() -> {
if (!this.active) {
return Mono.empty();
}
this.active = false;
return convertFutureToMono(() -> this.txnManager.clearTransactionManager());
});

}

/**
Expand All @@ -163,7 +167,7 @@ Mono<Void> close() {
*/
Mono<Boolean> healthCheck() {
return Mono.defer(() -> {
if (this.executorService.isShutdown() || this.spannerClient.isClosed()) {
if (!this.active || this.spannerClient.isClosed()) {
return Mono.just(false);
} else {
return Flux.<SpannerClientLibraryRow>create(sink -> {
Expand All @@ -181,7 +185,7 @@ Mono<Boolean> healthCheck() {
}

Mono<Boolean> localHealthcheck() {
return Mono.fromSupplier(() -> !this.executorService.isShutdown());
return Mono.fromSupplier(() -> this.active);
}

boolean isAutoCommit() {
Expand Down Expand Up @@ -239,7 +243,7 @@ private <T> Mono<T> runBatchDmlInternal(
ApiFuture<T> rowCountFuture =
this.dbClient
.runAsync()
.runAsync(txn -> asyncOperation.apply(txn), this.executorService);
.runAsync(txn -> asyncOperation.apply(txn), REACTOR_EXECUTOR);
return rowCountFuture;
}
});
Expand Down Expand Up @@ -287,9 +291,7 @@ private ApiFuture<Void> runSelectStatementAsFlux(
sink.onCancel(ars::cancel);
sink.onDispose(ars::close);

return ars.setCallback(
this.executorService,
new ResultSetReadyCallback(sink));
return ars.setCallback(REACTOR_EXECUTOR, new ResultSetReadyCallback(sink));
}

static class ResultSetReadyCallback implements ReadyCallback {
Expand Down Expand Up @@ -339,7 +341,7 @@ public void onSuccess(T result) {
sink.success(result);
}
},
this.executorService);
REACTOR_EXECUTOR);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.DatabaseClientReactiveAdapter.REACTOR_EXECUTOR;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
Expand All @@ -28,7 +30,6 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.r2dbc.TransactionInProgressException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,12 +54,8 @@ class DatabaseClientTransactionManager {

private AsyncTransactionStep<?, ? extends Object> lastStep;

private final ExecutorService executorService;

public DatabaseClientTransactionManager(
DatabaseClient dbClient, ExecutorService executorService) {
public DatabaseClientTransactionManager(DatabaseClient dbClient) {
this.dbClient = dbClient;
this.executorService = executorService;
}

boolean isInReadWriteTransaction() {
Expand Down Expand Up @@ -105,13 +102,16 @@ void beginReadonlyTransaction(TimestampBound timestampBound) {
/**
* Closes the read/write transaction manager and clears its state.
*/
void clearTransactionManager() {
ApiFuture<Void> clearTransactionManager() {
this.txnContextFuture = null;
this.lastStep = null;
ApiFuture<Void> returnFuture = ApiFutures.immediateFuture(null);

if (this.transactionManager != null) {
this.transactionManager.close();
returnFuture = this.transactionManager.closeAsync();
this.transactionManager = null;
}
return returnFuture;
}

/**
Expand Down Expand Up @@ -174,9 +174,9 @@ <T> ApiFuture<T> runInTransaction(Function<? super TransactionContext, ApiFuture
AsyncTransactionStep<? extends Object, T> updateStatementFuture =
this.lastStep == null
? this.txnContextFuture.then(
(ctx, unusedVoid) -> operation.apply(ctx), this.executorService)
(ctx, unusedVoid) -> operation.apply(ctx), REACTOR_EXECUTOR)
: this.lastStep.then(
(ctx, unusedPreviousResult) -> operation.apply(ctx), this.executorService);
(ctx, unusedPreviousResult) -> operation.apply(ctx), REACTOR_EXECUTOR);

this.lastStep = updateStatementFuture;
return updateStatementFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.cloud.spanner.r2dbc.TransactionInProgressException;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -72,9 +71,7 @@ public void setUp() {
when(this.mockDbClient.readOnlyTransaction(TimestampBound.strong()))
.thenReturn(this.mockReadOnlyTransaction);

this.transactionManager =
new DatabaseClientTransactionManager(
this.mockDbClient, Executors.newSingleThreadExecutor());
this.transactionManager = new DatabaseClientTransactionManager(this.mockDbClient);

}

Expand Down