Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove custom thread pools #365

Merged
merged 2 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
Expand All @@ -45,6 +44,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Converts gRPC/Cloud Spanner client library asyncronous abstractions into reactive ones.
Expand All @@ -54,6 +54,9 @@ class DatabaseClientReactiveAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseClientReactiveAdapter.class);

public static final Executor REACTOR_EXECUTOR =
runnable -> Schedulers.parallel().schedule(runnable);

// used for DDL operations
private final SpannerConnectionConfiguration config;

Expand All @@ -63,12 +66,12 @@ class DatabaseClientReactiveAdapter {

private final DatabaseAdminClient dbAdminClient;

private final ExecutorService executorService;

private DatabaseClientTransactionManager txnManager;

private boolean autoCommit = true;

private boolean active = true;

private QueryOptions queryOptions;

/**
Expand All @@ -84,9 +87,8 @@ class DatabaseClientReactiveAdapter {
this.dbClient = spannerClient.getDatabaseClient(
DatabaseId.of(config.getProjectId(), config.getInstanceName(), config.getDatabaseName()));
this.dbAdminClient = spannerClient.getDatabaseAdminClient();
this.executorService = Executors.newFixedThreadPool(config.getThreadPoolSize());
this.config = config;
this.txnManager = new DatabaseClientTransactionManager(this.dbClient, this.executorService);
this.txnManager = new DatabaseClientTransactionManager(this.dbClient);

QueryOptions.Builder builder = QueryOptions.newBuilder();
if (config.getOptimizerVersion() != null) {
Expand Down Expand Up @@ -148,12 +150,14 @@ Publisher<Void> rollback() {
* @return reactive pipeline for closing the connection.
*/
Mono<Void> close() {
// TODO: if txn is committed/rolled back and then connection closed, clearTransactionManager
// will run twice, causing trace span to be closed twice. Introduce `closed` field.
return Mono.fromRunnable(() -> {
this.txnManager.clearTransactionManager();
this.executorService.shutdown();
return Mono.defer(() -> {
if (!this.active) {
return Mono.empty();
}
this.active = false;
return convertFutureToMono(() -> this.txnManager.clearTransactionManager());
});

}

/**
Expand All @@ -163,25 +167,23 @@ Mono<Void> close() {
*/
Mono<Boolean> healthCheck() {
return Mono.defer(() -> {
if (this.executorService.isShutdown() || this.spannerClient.isClosed()) {
if (!this.active || this.spannerClient.isClosed()) {
return Mono.just(false);
} else {
return Flux.<SpannerClientLibraryRow>create(sink -> {
com.google.cloud.spanner.Statement statement =
Statement.newBuilder("SELECT 1").build();
Statement statement = Statement.newBuilder("SELECT 1").build();
runSelectStatementAsFlux(this.dbClient.singleUse(), statement, sink);
})
.then(Mono.just(true))
.onErrorResume(error -> {
LOGGER.warn("Cloud Spanner healthcheck failed", error);
return Mono.just(false);
});
}).then(Mono.just(true))
.onErrorResume(error -> {
LOGGER.warn("Cloud Spanner healthcheck failed", error);
return Mono.just(false);
});
}
});
}

Mono<Boolean> localHealthcheck() {
return Mono.fromSupplier(() -> !this.executorService.isShutdown());
return Mono.fromSupplier(() -> this.active);
}

boolean isAutoCommit() {
Expand All @@ -206,7 +208,7 @@ Publisher<Void> setAutoCommit(boolean autoCommit) {
*
* @return reactive pipeline for running a DML statement
*/
Mono<Long> runDmlStatement(com.google.cloud.spanner.Statement statement) {
Mono<Long> runDmlStatement(Statement statement) {
return runBatchDmlInternal(ctx -> ctx.executeUpdateAsync(statement));
}

Expand Down Expand Up @@ -239,15 +241,14 @@ private <T> Mono<T> runBatchDmlInternal(
ApiFuture<T> rowCountFuture =
this.dbClient
.runAsync()
.runAsync(txn -> asyncOperation.apply(txn), this.executorService);
.runAsync(asyncOperation::apply, REACTOR_EXECUTOR);
return rowCountFuture;
}
});
});
}

Flux<SpannerClientLibraryRow> runSelectStatement(
com.google.cloud.spanner.Statement statement) {
Flux<SpannerClientLibraryRow> runSelectStatement(Statement statement) {
return Flux.create(
sink -> {
if (this.txnManager.isInReadWriteTransaction()) {
Expand Down Expand Up @@ -280,16 +281,12 @@ Mono<Void> runDdlStatement(String query) {
* @return future suitable for transactional step chaining
*/
private ApiFuture<Void> runSelectStatementAsFlux(
ReadContext readContext,
com.google.cloud.spanner.Statement statement,
FluxSink<SpannerClientLibraryRow> sink) {
ReadContext readContext, Statement statement, FluxSink<SpannerClientLibraryRow> sink) {
AsyncResultSet ars = readContext.executeQueryAsync(statement);
sink.onCancel(ars::cancel);
sink.onDispose(ars::close);

return ars.setCallback(
this.executorService,
new ResultSetReadyCallback(sink));
return ars.setCallback(REACTOR_EXECUTOR, new ResultSetReadyCallback(sink));
}

static class ResultSetReadyCallback implements ReadyCallback {
Expand Down Expand Up @@ -339,7 +336,7 @@ public void onSuccess(T result) {
sink.success(result);
}
},
this.executorService);
REACTOR_EXECUTOR);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.DatabaseClientReactiveAdapter.REACTOR_EXECUTOR;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
Expand All @@ -28,7 +30,6 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.r2dbc.TransactionInProgressException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,12 +54,8 @@ class DatabaseClientTransactionManager {

private AsyncTransactionStep<?, ? extends Object> lastStep;

private final ExecutorService executorService;

public DatabaseClientTransactionManager(
DatabaseClient dbClient, ExecutorService executorService) {
public DatabaseClientTransactionManager(DatabaseClient dbClient) {
this.dbClient = dbClient;
this.executorService = executorService;
}

boolean isInReadWriteTransaction() {
Expand Down Expand Up @@ -105,13 +102,21 @@ void beginReadonlyTransaction(TimestampBound timestampBound) {
/**
* Closes the read/write transaction manager and clears its state.
*/
void clearTransactionManager() {
ApiFuture<Void> clearTransactionManager() {
this.txnContextFuture = null;
this.lastStep = null;
ApiFuture<Void> returnFuture = ApiFutures.immediateFuture(null);

if (this.transactionManager != null) {
this.transactionManager.close();
returnFuture = this.transactionManager.closeAsync();
this.transactionManager = null;
}

if (isInReadonlyTransaction()) {
closeReadOnlyTransaction();
}

return returnFuture;
}

/**
Expand Down Expand Up @@ -174,9 +179,9 @@ <T> ApiFuture<T> runInTransaction(Function<? super TransactionContext, ApiFuture
AsyncTransactionStep<? extends Object, T> updateStatementFuture =
this.lastStep == null
? this.txnContextFuture.then(
(ctx, unusedVoid) -> operation.apply(ctx), this.executorService)
(ctx, unusedVoid) -> operation.apply(ctx), REACTOR_EXECUTOR)
: this.lastStep.then(
(ctx, unusedPreviousResult) -> operation.apply(ctx), this.executorService);
(ctx, unusedPreviousResult) -> operation.apply(ctx), REACTOR_EXECUTOR);

this.lastStep = updateStatementFuture;
return updateStatementFuture;
Expand Down