Skip to content

Commit

Permalink
Remove custom thread pools (#365)
Browse files Browse the repository at this point in the history
* remove custom thread pools; switch to async version of closing transaction manager

* added tests, closed readonly transaction
  • Loading branch information
elefeint committed May 11, 2021
1 parent bc45b2a commit 7634191
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
Expand All @@ -45,6 +44,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Converts gRPC/Cloud Spanner client library asyncronous abstractions into reactive ones.
Expand All @@ -54,6 +54,9 @@ class DatabaseClientReactiveAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseClientReactiveAdapter.class);

public static final Executor REACTOR_EXECUTOR =
runnable -> Schedulers.parallel().schedule(runnable);

// used for DDL operations
private final SpannerConnectionConfiguration config;

Expand All @@ -63,12 +66,12 @@ class DatabaseClientReactiveAdapter {

private final DatabaseAdminClient dbAdminClient;

private final ExecutorService executorService;

private DatabaseClientTransactionManager txnManager;

private boolean autoCommit = true;

private boolean active = true;

private QueryOptions queryOptions;

/**
Expand All @@ -84,9 +87,8 @@ class DatabaseClientReactiveAdapter {
this.dbClient = spannerClient.getDatabaseClient(
DatabaseId.of(config.getProjectId(), config.getInstanceName(), config.getDatabaseName()));
this.dbAdminClient = spannerClient.getDatabaseAdminClient();
this.executorService = Executors.newFixedThreadPool(config.getThreadPoolSize());
this.config = config;
this.txnManager = new DatabaseClientTransactionManager(this.dbClient, this.executorService);
this.txnManager = new DatabaseClientTransactionManager(this.dbClient);

QueryOptions.Builder builder = QueryOptions.newBuilder();
if (config.getOptimizerVersion() != null) {
Expand Down Expand Up @@ -148,12 +150,14 @@ Publisher<Void> rollback() {
* @return reactive pipeline for closing the connection.
*/
Mono<Void> close() {
// TODO: if txn is committed/rolled back and then connection closed, clearTransactionManager
// will run twice, causing trace span to be closed twice. Introduce `closed` field.
return Mono.fromRunnable(() -> {
this.txnManager.clearTransactionManager();
this.executorService.shutdown();
return Mono.defer(() -> {
if (!this.active) {
return Mono.empty();
}
this.active = false;
return convertFutureToMono(() -> this.txnManager.clearTransactionManager());
});

}

/**
Expand All @@ -163,25 +167,23 @@ Mono<Void> close() {
*/
Mono<Boolean> healthCheck() {
return Mono.defer(() -> {
if (this.executorService.isShutdown() || this.spannerClient.isClosed()) {
if (!this.active || this.spannerClient.isClosed()) {
return Mono.just(false);
} else {
return Flux.<SpannerClientLibraryRow>create(sink -> {
com.google.cloud.spanner.Statement statement =
Statement.newBuilder("SELECT 1").build();
Statement statement = Statement.newBuilder("SELECT 1").build();
runSelectStatementAsFlux(this.dbClient.singleUse(), statement, sink);
})
.then(Mono.just(true))
.onErrorResume(error -> {
LOGGER.warn("Cloud Spanner healthcheck failed", error);
return Mono.just(false);
});
}).then(Mono.just(true))
.onErrorResume(error -> {
LOGGER.warn("Cloud Spanner healthcheck failed", error);
return Mono.just(false);
});
}
});
}

Mono<Boolean> localHealthcheck() {
return Mono.fromSupplier(() -> !this.executorService.isShutdown());
return Mono.fromSupplier(() -> this.active);
}

boolean isAutoCommit() {
Expand All @@ -206,7 +208,7 @@ Publisher<Void> setAutoCommit(boolean autoCommit) {
*
* @return reactive pipeline for running a DML statement
*/
Mono<Long> runDmlStatement(com.google.cloud.spanner.Statement statement) {
Mono<Long> runDmlStatement(Statement statement) {
return runBatchDmlInternal(ctx -> ctx.executeUpdateAsync(statement));
}

Expand Down Expand Up @@ -239,15 +241,14 @@ private <T> Mono<T> runBatchDmlInternal(
ApiFuture<T> rowCountFuture =
this.dbClient
.runAsync()
.runAsync(txn -> asyncOperation.apply(txn), this.executorService);
.runAsync(asyncOperation::apply, REACTOR_EXECUTOR);
return rowCountFuture;
}
});
});
}

Flux<SpannerClientLibraryRow> runSelectStatement(
com.google.cloud.spanner.Statement statement) {
Flux<SpannerClientLibraryRow> runSelectStatement(Statement statement) {
return Flux.create(
sink -> {
if (this.txnManager.isInReadWriteTransaction()) {
Expand Down Expand Up @@ -280,16 +281,12 @@ Mono<Void> runDdlStatement(String query) {
* @return future suitable for transactional step chaining
*/
private ApiFuture<Void> runSelectStatementAsFlux(
ReadContext readContext,
com.google.cloud.spanner.Statement statement,
FluxSink<SpannerClientLibraryRow> sink) {
ReadContext readContext, Statement statement, FluxSink<SpannerClientLibraryRow> sink) {
AsyncResultSet ars = readContext.executeQueryAsync(statement);
sink.onCancel(ars::cancel);
sink.onDispose(ars::close);

return ars.setCallback(
this.executorService,
new ResultSetReadyCallback(sink));
return ars.setCallback(REACTOR_EXECUTOR, new ResultSetReadyCallback(sink));
}

static class ResultSetReadyCallback implements ReadyCallback {
Expand Down Expand Up @@ -339,7 +336,7 @@ public void onSuccess(T result) {
sink.success(result);
}
},
this.executorService);
REACTOR_EXECUTOR);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.DatabaseClientReactiveAdapter.REACTOR_EXECUTOR;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
Expand All @@ -28,7 +30,6 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.r2dbc.TransactionInProgressException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,12 +54,8 @@ class DatabaseClientTransactionManager {

private AsyncTransactionStep<?, ? extends Object> lastStep;

private final ExecutorService executorService;

public DatabaseClientTransactionManager(
DatabaseClient dbClient, ExecutorService executorService) {
public DatabaseClientTransactionManager(DatabaseClient dbClient) {
this.dbClient = dbClient;
this.executorService = executorService;
}

boolean isInReadWriteTransaction() {
Expand Down Expand Up @@ -105,13 +102,21 @@ void beginReadonlyTransaction(TimestampBound timestampBound) {
/**
* Closes the read/write transaction manager and clears its state.
*/
void clearTransactionManager() {
ApiFuture<Void> clearTransactionManager() {
this.txnContextFuture = null;
this.lastStep = null;
ApiFuture<Void> returnFuture = ApiFutures.immediateFuture(null);

if (this.transactionManager != null) {
this.transactionManager.close();
returnFuture = this.transactionManager.closeAsync();
this.transactionManager = null;
}

if (isInReadonlyTransaction()) {
closeReadOnlyTransaction();
}

return returnFuture;
}

/**
Expand Down Expand Up @@ -174,9 +179,9 @@ <T> ApiFuture<T> runInTransaction(Function<? super TransactionContext, ApiFuture
AsyncTransactionStep<? extends Object, T> updateStatementFuture =
this.lastStep == null
? this.txnContextFuture.then(
(ctx, unusedVoid) -> operation.apply(ctx), this.executorService)
(ctx, unusedVoid) -> operation.apply(ctx), REACTOR_EXECUTOR)
: this.lastStep.then(
(ctx, unusedPreviousResult) -> operation.apply(ctx), this.executorService);
(ctx, unusedPreviousResult) -> operation.apply(ctx), REACTOR_EXECUTOR);

this.lastStep = updateStatementFuture;
return updateStatementFuture;
Expand Down

0 comments on commit 7634191

Please sign in to comment.