A caller will typically call {@link #tryNext()} in a loop inside the ReadyCallback,
+ * consuming all results available. For more information see {@link #setCallback(Executor,
+ * ReadyCallback)}.
+ *
+ *
Currently this method may only be called if a ReadyCallback has been registered. This is for
+ * safety purposes only, and may be relaxed in future.
+ *
+ * @return current cursor readiness state
+ * @throws SpannerException When an unrecoverable problem downstream occurs. Once this occurs you
+ * will get no further callbacks. You should return CallbackResponse.DONE back from callback.
+ */
+ CursorState tryNext() throws SpannerException;
+
+ enum CallbackResponse {
+ /**
+ * Tell the cursor to continue issuing callbacks when data is available. This is the standard
+ * "I'm ready for more" response. If cursor is not completely drained of all ready results the
+ * callback will be called again immediately.
+ */
+ CONTINUE,
+
+ /**
+ * Tell the cursor to suspend all callbacks until application calls {@link RowCursor#resume()}.
+ */
+ PAUSE,
+
+ /**
+ * Tell the cursor you are done receiving results, even if there are more results sitting in the
+ * buffer. Once you return DONE, you will receive no further callbacks.
+ *
+ *
Approximately equivalent to calling {@link RowCursor#cancel()}, and then returning {@code
+ * PAUSE}, but more clear, immediate, and idiomatic.
+ *
+ *
It is legal to commit a transaction that owns this read before actually returning {@code
+ * DONE}.
+ */
+ DONE,
+ }
+
+ /**
+ * Interface for receiving asynchronous callbacks when new data is ready. See {@link
+ * AsyncResultSet#setCallback(Executor, ReadyCallback)}.
+ */
+ interface ReadyCallback {
+ CallbackResponse cursorReady(AsyncResultSet resultSet);
+ }
+
+ /**
+ * Register a callback with the ResultSet to be made aware when more data is available, changing
+ * the usage pattern from sync to async. Details:
+ *
+ *
setCallback(Executor exec, ReadyCallback cb);
+
+ /**
+ * Attempt to cancel this operation and free all resources. Non-blocking. This is a no-op for
+ * child row cursors and does not cancel the parent cursor.
+ */
+ void cancel();
+
+ /**
+ * Resume callbacks from the cursor. If there is more data available, a callback will be
+ * dispatched immediately. This can be called from any thread.
+ */
+ void resume();
+
+ /**
+ * Transforms the row cursor into an immutable list using the given transformer function. {@code
+ * transformer} will be called once per row, thus the returned list will contain one entry per
+ * row. The returned future will throw a {@link SpannerException} if the row cursor encountered
+ * any error or if the transformer threw an exception on any row.
+ *
+ * The transformer will be run on the supplied executor. The implementation may batch multiple
+ * transformer invocations together into a single {@code Runnable} when possible to increase
+ * efficiency. At any point in time, there will be at most one invocation of the transformer in
+ * progress.
+ *
+ *
WARNING: This will result in materializing the entire list so this should be used
+ * judiciously after considering the memory requirements of the returned list.
+ *
+ *
WARNING: The {@code RowBase} object passed to transformer function is not immutable and is
+ * not guaranteed to remain valid after the transformer function returns. The same {@code RowBase}
+ * object might be passed multiple times to the transformer with different underlying data each
+ * time. So *NEVER* keep a reference to the {@code RowBase} outside of the transformer.
+ * Specifically do not use {@link com.google.common.base.Functions#identity()} function.
+ *
+ * @param transformer function which will be used to transform the row. It should not return null.
+ * @param executor executor on which the transformer will be run. This should ideally not be an
+ * inline executor such as {@code MoreExecutors.directExecutor()}; using such an executor may
+ * degrade the performance of the Spanner library.
+ */
+ ApiFuture> toListAsync(
+ Function transformer, Executor executor);
+
+ /**
+ * Transforms the row cursor into an immutable list using the given transformer function. {@code
+ * transformer} will be called once per row, thus the returned list will contain one entry per
+ * row. This method will block until all the rows have been yielded by the cursor.
+ *
+ * WARNING: This will result in consuming the entire list so this should be used judiciously
+ * after considering the memory requirements of the returned list.
+ *
+ *
WARNING: The {@code RowBase} object passed to transformer function is not immutable and is
+ * not guaranteed to remain valid after the transformer function returns. The same {@code RowBase}
+ * object might be passed multiple times to the transformer with different underlying data each
+ * time. So *NEVER* keep a reference to the {@code RowBase} outside of the transformer.
+ * Specifically do not use {@link com.google.common.base.Functions#identity()} function.
+ *
+ * @param transformer function which will be used to transform the row. It should not return null.
+ */
+ ImmutableList toList(Function transformer) throws SpannerException;
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
new file mode 100644
index 0000000000..f277388b0b
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
@@ -0,0 +1,586 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiAsyncFunction;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.ListenableFutureToApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.core.ExecutorProvider;
+import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.spanner.v1.ResultSetStats;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** Default implementation for {@link AsyncResultSet}. */
+class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
+ private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());
+
+ /** State of an {@link AsyncResultSetImpl}. */
+ private enum State {
+ INITIALIZED,
+ /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
+ SYNC,
+ CONSUMING,
+ RUNNING,
+ PAUSED,
+ CANCELLED(true),
+ DONE(true);
+
+ /** Does this state mean that the result set should permanently stop producing rows. */
+ private final boolean shouldStop;
+
+ private State() {
+ shouldStop = false;
+ }
+
+ private State(boolean shouldStop) {
+ this.shouldStop = shouldStop;
+ }
+ }
+
+ static final int DEFAULT_BUFFER_SIZE = 10;
+ private static final int MAX_WAIT_FOR_BUFFER_CONSUMPTION = 10;
+ private static final SpannerException CANCELLED_EXCEPTION =
+ SpannerExceptionFactory.newSpannerException(
+ ErrorCode.CANCELLED, "This AsyncResultSet has been cancelled");
+
+ private final Object monitor = new Object();
+ private boolean closed;
+
+ /**
+ * {@link ExecutorProvider} provides executor services that are used to fetch data from the
+ * backend and put these into the buffer for further consumption by the callback.
+ */
+ private final ExecutorProvider executorProvider;
+
+ private final ListeningScheduledExecutorService service;
+
+ private final BlockingDeque buffer;
+ private Struct currentRow;
+ /** The underlying synchronous {@link ResultSet} that is producing the rows. */
+ private final ResultSet delegateResultSet;
+
+ /**
+ * Any exception that occurs while executing the query and iterating over the result set will be
+ * stored in this variable and propagated to the user through {@link #tryNext()}.
+ */
+ private volatile SpannerException executionException;
+
+ /**
+ * Executor for callbacks. Regardless of the type of executor that is provided, the {@link
+ * AsyncResultSetImpl} will ensure that at most 1 callback call will be active at any one time.
+ */
+ private Executor executor;
+
+ private ReadyCallback callback;
+
+ /**
+ * Listeners that will be called when the {@link AsyncResultSetImpl} has finished fetching all
+ * rows and any underlying transaction or session can be closed.
+ */
+ private Collection listeners = new LinkedList<>();
+
+ private State state = State.INITIALIZED;
+
+ /**
+ * {@link #finished} indicates whether all the results from the underlying result set have been
+ * read.
+ */
+ private volatile boolean finished;
+
+ private volatile ApiFuture result;
+
+ /**
+ * {@link #cursorReturnedDoneOrException} indicates whether {@link #tryNext()} has returned {@link
+ * CursorState#DONE} or a {@link SpannerException}.
+ */
+ private volatile boolean cursorReturnedDoneOrException;
+
+ /**
+ * {@link #pausedLatch} is used to pause the producer when the {@link AsyncResultSet} is paused.
+ * The production of rows that are put into the buffer is only paused once the buffer is full.
+ */
+ private volatile CountDownLatch pausedLatch = new CountDownLatch(1);
+ /**
+ * {@link #bufferConsumptionLatch} is used to pause the producer when the buffer is full and the
+ * consumer needs some time to catch up.
+ */
+ private volatile CountDownLatch bufferConsumptionLatch = new CountDownLatch(0);
+ /**
+ * {@link #consumingLatch} is used to pause the producer when all rows have been put into the
+ * buffer, but the consumer (the callback) has not yet received and processed all rows.
+ */
+ private volatile CountDownLatch consumingLatch = new CountDownLatch(0);
+
+ AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) {
+ super(delegate);
+ this.executorProvider = Preconditions.checkNotNull(executorProvider);
+ this.delegateResultSet = Preconditions.checkNotNull(delegate);
+ this.service = MoreExecutors.listeningDecorator(executorProvider.getExecutor());
+ this.buffer = new LinkedBlockingDeque<>(bufferSize);
+ }
+
+ /**
+ * Closes the {@link AsyncResultSet}. {@link #close()} is non-blocking and may be called multiple
+ * times without side effects. An {@link AsyncResultSet} may be closed before all rows have been
+ * returned to the callback, and calling {@link #tryNext()} on a closed {@link AsyncResultSet} is
+ * allowed as long as this is done from within a {@link ReadyCallback}. Calling {@link #resume()}
+ * on a closed {@link AsyncResultSet} is also allowed.
+ */
+ @Override
+ public void close() {
+ synchronized (monitor) {
+ if (this.closed) {
+ return;
+ }
+ if (state == State.INITIALIZED || state == State.SYNC) {
+ delegateResultSet.close();
+ }
+ this.closed = true;
+ }
+ }
+
+ /**
+ * Adds a listener that will be called when no more rows will be read from the underlying {@link
+ * ResultSet}, either because all rows have been read, or because {@link
+ * ReadyCallback#cursorReady(AsyncResultSet)} returned {@link CallbackResponse#DONE}.
+ */
+ @Override
+ public void addListener(Runnable listener) {
+ Preconditions.checkState(state == State.INITIALIZED);
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(Runnable listener) {
+ Preconditions.checkState(state == State.INITIALIZED);
+ listeners.remove(listener);
+ }
+
+ /**
+ * Tries to advance this {@link AsyncResultSet} to the next row. This method may only be called
+ * from within a {@link ReadyCallback}.
+ */
+ @Override
+ public CursorState tryNext() throws SpannerException {
+ synchronized (monitor) {
+ if (state == State.CANCELLED) {
+ cursorReturnedDoneOrException = true;
+ throw CANCELLED_EXCEPTION;
+ }
+ if (buffer.isEmpty() && executionException != null) {
+ cursorReturnedDoneOrException = true;
+ throw executionException;
+ }
+ Preconditions.checkState(
+ this.callback != null, "tryNext may only be called after a callback has been set.");
+ Preconditions.checkState(
+ this.state == State.CONSUMING,
+ "tryNext may only be called from a DataReady callback. Current state: "
+ + this.state.name());
+
+ if (finished && buffer.isEmpty()) {
+ cursorReturnedDoneOrException = true;
+ return CursorState.DONE;
+ }
+ }
+ if (!buffer.isEmpty()) {
+ // Set the next row from the buffer as the current row of the StructReader.
+ replaceDelegate(currentRow = buffer.pop());
+ synchronized (monitor) {
+ bufferConsumptionLatch.countDown();
+ }
+ return CursorState.OK;
+ }
+ return CursorState.NOT_READY;
+ }
+
+ private void closeDelegateResultSet() {
+ try {
+ delegateResultSet.close();
+ } catch (Throwable t) {
+ log.log(Level.FINE, "Ignoring error from closing delegate result set", t);
+ }
+ }
+
+ /**
+ * {@link CallbackRunnable} calls the {@link ReadyCallback} registered for this {@link
+ * AsyncResultSet}.
+ */
+ private class CallbackRunnable implements Runnable {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ synchronized (monitor) {
+ if (cursorReturnedDoneOrException) {
+ break;
+ }
+ }
+ CallbackResponse response;
+ try {
+ response = callback.cursorReady(AsyncResultSetImpl.this);
+ } catch (Throwable e) {
+ synchronized (monitor) {
+ if (cursorReturnedDoneOrException
+ && state == State.CANCELLED
+ && e instanceof SpannerException
+ && ((SpannerException) e).getErrorCode() == ErrorCode.CANCELLED) {
+ // The callback did not catch the cancelled exception (which it should have), but
+ // we'll keep the cancelled state.
+ return;
+ }
+ executionException = SpannerExceptionFactory.newSpannerException(e);
+ cursorReturnedDoneOrException = true;
+ }
+ return;
+ }
+ synchronized (monitor) {
+ if (state == State.CANCELLED) {
+ if (cursorReturnedDoneOrException) {
+ return;
+ }
+ } else {
+ switch (response) {
+ case DONE:
+ state = State.DONE;
+ closeDelegateResultSet();
+ return;
+ case PAUSE:
+ state = State.PAUSED;
+ // Make sure no-one else is waiting on the current pause latch and create a new
+ // one.
+ pausedLatch.countDown();
+ pausedLatch = new CountDownLatch(1);
+ return;
+ case CONTINUE:
+ if (buffer.isEmpty()) {
+ // Call the callback once more if the entire result set has been processed but
+ // the callback has not yet received a CursorState.DONE or a CANCELLED error.
+ if (finished && !cursorReturnedDoneOrException) {
+ break;
+ }
+ state = State.RUNNING;
+ return;
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown response: " + response);
+ }
+ }
+ }
+ }
+ } finally {
+ synchronized (monitor) {
+ // Count down all latches that the producer might be waiting on.
+ consumingLatch.countDown();
+ while (bufferConsumptionLatch.getCount() > 0L) {
+ bufferConsumptionLatch.countDown();
+ }
+ }
+ }
+ }
+ }
+
+ private final CallbackRunnable callbackRunnable = new CallbackRunnable();
+
+ /**
+ * {@link ProduceRowsCallable} reads data from the underlying {@link ResultSet}, places these in
+ * the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
+ */
+ private class ProduceRowsCallable implements Callable {
+ @Override
+ public Void call() throws Exception {
+ boolean stop = false;
+ boolean hasNext = false;
+ try {
+ hasNext = delegateResultSet.next();
+ } catch (Throwable e) {
+ synchronized (monitor) {
+ executionException = SpannerExceptionFactory.newSpannerException(e);
+ }
+ }
+ try {
+ while (!stop && hasNext) {
+ try {
+ synchronized (monitor) {
+ stop = state.shouldStop;
+ }
+ if (!stop) {
+ while (buffer.remainingCapacity() == 0 && !stop) {
+ waitIfPaused();
+ // The buffer is full and we should let the callback consume a number of rows before
+ // we proceed with producing any more rows to prevent us from potentially waiting on
+ // a full buffer repeatedly.
+ // Wait until at least half of the buffer is available, or if it's a bigger buffer,
+ // wait until at least 10 rows can be placed in it.
+ // TODO: Make this more dynamic / configurable?
+ startCallbackWithBufferLatchIfNecessary(
+ Math.min(
+ Math.min(buffer.size() / 2 + 1, buffer.size()),
+ MAX_WAIT_FOR_BUFFER_CONSUMPTION));
+ bufferConsumptionLatch.await();
+ synchronized (monitor) {
+ stop = state.shouldStop;
+ }
+ }
+ }
+ if (!stop) {
+ buffer.put(delegateResultSet.getCurrentRowAsStruct());
+ startCallbackIfNecessary();
+ hasNext = delegateResultSet.next();
+ }
+ } catch (Throwable e) {
+ synchronized (monitor) {
+ executionException = SpannerExceptionFactory.newSpannerException(e);
+ stop = true;
+ }
+ }
+ }
+ // We don't need any more data from the underlying result set, so we close it as soon as
+ // possible. Any error that might occur during this will be ignored.
+ closeDelegateResultSet();
+
+ // Ensure that the callback has been called at least once, even if the result set was
+ // cancelled.
+ synchronized (monitor) {
+ finished = true;
+ stop = cursorReturnedDoneOrException;
+ }
+ // Call the callback if there are still rows in the buffer that need to be processed.
+ while (!stop) {
+ waitIfPaused();
+ startCallbackIfNecessary();
+ synchronized (monitor) {
+ stop = state.shouldStop || cursorReturnedDoneOrException;
+ }
+ // Make sure we wait until the callback runner has actually finished.
+ consumingLatch.await();
+ }
+ } finally {
+ if (executorProvider.shouldAutoClose()) {
+ service.shutdown();
+ }
+ for (Runnable listener : listeners) {
+ listener.run();
+ }
+ synchronized (monitor) {
+ if (executionException != null) {
+ throw executionException;
+ }
+ if (state == State.CANCELLED) {
+ throw CANCELLED_EXCEPTION;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void waitIfPaused() throws InterruptedException {
+ CountDownLatch pause;
+ synchronized (monitor) {
+ pause = pausedLatch;
+ }
+ pause.await();
+ }
+
+ private void startCallbackIfNecessary() {
+ startCallbackWithBufferLatchIfNecessary(0);
+ }
+
+ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
+ synchronized (monitor) {
+ if ((state == State.RUNNING || state == State.CANCELLED)
+ && !cursorReturnedDoneOrException) {
+ consumingLatch = new CountDownLatch(1);
+ if (bufferLatch > 0) {
+ bufferConsumptionLatch = new CountDownLatch(bufferLatch);
+ }
+ if (state == State.RUNNING) {
+ state = State.CONSUMING;
+ }
+ executor.execute(callbackRunnable);
+ }
+ }
+ }
+ }
+
+ /** Sets the callback for this {@link AsyncResultSet}. */
+ @Override
+ public ApiFuture setCallback(Executor exec, ReadyCallback cb) {
+ synchronized (monitor) {
+ Preconditions.checkState(!closed, "This AsyncResultSet has been closed");
+ Preconditions.checkState(
+ this.state == State.INITIALIZED, "callback may not be set multiple times");
+
+ // Start to fetch data and buffer these.
+ this.result =
+ new ListenableFutureToApiFuture<>(this.service.submit(new ProduceRowsCallable()));
+ this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec));
+ this.callback = Preconditions.checkNotNull(cb);
+ this.state = State.RUNNING;
+ pausedLatch.countDown();
+ return result;
+ }
+ }
+
+ Future getResult() {
+ return result;
+ }
+
+ @Override
+ public void cancel() {
+ synchronized (monitor) {
+ Preconditions.checkState(
+ state != State.INITIALIZED && state != State.SYNC,
+ "cannot cancel a result set without a callback");
+ state = State.CANCELLED;
+ pausedLatch.countDown();
+ }
+ }
+
+ @Override
+ public void resume() {
+ synchronized (monitor) {
+ Preconditions.checkState(
+ state != State.INITIALIZED && state != State.SYNC,
+ "cannot resume a result set without a callback");
+ if (state == State.PAUSED) {
+ state = State.RUNNING;
+ pausedLatch.countDown();
+ }
+ }
+ }
+
+ private static class CreateListCallback implements ReadyCallback {
+ private final SettableApiFuture> future;
+ private final Function transformer;
+ private final ImmutableList.Builder builder = ImmutableList.builder();
+
+ private CreateListCallback(
+ SettableApiFuture> future, Function transformer) {
+ this.future = future;
+ this.transformer = transformer;
+ }
+
+ @Override
+ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
+ try {
+ while (true) {
+ switch (resultSet.tryNext()) {
+ case DONE:
+ future.set(builder.build());
+ return CallbackResponse.DONE;
+ case NOT_READY:
+ return CallbackResponse.CONTINUE;
+ case OK:
+ builder.add(transformer.apply(resultSet));
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ future.setException(t);
+ return CallbackResponse.DONE;
+ }
+ }
+ }
+
+ @Override
+ public ApiFuture> toListAsync(
+ Function transformer, Executor executor) {
+ synchronized (monitor) {
+ Preconditions.checkState(!closed, "This AsyncResultSet has been closed");
+ Preconditions.checkState(
+ this.state == State.INITIALIZED, "This AsyncResultSet has already been used.");
+ final SettableApiFuture> res = SettableApiFuture.>create();
+ CreateListCallback callback = new CreateListCallback(res, transformer);
+ ApiFuture finished = setCallback(executor, callback);
+ return ApiFutures.transformAsync(
+ finished,
+ new ApiAsyncFunction>() {
+ @Override
+ public ApiFuture> apply(Void input) throws Exception {
+ return res;
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+ }
+
+ @Override
+ public ImmutableList toList(Function transformer)
+ throws SpannerException {
+ ApiFuture> future = toListAsync(transformer, MoreExecutors.directExecutor());
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause());
+ } catch (Throwable e) {
+ throw SpannerExceptionFactory.newSpannerException(e);
+ }
+ }
+
+ @Override
+ public boolean next() throws SpannerException {
+ synchronized (monitor) {
+ Preconditions.checkState(
+ this.state == State.INITIALIZED || this.state == State.SYNC,
+ "Cannot call next() on a result set with a callback.");
+ this.state = State.SYNC;
+ }
+ boolean res = delegateResultSet.next();
+ currentRow = delegateResultSet.getCurrentRowAsStruct();
+ return res;
+ }
+
+ @Override
+ public ResultSetStats getStats() {
+ return delegateResultSet.getStats();
+ }
+
+ @Override
+ protected void checkValidState() {
+ synchronized (monitor) {
+ Preconditions.checkState(
+ state == State.SYNC || state == State.CONSUMING || state == State.CANCELLED,
+ "only allowed after a next() call or from within a ReadyCallback#cursorReady callback");
+ Preconditions.checkState(state != State.SYNC || !closed, "ResultSet is closed");
+ }
+ }
+
+ @Override
+ public Struct getCurrentRowAsStruct() {
+ checkValidState();
+ return currentRow;
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java
new file mode 100644
index 0000000000..3cae49e65b
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.Timestamp;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+public interface AsyncRunner {
+
+ /**
+ * Functional interface for executing a read/write transaction asynchronously that returns a
+ * result of type R.
+ */
+ interface AsyncWork {
+ /**
+ * Performs a single transaction attempt. All reads/writes should be performed using {@code
+ * txn}.
+ *
+ * Implementations of this method should not attempt to commit the transaction directly:
+ * returning normally will result in the runner attempting to commit the transaction once the
+ * returned future completes, retrying on abort.
+ *
+ *
In most cases, the implementation will not need to catch {@code SpannerException}s from
+ * Spanner operations, instead letting these propagate to the framework. The transaction runner
+ * will take appropriate action based on the type of exception. In particular, implementations
+ * should never catch an exception of type {@link SpannerErrors#isAborted}: these indicate that
+ * some reads may have returned inconsistent data and the transaction attempt must be aborted.
+ *
+ * @param txn the transaction
+ * @return future over the result of the work
+ */
+ ApiFuture doWorkAsync(TransactionContext txn);
+ }
+
+ /** Executes a read/write transaction asynchronously using the given executor. */
+ ApiFuture runAsync(AsyncWork work, Executor executor);
+
+ /**
+ * Returns the timestamp at which the transaction committed. {@link ApiFuture#get()} will throw an
+ * {@link ExecutionException} if the transaction did not commit.
+ */
+ ApiFuture getCommitTimestamp();
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java
new file mode 100644
index 0000000000..5b83402919
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+class AsyncRunnerImpl implements AsyncRunner {
+ private final TransactionRunnerImpl delegate;
+ private final SettableApiFuture commitTimestamp = SettableApiFuture.create();
+
+ AsyncRunnerImpl(TransactionRunnerImpl delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ApiFuture runAsync(final AsyncWork work, Executor executor) {
+ final SettableApiFuture res = SettableApiFuture.create();
+ executor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ res.set(runTransaction(work));
+ } catch (Throwable t) {
+ res.setException(t);
+ } finally {
+ setCommitTimestamp();
+ }
+ }
+ });
+ return res;
+ }
+
+ private R runTransaction(final AsyncWork work) {
+ return delegate.run(
+ new TransactionCallable() {
+ @Override
+ public R run(TransactionContext transaction) throws Exception {
+ try {
+ return work.doWorkAsync(transaction).get();
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause());
+ } catch (InterruptedException e) {
+ throw SpannerExceptionFactory.propagateInterrupt(e);
+ }
+ }
+ });
+ }
+
+ private void setCommitTimestamp() {
+ try {
+ commitTimestamp.set(delegate.getCommitTimestamp());
+ } catch (Throwable t) {
+ commitTimestamp.setException(t);
+ }
+ }
+
+ @Override
+ public ApiFuture getCommitTimestamp() {
+ return commitTimestamp;
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java
new file mode 100644
index 0000000000..d519c68013
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
+import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
+import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
+import com.google.cloud.spanner.TransactionManager.TransactionState;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An interface for managing the life cycle of a read write transaction including all its retries.
+ * See {@link TransactionContext} for a description of transaction semantics.
+ *
+ * At any point in time there can be at most one active transaction in this manager. When that
+ * transaction is committed, if it fails with an {@code ABORTED} error, calling {@link
+ * #resetForRetryAsync()} would create a new {@link TransactionContextFuture}. The newly created
+ * transaction would use the same session thus increasing its lock priority. If the transaction is
+ * committed successfully, or is rolled back or commit fails with any error other than {@code
+ * ABORTED}, the manager is considered complete and no further transactions are allowed to be
+ * created in it.
+ *
+ *
Every {@code AsyncTransactionManager} should either be committed or rolled back. Failure to do
+ * so can cause resources to be leaked and deadlocks. Easiest way to guarantee this is by calling
+ * {@link #close()} in a finally block.
+ *
+ * @see DatabaseClient#transactionManagerAsync()
+ */
+public interface AsyncTransactionManager extends AutoCloseable {
+ /**
+ * {@link ApiFuture} that returns a {@link TransactionContext} and that supports chaining of
+ * multiple {@link TransactionContextFuture}s to form a transaction.
+ */
+ public interface TransactionContextFuture extends ApiFuture {
+ /**
+ * Sets the first step to execute as part of this transaction after the transaction has started
+ * using the specified executor. {@link MoreExecutors#directExecutor()} can be be used for
+ * lightweight functions, but should be avoided for heavy or blocking operations. See also
+ * {@link ListenableFuture#addListener(Runnable, Executor)} for further information.
+ */
+ AsyncTransactionStep then(
+ AsyncTransactionFunction function, Executor executor);
+ }
+
+ /**
+ * {@link ApiFuture} that returns the commit {@link Timestamp} of a Cloud Spanner transaction that
+ * is executed using an {@link AsyncTransactionManager}. This future is returned by the call to
+ * {@link AsyncTransactionStep#commitAsync()} of the last step in the transaction.
+ */
+ public interface CommitTimestampFuture extends ApiFuture {
+ /**
+ * Returns the commit timestamp of the transaction. Getting this value should always be done in
+ * order to ensure that the transaction succeeded. If any of the steps in the transaction fails
+ * with an uncaught exception, this method will automatically stop the transaction at that point
+ * and the exception will be returned as the cause of the {@link ExecutionException} that is
+ * thrown by this method.
+ *
+ * @throws AbortedException if the transaction was aborted by Cloud Spanner and needs to be
+ * retried.
+ */
+ @Override
+ Timestamp get() throws AbortedException, InterruptedException, ExecutionException;
+
+ /**
+ * Same as {@link #get()}, but will throw a {@link TimeoutException} if the transaction does not
+ * finish within the timeout.
+ */
+ @Override
+ Timestamp get(long timeout, TimeUnit unit)
+ throws AbortedException, InterruptedException, ExecutionException, TimeoutException;
+ }
+
+ /**
+ * {@link AsyncTransactionStep} is returned by {@link
+ * TransactionContextFuture#then(AsyncTransactionFunction)} and {@link
+ * AsyncTransactionStep#then(AsyncTransactionFunction)} and allows transaction steps that should
+ * be executed serially to be chained together. Each step can contain one or more statements that
+ * may execute in parallel.
+ *
+ * Example usage:
+ *
+ *
{@code
+ * TransactionContextFuture txnFuture = manager.beginAsync();
+ * final String column = "FirstName";
+ * txnFuture.then(
+ * new AsyncTransactionFunction() {
+ * @Override
+ * public ApiFuture apply(TransactionContext txn, Void input)
+ * throws Exception {
+ * return txn.readRowAsync(
+ * "Singers", Key.of(singerId), Collections.singleton(column));
+ * }
+ * })
+ * .then(
+ * new AsyncTransactionFunction() {
+ * @Override
+ * public ApiFuture apply(TransactionContext txn, Struct input)
+ * throws Exception {
+ * String name = input.getString(column);
+ * txn.buffer(
+ * Mutation.newUpdateBuilder("Singers")
+ * .set(column)
+ * .to(name.toUpperCase())
+ * .build());
+ * return ApiFutures.immediateFuture(null);
+ * }
+ * })
+ * }
+ */
+ public interface AsyncTransactionStep extends ApiFuture {
+ /**
+ * Adds a step to the transaction chain that should be executed using the specified executor.
+ * This step is guaranteed to be executed only after the previous step executed successfully.
+ * {@link MoreExecutors#directExecutor()} can be be used for lightweight functions, but should
+ * be avoided for heavy or blocking operations. See also {@link
+ * ListenableFuture#addListener(Runnable, Executor)} for further information.
+ */
+ AsyncTransactionStep then(
+ AsyncTransactionFunction next, Executor executor);
+
+ /**
+ * Commits the transaction and returns a {@link CommitTimestampFuture} that will return the
+ * commit timestamp of the transaction, or throw the first uncaught exception in the transaction
+ * chain as an {@link ExecutionException}.
+ */
+ CommitTimestampFuture commitAsync();
+ }
+
+ /**
+ * Each step in a transaction chain is defined by an {@link AsyncTransactionFunction}. It receives
+ * a {@link TransactionContext} and the output value of the previous transaction step as its input
+ * parameters. The method should return an {@link ApiFuture} that will return the result of this
+ * step.
+ */
+ public interface AsyncTransactionFunction {
+ /**
+ * {@link #apply(TransactionContext, Object)} is called when this transaction step is executed.
+ * The input value is the result of the previous step, and this method will only be called if
+ * the previous step executed successfully.
+ *
+ * @param txn the {@link TransactionContext} that can be used to execute statements.
+ * @param input the result of the previous transaction step.
+ * @return an {@link ApiFuture} that will return the result of this step, and that will be the
+ * input of the next transaction step. This method should never return null
.
+ * Instead, if the method does not have a return value, the method should return {@link
+ * ApiFutures#immediateFuture(null)}.
+ */
+ ApiFuture apply(TransactionContext txn, I input) throws Exception;
+ }
+
+ /**
+ * Creates a new read write transaction. This must be called before doing any other operation and
+ * can only be called once. To create a new transaction for subsequent retries, see {@link
+ * #resetForRetry()}.
+ */
+ TransactionContextFuture beginAsync();
+
+ /**
+ * Rolls back the currently active transaction. In most cases there should be no need to call this
+ * explicitly since {@link #close()} would automatically roll back any active transaction.
+ */
+ ApiFuture rollbackAsync();
+
+ /**
+ * Creates a new transaction for retry. This should only be called if the previous transaction
+ * failed with {@code ABORTED}. In all other cases, this will throw an {@link
+ * IllegalStateException}. Users should backoff before calling this method. Backoff delay is
+ * specified by {@link SpannerException#getRetryDelayInMillis()} on the {@code SpannerException}
+ * throw by the previous commit call.
+ */
+ TransactionContextFuture resetForRetryAsync();
+
+ /** Returns the state of the transaction. */
+ TransactionState getState();
+
+ /**
+ * Closes the manager. If there is an active transaction, it will be rolled back. Underlying
+ * session will be released back to the session pool.
+ */
+ @Override
+ void close();
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
new file mode 100644
index 0000000000..082fa827e7
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2017 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.SessionImpl.SessionTransaction;
+import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
+import com.google.cloud.spanner.TransactionManager.TransactionState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+
+/** Implementation of {@link AsyncTransactionManager}. */
+final class AsyncTransactionManagerImpl
+ implements CommittableAsyncTransactionManager, SessionTransaction {
+ private static final Tracer tracer = Tracing.getTracer();
+
+ private final SessionImpl session;
+ private Span span;
+
+ private TransactionRunnerImpl.TransactionContextImpl txn;
+ private TransactionState txnState;
+ private final SettableApiFuture commitTimestamp = SettableApiFuture.create();
+
+ AsyncTransactionManagerImpl(SessionImpl session, Span span) {
+ this.session = session;
+ this.span = span;
+ }
+
+ @Override
+ public void setSpan(Span span) {
+ this.span = span;
+ }
+
+ @Override
+ public void close() {
+ txn.close();
+ }
+
+ @Override
+ public TransactionContextFutureImpl beginAsync() {
+ Preconditions.checkState(txn == null, "begin can only be called once");
+ TransactionContextFutureImpl begin =
+ new TransactionContextFutureImpl(this, internalBeginAsync(true));
+ return begin;
+ }
+
+ private ApiFuture internalBeginAsync(boolean setActive) {
+ txnState = TransactionState.STARTED;
+ txn = session.newTransaction();
+ if (setActive) {
+ session.setActive(this);
+ }
+ final SettableApiFuture res = SettableApiFuture.create();
+ final ApiFuture fut = txn.ensureTxnAsync();
+ ApiFutures.addCallback(
+ fut,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ res.setException(SpannerExceptionFactory.newSpannerException(t));
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ res.set(txn);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return res;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (t instanceof AbortedException) {
+ txnState = TransactionState.ABORTED;
+ }
+ }
+
+ @Override
+ public ApiFuture commitAsync() {
+ Preconditions.checkState(
+ txnState == TransactionState.STARTED,
+ "commit can only be invoked if the transaction is in progress. Current state: " + txnState);
+ if (txn.isAborted()) {
+ txnState = TransactionState.ABORTED;
+ return ApiFutures.immediateFailedFuture(
+ SpannerExceptionFactory.newSpannerException(
+ ErrorCode.ABORTED, "Transaction already aborted"));
+ }
+ ApiFuture res = txn.commitAsync();
+ txnState = TransactionState.COMMITTED;
+ ApiFutures.addCallback(
+ res,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof AbortedException) {
+ txnState = TransactionState.ABORTED;
+ } else {
+ txnState = TransactionState.COMMIT_FAILED;
+ commitTimestamp.setException(t);
+ }
+ }
+
+ @Override
+ public void onSuccess(Timestamp result) {
+ commitTimestamp.set(result);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return res;
+ }
+
+ @Override
+ public ApiFuture rollbackAsync() {
+ Preconditions.checkState(
+ txnState == TransactionState.STARTED,
+ "rollback can only be called if the transaction is in progress");
+ try {
+ return txn.rollbackAsync();
+ } finally {
+ txnState = TransactionState.ROLLED_BACK;
+ }
+ }
+
+ @Override
+ public TransactionContextFuture resetForRetryAsync() {
+ if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) {
+ throw new IllegalStateException(
+ "resetForRetry can only be called if the previous attempt aborted");
+ }
+ return new TransactionContextFutureImpl(this, internalBeginAsync(false));
+ }
+
+ @Override
+ public TransactionState getState() {
+ return txnState;
+ }
+
+ @Override
+ public void invalidate() {
+ if (txnState == TransactionState.STARTED || txnState == null) {
+ txnState = TransactionState.ROLLED_BACK;
+ }
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
index 43de2be092..c84bef77cf 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
@@ -30,6 +30,7 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.TransactionSelector;
+import io.opencensus.trace.Tracing;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
.setTimestampBound(bound)
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
+ .setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
checkNotNull(bound));
}
@@ -67,6 +69,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
.setTimestamp(batchTransactionId.getTimestamp())
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
+ .setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
batchTransactionId);
}
@@ -81,6 +84,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(builder.setTimestampBound(bound));
this.sessionName = session.getName();
this.options = session.getOptions();
+ setSpan(Tracing.getTracer().getCurrentSpan());
initTransaction();
}
@@ -89,6 +93,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
this.sessionName = session.getName();
this.options = session.getOptions();
+ setSpan(Tracing.getTracer().getCurrentSpan());
}
@Override
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
index ac29ba2b37..d52d1d892e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
@@ -278,6 +278,127 @@ public interface DatabaseClient {
*/
TransactionManager transactionManager();
+ /**
+ * Returns an asynchronous transaction runner for executing a single logical transaction with
+ * retries. The returned runner can only be used once.
+ *
+ * Example of a read write transaction.
+ *
+ *
+ * Executor executor = Executors.newSingleThreadExecutor();
+ * final long singerId = my_singer_id;
+ * AsyncRunner runner = client.runAsync();
+ * ApiFuture rowCount =
+ * runner.runAsync(
+ * new AsyncWork() {
+ * @Override
+ * public ApiFuture doWorkAsync(TransactionContext txn) {
+ * String column = "FirstName";
+ * Struct row =
+ * txn.readRow("Singers", Key.of(singerId), Collections.singleton("Name"));
+ * String name = row.getString("Name");
+ * return txn.executeUpdateAsync(
+ * Statement.newBuilder("UPDATE Singers SET Name=@name WHERE SingerId=@id")
+ * .bind("id")
+ * .to(singerId)
+ * .bind("name")
+ * .to(name.toUpperCase())
+ * .build());
+ * }
+ * },
+ * executor);
+ *
+ */
+ AsyncRunner runAsync();
+
+ /**
+ * Returns an asynchronous transaction manager which allows manual management of transaction
+ * lifecycle. This API is meant for advanced users. Most users should instead use the {@link
+ * #runAsync()} API instead.
+ *
+ * Example of using {@link AsyncTransactionManager} with lambda expressions (Java 8 and
+ * higher).
+ *
+ *
{@code
+ * long singerId = 1L;
+ * try (AsyncTransactionManager manager = client.transactionManagerAsync()) {
+ * TransactionContextFuture txnFut = manager.beginAsync();
+ * while (true) {
+ * String column = "FirstName";
+ * CommitTimestampFuture commitTimestamp =
+ * txnFut
+ * .then(
+ * (txn, __) ->
+ * txn.readRowAsync(
+ * "Singers", Key.of(singerId), Collections.singleton(column)))
+ * .then(
+ * (txn, row) -> {
+ * String name = row.getString(column);
+ * txn.buffer(
+ * Mutation.newUpdateBuilder("Singers")
+ * .set(column)
+ * .to(name.toUpperCase())
+ * .build());
+ * return ApiFutures.immediateFuture(null);
+ * })
+ * .commitAsync();
+ * try {
+ * commitTimestamp.get();
+ * break;
+ * } catch (AbortedException e) {
+ * Thread.sleep(e.getRetryDelayInMillis() / 1000);
+ * txnFut = manager.resetForRetryAsync();
+ * }
+ * }
+ * }
+ * }
+ *
+ * Example of using {@link AsyncTransactionManager} (Java 7).
+ *
+ *
{@code
+ * final long singerId = 1L;
+ * try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
+ * TransactionContextFuture txn = manager.beginAsync();
+ * while (true) {
+ * final String column = "FirstName";
+ * CommitTimestampFuture commitTimestamp =
+ * txn.then(
+ * new AsyncTransactionFunction() {
+ * @Override
+ * public ApiFuture apply(TransactionContext txn, Void input)
+ * throws Exception {
+ * return txn.readRowAsync(
+ * "Singers", Key.of(singerId), Collections.singleton(column));
+ * }
+ * })
+ * .then(
+ * new AsyncTransactionFunction() {
+ * @Override
+ * public ApiFuture apply(TransactionContext txn, Struct input)
+ * throws Exception {
+ * String name = input.getString(column);
+ * txn.buffer(
+ * Mutation.newUpdateBuilder("Singers")
+ * .set(column)
+ * .to(name.toUpperCase())
+ * .build());
+ * return ApiFutures.immediateFuture(null);
+ * }
+ * })
+ * .commitAsync();
+ * try {
+ * commitTimestamp.get();
+ * break;
+ * } catch (AbortedException e) {
+ * Thread.sleep(e.getRetryDelayInMillis() / 1000);
+ * txn = manager.resetForRetryAsync();
+ * }
+ * }
+ * }
+ * }
+ */
+ AsyncTransactionManager transactionManagerAsync();
+
/**
* Returns the lower bound of rows modified by this DML statement.
*
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
index ec83d06335..4dd10001c7 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
@@ -17,7 +17,7 @@
package com.google.cloud.spanner;
import com.google.cloud.Timestamp;
-import com.google.cloud.spanner.SessionPool.PooledSession;
+import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -52,12 +52,12 @@ private enum SessionMode {
}
@VisibleForTesting
- PooledSession getReadSession() {
+ PooledSessionFuture getReadSession() {
return pool.getReadSession();
}
@VisibleForTesting
- PooledSession getReadWriteSession() {
+ PooledSessionFuture getReadWriteSession() {
return pool.getReadWriteSession();
}
@@ -191,6 +191,28 @@ public TransactionManager transactionManager() {
}
}
+ @Override
+ public AsyncRunner runAsync() {
+ Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
+ try (Scope s = tracer.withSpan(span)) {
+ return getReadWriteSession().runAsync();
+ } catch (RuntimeException e) {
+ TraceUtil.endSpanWithFailure(span, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public AsyncTransactionManager transactionManagerAsync() {
+ Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
+ try (Scope s = tracer.withSpan(span)) {
+ return getReadWriteSession().transactionManagerAsync();
+ } catch (RuntimeException e) {
+ TraceUtil.endSpanWithFailure(span, e);
+ throw e;
+ }
+ }
+
@Override
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
@@ -212,7 +234,7 @@ public Long apply(Session session) {
}
private T runWithSessionRetry(SessionMode mode, Function callable) {
- PooledSession session =
+ PooledSessionFuture session =
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
while (true) {
try {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseId.java
index d2c732750e..dd13df65e8 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseId.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseId.java
@@ -81,7 +81,7 @@ public String toString() {
* projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID}
* @throws IllegalArgumentException if {@code name} does not conform to the expected pattern
*/
- static DatabaseId of(String name) {
+ public static DatabaseId of(String name) {
Preconditions.checkNotNull(name);
Map parts = NAME_TEMPLATE.match(name);
Preconditions.checkArgument(
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java
new file mode 100644
index 0000000000..78e3505998
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiFuture;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.concurrent.Executor;
+
+/** Forwarding implementation of {@link AsyncResultSet} that forwards all calls to a delegate. */
+public class ForwardingAsyncResultSet extends ForwardingResultSet implements AsyncResultSet {
+ final AsyncResultSet delegate;
+
+ public ForwardingAsyncResultSet(AsyncResultSet delegate) {
+ super(Preconditions.checkNotNull(delegate));
+ this.delegate = delegate;
+ }
+
+ @Override
+ public CursorState tryNext() throws SpannerException {
+ return delegate.tryNext();
+ }
+
+ @Override
+ public ApiFuture setCallback(Executor exec, ReadyCallback cb) {
+ return delegate.setCallback(exec, cb);
+ }
+
+ @Override
+ public void cancel() {
+ delegate.cancel();
+ }
+
+ @Override
+ public void resume() {
+ delegate.resume();
+ }
+
+ @Override
+ public ApiFuture> toListAsync(
+ Function transformer, Executor executor) {
+ return delegate.toListAsync(transformer, executor);
+ }
+
+ @Override
+ public ImmutableList toList(Function transformer)
+ throws SpannerException {
+ return delegate.toList(transformer);
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
index 753c3f6f39..4cc0ab9b9e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
@@ -17,16 +17,23 @@
package com.google.cloud.spanner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.spanner.v1.ResultSetStats;
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ResultSet {
- private ResultSet delegate;
+ private Supplier delegate;
public ForwardingResultSet(ResultSet delegate) {
super(delegate);
- this.delegate = Preconditions.checkNotNull(delegate);
+ this.delegate = Suppliers.ofInstance(Preconditions.checkNotNull(delegate));
+ }
+
+ public ForwardingResultSet(Supplier supplier) {
+ super(supplier);
+ this.delegate = supplier;
}
/**
@@ -39,26 +46,26 @@ public ForwardingResultSet(ResultSet delegate) {
void replaceDelegate(ResultSet newDelegate) {
Preconditions.checkNotNull(newDelegate);
super.replaceDelegate(newDelegate);
- this.delegate = newDelegate;
+ this.delegate = Suppliers.ofInstance(Preconditions.checkNotNull(newDelegate));
}
@Override
public boolean next() throws SpannerException {
- return delegate.next();
+ return delegate.get().next();
}
@Override
public Struct getCurrentRowAsStruct() {
- return delegate.getCurrentRowAsStruct();
+ return delegate.get().getCurrentRowAsStruct();
}
@Override
public void close() {
- delegate.close();
+ delegate.get().close();
}
@Override
public ResultSetStats getStats() {
- return delegate.getStats();
+ return delegate.get().getStats();
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingStructReader.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingStructReader.java
index 9b30b89985..67e546ad5a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingStructReader.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingStructReader.java
@@ -20,14 +20,20 @@
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import java.util.List;
/** Forwarding implements of StructReader */
public class ForwardingStructReader implements StructReader {
- private StructReader delegate;
+ private Supplier extends StructReader> delegate;
public ForwardingStructReader(StructReader delegate) {
+ this.delegate = Suppliers.ofInstance(Preconditions.checkNotNull(delegate));
+ }
+
+ public ForwardingStructReader(Supplier extends StructReader> delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}
@@ -39,221 +45,271 @@ public ForwardingStructReader(StructReader delegate) {
* returned to the user.
*/
void replaceDelegate(StructReader newDelegate) {
- this.delegate = Preconditions.checkNotNull(newDelegate);
+ this.delegate = Suppliers.ofInstance(Preconditions.checkNotNull(newDelegate));
}
+ /**
+ * Called before each forwarding call to allow sub classes to do additional state checking. Sub
+ * classes should throw an {@link Exception} if the current state is not valid for reading data
+ * from this {@link ForwardingStructReader}. The default implementation does nothing.
+ */
+ protected void checkValidState() {}
+
@Override
public Type getType() {
- return delegate.getType();
+ checkValidState();
+ return delegate.get().getType();
}
@Override
public int getColumnCount() {
- return delegate.getColumnCount();
+ checkValidState();
+ return delegate.get().getColumnCount();
}
@Override
public int getColumnIndex(String columnName) {
- return delegate.getColumnIndex(columnName);
+ checkValidState();
+ return delegate.get().getColumnIndex(columnName);
}
@Override
public Type getColumnType(int columnIndex) {
- return delegate.getColumnType(columnIndex);
+ checkValidState();
+ return delegate.get().getColumnType(columnIndex);
}
@Override
public Type getColumnType(String columnName) {
- return delegate.getColumnType(columnName);
+ checkValidState();
+ return delegate.get().getColumnType(columnName);
}
@Override
public boolean isNull(int columnIndex) {
- return delegate.isNull(columnIndex);
+ checkValidState();
+ return delegate.get().isNull(columnIndex);
}
@Override
public boolean isNull(String columnName) {
- return delegate.isNull(columnName);
+ checkValidState();
+ return delegate.get().isNull(columnName);
}
@Override
public boolean getBoolean(int columnIndex) {
- return delegate.getBoolean(columnIndex);
+ checkValidState();
+ return delegate.get().getBoolean(columnIndex);
}
@Override
public boolean getBoolean(String columnName) {
- return delegate.getBoolean(columnName);
+ checkValidState();
+ return delegate.get().getBoolean(columnName);
}
@Override
public long getLong(int columnIndex) {
- return delegate.getLong(columnIndex);
+ checkValidState();
+ return delegate.get().getLong(columnIndex);
}
@Override
public long getLong(String columnName) {
- return delegate.getLong(columnName);
+ checkValidState();
+ return delegate.get().getLong(columnName);
}
@Override
public double getDouble(int columnIndex) {
- return delegate.getDouble(columnIndex);
+ checkValidState();
+ return delegate.get().getDouble(columnIndex);
}
@Override
public double getDouble(String columnName) {
- return delegate.getDouble(columnName);
+ checkValidState();
+ return delegate.get().getDouble(columnName);
}
@Override
public String getString(int columnIndex) {
- return delegate.getString(columnIndex);
+ checkValidState();
+ return delegate.get().getString(columnIndex);
}
@Override
public String getString(String columnName) {
- return delegate.getString(columnName);
+ checkValidState();
+ return delegate.get().getString(columnName);
}
@Override
public ByteArray getBytes(int columnIndex) {
- return delegate.getBytes(columnIndex);
+ checkValidState();
+ return delegate.get().getBytes(columnIndex);
}
@Override
public ByteArray getBytes(String columnName) {
- return delegate.getBytes(columnName);
+ checkValidState();
+ return delegate.get().getBytes(columnName);
}
@Override
public Timestamp getTimestamp(int columnIndex) {
- return delegate.getTimestamp(columnIndex);
+ checkValidState();
+ return delegate.get().getTimestamp(columnIndex);
}
@Override
public Timestamp getTimestamp(String columnName) {
- return delegate.getTimestamp(columnName);
+ checkValidState();
+ return delegate.get().getTimestamp(columnName);
}
@Override
public Date getDate(int columnIndex) {
- return delegate.getDate(columnIndex);
+ checkValidState();
+ return delegate.get().getDate(columnIndex);
}
@Override
public Date getDate(String columnName) {
- return delegate.getDate(columnName);
+ checkValidState();
+ return delegate.get().getDate(columnName);
}
@Override
public boolean[] getBooleanArray(int columnIndex) {
- return delegate.getBooleanArray(columnIndex);
+ checkValidState();
+ return delegate.get().getBooleanArray(columnIndex);
}
@Override
public boolean[] getBooleanArray(String columnName) {
- return delegate.getBooleanArray(columnName);
+ checkValidState();
+ return delegate.get().getBooleanArray(columnName);
}
@Override
public List getBooleanList(int columnIndex) {
- return delegate.getBooleanList(columnIndex);
+ checkValidState();
+ return delegate.get().getBooleanList(columnIndex);
}
@Override
public List getBooleanList(String columnName) {
- return delegate.getBooleanList(columnName);
+ checkValidState();
+ return delegate.get().getBooleanList(columnName);
}
@Override
public long[] getLongArray(int columnIndex) {
- return delegate.getLongArray(columnIndex);
+ checkValidState();
+ return delegate.get().getLongArray(columnIndex);
}
@Override
public long[] getLongArray(String columnName) {
- return delegate.getLongArray(columnName);
+ checkValidState();
+ return delegate.get().getLongArray(columnName);
}
@Override
public List getLongList(int columnIndex) {
- return delegate.getLongList(columnIndex);
+ checkValidState();
+ return delegate.get().getLongList(columnIndex);
}
@Override
public List getLongList(String columnName) {
- return delegate.getLongList(columnName);
+ checkValidState();
+ return delegate.get().getLongList(columnName);
}
@Override
public double[] getDoubleArray(int columnIndex) {
- return delegate.getDoubleArray(columnIndex);
+ checkValidState();
+ return delegate.get().getDoubleArray(columnIndex);
}
@Override
public double[] getDoubleArray(String columnName) {
- return delegate.getDoubleArray(columnName);
+ checkValidState();
+ return delegate.get().getDoubleArray(columnName);
}
@Override
public List getDoubleList(int columnIndex) {
- return delegate.getDoubleList(columnIndex);
+ checkValidState();
+ return delegate.get().getDoubleList(columnIndex);
}
@Override
public List getDoubleList(String columnName) {
- return delegate.getDoubleList(columnName);
+ checkValidState();
+ return delegate.get().getDoubleList(columnName);
}
@Override
public List getStringList(int columnIndex) {
- return delegate.getStringList(columnIndex);
+ checkValidState();
+ return delegate.get().getStringList(columnIndex);
}
@Override
public List getStringList(String columnName) {
- return delegate.getStringList(columnName);
+ checkValidState();
+ return delegate.get().getStringList(columnName);
}
@Override
public List getBytesList(int columnIndex) {
- return delegate.getBytesList(columnIndex);
+ checkValidState();
+ return delegate.get().getBytesList(columnIndex);
}
@Override
public List getBytesList(String columnName) {
- return delegate.getBytesList(columnName);
+ checkValidState();
+ return delegate.get().getBytesList(columnName);
}
@Override
public List getTimestampList(int columnIndex) {
- return delegate.getTimestampList(columnIndex);
+ checkValidState();
+ return delegate.get().getTimestampList(columnIndex);
}
@Override
public List getTimestampList(String columnName) {
- return delegate.getTimestampList(columnName);
+ checkValidState();
+ return delegate.get().getTimestampList(columnName);
}
@Override
public List getDateList(int columnIndex) {
- return delegate.getDateList(columnIndex);
+ checkValidState();
+ return delegate.get().getDateList(columnIndex);
}
@Override
public List getDateList(String columnName) {
- return delegate.getDateList(columnName);
+ checkValidState();
+ return delegate.get().getDateList(columnName);
}
@Override
public List getStructList(int columnIndex) {
- return delegate.getStructList(columnIndex);
+ checkValidState();
+ return delegate.get().getStructList(columnIndex);
}
@Override
public List getStructList(String columnName) {
- return delegate.getStructList(columnName);
+ checkValidState();
+ return delegate.get().getStructList(columnName);
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
index d193ad1c75..879b632d17 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
@@ -59,6 +59,11 @@ public static ReadAndQueryOption prefetchChunks(int prefetchChunks) {
return new FlowControlOption(prefetchChunks);
}
+ public static ReadAndQueryOption bufferRows(int bufferRows) {
+ Preconditions.checkArgument(bufferRows > 0, "bufferRows should be greater than 0");
+ return new BufferRowsOption(bufferRows);
+ }
+
/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
@@ -115,8 +120,22 @@ void appendToOptions(Options options) {
}
}
+ static final class BufferRowsOption extends InternalOption implements ReadAndQueryOption {
+ final int bufferRows;
+
+ BufferRowsOption(int bufferRows) {
+ this.bufferRows = bufferRows;
+ }
+
+ @Override
+ void appendToOptions(Options options) {
+ options.bufferRows = bufferRows;
+ }
+ }
+
private Long limit;
private Integer prefetchChunks;
+ private Integer bufferRows;
private Integer pageSize;
private String pageToken;
private String filter;
@@ -140,6 +159,14 @@ int prefetchChunks() {
return prefetchChunks;
}
+ boolean hasBufferRows() {
+ return bufferRows != null;
+ }
+
+ int bufferRows() {
+ return bufferRows;
+ }
+
boolean hasPageSize() {
return pageSize != null;
}
@@ -203,6 +230,10 @@ public boolean equals(Object o) {
|| hasPrefetchChunks()
&& that.hasPrefetchChunks()
&& Objects.equals(prefetchChunks(), that.prefetchChunks()))
+ && (!hasBufferRows() && !that.hasBufferRows()
+ || hasBufferRows()
+ && that.hasBufferRows()
+ && Objects.equals(bufferRows(), that.bufferRows()))
&& (!hasPageSize() && !that.hasPageSize()
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
@@ -218,6 +249,9 @@ public int hashCode() {
if (prefetchChunks != null) {
result = 31 * result + prefetchChunks.hashCode();
}
+ if (bufferRows != null) {
+ result = 31 * result + bufferRows.hashCode();
+ }
if (pageSize != null) {
result = 31 * result + pageSize.hashCode();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java
index 638c567a03..96ae390dd6 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java
@@ -33,6 +33,7 @@
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status.Code;
+import io.opencensus.trace.Span;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -160,4 +161,8 @@ long executeStreamingPartitionedUpdate(final Statement statement, Duration timeo
public void invalidate() {
isValid = false;
}
+
+ // No-op method needed to implement SessionTransaction interface.
+ @Override
+ public void setSpan(Span span) {}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ReadContext.java
index 16f40769fa..e87d40fb20 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ReadContext.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import com.google.api.core.ApiFuture;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import javax.annotation.Nullable;
@@ -65,6 +66,13 @@ enum QueryAnalyzeMode {
*/
ResultSet read(String table, KeySet keys, Iterable columns, ReadOption... options);
+ /**
+ * Same as {@link #read(String, KeySet, Iterable, ReadOption...)}, but is guaranteed to be
+ * non-blocking and will return the results as an {@link AsyncResultSet}.
+ */
+ AsyncResultSet readAsync(
+ String table, KeySet keys, Iterable columns, ReadOption... options);
+
/**
* Reads zero or more rows from a database using an index.
*
@@ -93,6 +101,13 @@ enum QueryAnalyzeMode {
ResultSet readUsingIndex(
String table, String index, KeySet keys, Iterable columns, ReadOption... options);
+ /**
+ * Same as {@link #readUsingIndex(String, String, KeySet, Iterable, ReadOption...)}, but is
+ * guaranteed to be non-blocking and will return its results as an {@link AsyncResultSet}.
+ */
+ AsyncResultSet readUsingIndexAsync(
+ String table, String index, KeySet keys, Iterable columns, ReadOption... options);
+
/**
* Reads a single row from a database, returning {@code null} if the row does not exist.
*
@@ -112,6 +127,9 @@ ResultSet readUsingIndex(
@Nullable
Struct readRow(String table, Key key, Iterable columns);
+ /** Same as {@link #readRow(String, Key, Iterable)}, but is guaranteed to be non-blocking. */
+ ApiFuture readRowAsync(String table, Key key, Iterable columns);
+
/**
* Reads a single row from a database using an index, returning {@code null} if the row does not
* exist.
@@ -134,6 +152,13 @@ ResultSet readUsingIndex(
@Nullable
Struct readRowUsingIndex(String table, String index, Key key, Iterable columns);
+ /**
+ * Same as {@link #readRowUsingIndex(String, String, Key, Iterable)}, but is guaranteed to be
+ * non-blocking.
+ */
+ ApiFuture readRowUsingIndexAsync(
+ String table, String index, Key key, Iterable columns);
+
/**
* Executes a query against the database.
*
@@ -160,6 +185,12 @@ ResultSet readUsingIndex(
*/
ResultSet executeQuery(Statement statement, QueryOption... options);
+ /**
+ * Same as {@link #executeQuery(Statement, QueryOption...)}, but is guaranteed to be non-blocking
+ * and returns its results as an {@link AsyncResultSet}.
+ */
+ AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options);
+
/**
* Analyzes a query and returns query plan and/or query execution statistics information.
*
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
index 29c3e52c6a..278b15d967 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
@@ -16,6 +16,8 @@
package com.google.cloud.spanner;
+import com.google.api.gax.core.ExecutorProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
@@ -23,6 +25,7 @@
import com.google.cloud.spanner.Type.StructField;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.ResultSetStats;
import java.util.List;
@@ -41,6 +44,30 @@ public static ResultSet forRows(Type type, Iterable rows) {
return new PrePopulatedResultSet(type, rows);
}
+ /** Converts the given {@link ResultSet} to an {@link AsyncResultSet}. */
+ public static AsyncResultSet toAsyncResultSet(ResultSet delegate) {
+ return new AsyncResultSetImpl(
+ InstantiatingExecutorProvider.newBuilder()
+ .setExecutorThreadCount(1)
+ .setThreadFactory(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("test-async-resultset-%d")
+ .build())
+ .build(),
+ delegate,
+ 100);
+ }
+
+ /**
+ * Converts the given {@link ResultSet} to an {@link AsyncResultSet} using the given {@link
+ * ExecutorProvider}.
+ */
+ public static AsyncResultSet toAsyncResultSet(
+ ResultSet delegate, ExecutorProvider executorProvider) {
+ return new AsyncResultSetImpl(executorProvider, delegate, 100);
+ }
+
private static class PrePopulatedResultSet implements ResultSet {
private final List rows;
private final Type type;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index b865efa2d9..ce4d27e94e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
@@ -28,6 +29,7 @@
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
@@ -43,6 +45,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
/**
@@ -76,14 +79,17 @@ static void throwIfTransactionsPending() {
static interface SessionTransaction {
/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();
+ /** Registers the current span on the transaction. */
+ void setSpan(Span span);
}
private final SpannerImpl spanner;
private final String name;
private final DatabaseId databaseId;
private SessionTransaction activeTransaction;
- private ByteString readyTransactionId;
+ ByteString readyTransactionId;
private final Map options;
+ private Span currentSpan;
SessionImpl(SpannerImpl spanner, String name, Map options) {
this.spanner = spanner;
@@ -101,6 +107,10 @@ public String getName() {
return options;
}
+ void setCurrentSpan(Span span) {
+ currentSpan = span;
+ }
+
@Override
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
@@ -170,6 +180,8 @@ public ReadContext singleUse(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setSpan(currentSpan)
+ .setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
}
@@ -187,6 +199,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setSpan(currentSpan)
+ .setExecutorProvider(spanner.getAsyncExecutorProvider())
.buildSingleUseReadOnlyTransaction());
}
@@ -204,6 +218,8 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setSpan(currentSpan)
+ .setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
}
@@ -213,6 +229,23 @@ public TransactionRunner readWriteTransaction() {
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
}
+ @Override
+ public AsyncRunner runAsync() {
+ return new AsyncRunnerImpl(
+ setActive(
+ new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())));
+ }
+
+ @Override
+ public TransactionManager transactionManager() {
+ return new TransactionManagerImpl(this, currentSpan);
+ }
+
+ @Override
+ public AsyncTransactionManagerImpl transactionManagerAsync() {
+ return new AsyncTransactionManagerImpl(this, currentSpan);
+ }
+
@Override
public void prepareReadWriteTransaction() {
setActive(null);
@@ -238,27 +271,59 @@ public void close() {
}
ByteString beginTransaction() {
- Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
- final BeginTransactionRequest request =
- BeginTransactionRequest.newBuilder()
- .setSession(name)
- .setOptions(
- TransactionOptions.newBuilder()
- .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
- .build();
- Transaction txn = spanner.getRpc().beginTransaction(request, options);
- if (txn.getId().isEmpty()) {
- throw newSpannerException(ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
- }
- span.end(TraceUtil.END_SPAN_OPTIONS);
- return txn.getId();
- } catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
- throw e;
+ try {
+ return beginTransactionAsync().get();
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
+ } catch (InterruptedException e) {
+ throw SpannerExceptionFactory.propagateInterrupt(e);
}
}
+ ApiFuture beginTransactionAsync() {
+ final SettableApiFuture res = SettableApiFuture.create();
+ final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
+ final BeginTransactionRequest request =
+ BeginTransactionRequest.newBuilder()
+ .setSession(name)
+ .setOptions(
+ TransactionOptions.newBuilder()
+ .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
+ .build();
+ final ApiFuture requestFuture =
+ spanner.getRpc().beginTransactionAsync(request, options);
+ requestFuture.addListener(
+ tracer.withSpan(
+ span,
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Transaction txn = requestFuture.get();
+ if (txn.getId().isEmpty()) {
+ throw newSpannerException(
+ ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
+ }
+ span.end(TraceUtil.END_SPAN_OPTIONS);
+ res.set(txn.getId());
+ } catch (ExecutionException e) {
+ TraceUtil.endSpanWithFailure(span, e);
+ res.setException(
+ SpannerExceptionFactory.newSpannerException(
+ e.getCause() == null ? e : e.getCause()));
+ } catch (InterruptedException e) {
+ TraceUtil.endSpanWithFailure(span, e);
+ res.setException(SpannerExceptionFactory.propagateInterrupt(e));
+ } catch (Exception e) {
+ TraceUtil.endSpanWithFailure(span, e);
+ res.setException(e);
+ }
+ }
+ }),
+ MoreExecutors.directExecutor());
+ return res;
+ }
+
TransactionContextImpl newTransaction() {
return TransactionContextImpl.newBuilder()
.setSession(this)
@@ -266,6 +331,8 @@ TransactionContextImpl newTransaction() {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setSpan(currentSpan)
+ .setExecutorProvider(spanner.getAsyncExecutorProvider())
.build();
}
@@ -277,11 +344,13 @@ T setActive(@Nullable T ctx) {
}
activeTransaction = ctx;
readyTransactionId = null;
+ if (activeTransaction != null) {
+ activeTransaction.setSpan(currentSpan);
+ }
return ctx;
}
- @Override
- public TransactionManager transactionManager() {
- return new TransactionManagerImpl(this);
+ boolean hasReadyTransaction() {
+ return readyTransactionId != null;
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
index 2286c3b34b..90e399fad6 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
@@ -40,6 +40,8 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
@@ -48,6 +50,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
+import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
@@ -56,11 +59,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ForwardingListenableFuture;
+import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.common.ToLongFunction;
@@ -85,12 +89,17 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -132,53 +141,115 @@ Instant instant() {
}
}
+ private abstract static class CachedResultSetSupplier implements Supplier {
+ private ResultSet cached;
+
+ abstract ResultSet load();
+
+ ResultSet reload() {
+ return cached = load();
+ }
+
+ @Override
+ public ResultSet get() {
+ if (cached == null) {
+ cached = load();
+ }
+ return cached;
+ }
+ }
+
/**
* Wrapper around {@code ReadContext} that releases the session to the pool once the call is
* finished, if it is a single use context.
*/
private static class AutoClosingReadContext implements ReadContext {
- private final Function readContextDelegateSupplier;
+ /**
+ * {@link AsyncResultSet} implementation that keeps track of the async operations that are still
+ * running for this {@link ReadContext} and that should finish before the {@link ReadContext}
+ * releases its session back into the pool.
+ */
+ private class AutoClosingReadContextAsyncResultSetImpl extends AsyncResultSetImpl {
+ private AutoClosingReadContextAsyncResultSetImpl(
+ ExecutorProvider executorProvider, ResultSet delegate, int bufferRows) {
+ super(executorProvider, delegate, bufferRows);
+ }
+
+ @Override
+ public ApiFuture setCallback(Executor exec, ReadyCallback cb) {
+ Runnable listener =
+ new Runnable() {
+ @Override
+ public void run() {
+ synchronized (lock) {
+ if (asyncOperationsCount.decrementAndGet() == 0 && closed) {
+ // All async operations for this read context have finished.
+ AutoClosingReadContext.this.close();
+ }
+ }
+ }
+ };
+ try {
+ asyncOperationsCount.incrementAndGet();
+ addListener(listener);
+ return super.setCallback(exec, cb);
+ } catch (Throwable t) {
+ removeListener(listener);
+ asyncOperationsCount.decrementAndGet();
+ throw t;
+ }
+ }
+ }
+
+ private final Function readContextDelegateSupplier;
private T readContextDelegate;
private final SessionPool sessionPool;
- private PooledSession session;
private final boolean isSingleUse;
- private boolean closed;
+ private final AtomicInteger asyncOperationsCount = new AtomicInteger();
+
+ private Object lock = new Object();
+
+ @GuardedBy("lock")
private boolean sessionUsedForQuery = false;
+ @GuardedBy("lock")
+ private PooledSessionFuture session;
+
+ @GuardedBy("lock")
+ private boolean closed;
+
+ @GuardedBy("lock")
+ private boolean delegateClosed;
+
private AutoClosingReadContext(
- Function delegateSupplier,
+ Function delegateSupplier,
SessionPool sessionPool,
- PooledSession session,
+ PooledSessionFuture session,
boolean isSingleUse) {
this.readContextDelegateSupplier = delegateSupplier;
this.sessionPool = sessionPool;
this.session = session;
this.isSingleUse = isSingleUse;
- while (true) {
- try {
- this.readContextDelegate = readContextDelegateSupplier.apply(this.session);
- break;
- } catch (SessionNotFoundException e) {
- replaceSessionIfPossible(e);
- }
- }
}
T getReadContextDelegate() {
+ synchronized (lock) {
+ if (readContextDelegate == null) {
+ while (true) {
+ try {
+ this.readContextDelegate = readContextDelegateSupplier.apply(this.session);
+ break;
+ } catch (SessionNotFoundException e) {
+ replaceSessionIfPossible(e);
+ }
+ }
+ }
+ }
return readContextDelegate;
}
- private ResultSet wrap(final Supplier resultSetSupplier) {
- ResultSet res;
- while (true) {
- try {
- res = resultSetSupplier.get();
- break;
- } catch (SessionNotFoundException e) {
- replaceSessionIfPossible(e);
- }
- }
- return new ForwardingResultSet(res) {
+ private ResultSet wrap(final CachedResultSetSupplier resultSetSupplier) {
+ return new ForwardingResultSet(resultSetSupplier) {
private boolean beforeFirst = true;
@Override
@@ -187,8 +258,18 @@ public boolean next() throws SpannerException {
try {
return internalNext();
} catch (SessionNotFoundException e) {
- replaceSessionIfPossible(e);
- replaceDelegate(resultSetSupplier.get());
+ while (true) {
+ // Keep the replace-if-possible outside the try-block to let the exception bubble up
+ // if it's too late to replace the session.
+ replaceSessionIfPossible(e);
+ try {
+ replaceDelegate(resultSetSupplier.reload());
+ break;
+ } catch (SessionNotFoundException snfe) {
+ e = snfe;
+ // retry on yet another session.
+ }
+ }
}
}
}
@@ -197,9 +278,11 @@ private boolean internalNext() {
try {
boolean ret = super.next();
if (beforeFirst) {
- session.markUsed();
- beforeFirst = false;
- sessionUsedForQuery = true;
+ synchronized (lock) {
+ session.get().markUsed();
+ beforeFirst = false;
+ sessionUsedForQuery = true;
+ }
}
if (!ret && isSingleUse) {
close();
@@ -208,9 +291,11 @@ private boolean internalNext() {
} catch (SessionNotFoundException e) {
throw e;
} catch (SpannerException e) {
- if (!closed && isSingleUse) {
- session.lastException = e;
- AutoClosingReadContext.this.close();
+ synchronized (lock) {
+ if (!closed && isSingleUse) {
+ session.get().lastException = e;
+ AutoClosingReadContext.this.close();
+ }
}
throw e;
}
@@ -218,22 +303,27 @@ private boolean internalNext() {
@Override
public void close() {
- super.close();
- if (isSingleUse) {
- AutoClosingReadContext.this.close();
+ try {
+ super.close();
+ } finally {
+ if (isSingleUse) {
+ AutoClosingReadContext.this.close();
+ }
}
}
};
}
- private void replaceSessionIfPossible(SessionNotFoundException e) {
- if (isSingleUse || !sessionUsedForQuery) {
- // This class is only used by read-only transactions, so we know that we only need a
- // read-only session.
- session = sessionPool.replaceReadSession(e, session);
- readContextDelegate = readContextDelegateSupplier.apply(session);
- } else {
- throw e;
+ private void replaceSessionIfPossible(SessionNotFoundException notFound) {
+ synchronized (lock) {
+ if (isSingleUse || !sessionUsedForQuery) {
+ // This class is only used by read-only transactions, so we know that we only need a
+ // read-only session.
+ session = sessionPool.replaceReadSession(notFound, session);
+ readContextDelegate = readContextDelegateSupplier.apply(session);
+ } else {
+ throw notFound;
+ }
}
}
@@ -244,14 +334,37 @@ public ResultSet read(
final Iterable columns,
final ReadOption... options) {
return wrap(
- new Supplier() {
+ new CachedResultSetSupplier() {
@Override
- public ResultSet get() {
- return readContextDelegate.read(table, keys, columns, options);
+ ResultSet load() {
+ return getReadContextDelegate().read(table, keys, columns, options);
}
});
}
+ @Override
+ public AsyncResultSet readAsync(
+ final String table,
+ final KeySet keys,
+ final Iterable columns,
+ final ReadOption... options) {
+ Options readOptions = Options.fromReadOptions(options);
+ final int bufferRows =
+ readOptions.hasBufferRows()
+ ? readOptions.bufferRows()
+ : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
+ return new AutoClosingReadContextAsyncResultSetImpl(
+ sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
+ wrap(
+ new CachedResultSetSupplier() {
+ @Override
+ ResultSet load() {
+ return getReadContextDelegate().read(table, keys, columns, options);
+ }
+ }),
+ bufferRows);
+ }
+
@Override
public ResultSet readUsingIndex(
final String table,
@@ -260,84 +373,159 @@ public ResultSet readUsingIndex(
final Iterable columns,
final ReadOption... options) {
return wrap(
- new Supplier() {
+ new CachedResultSetSupplier() {
@Override
- public ResultSet get() {
- return readContextDelegate.readUsingIndex(table, index, keys, columns, options);
+ ResultSet load() {
+ return getReadContextDelegate().readUsingIndex(table, index, keys, columns, options);
}
});
}
+ @Override
+ public AsyncResultSet readUsingIndexAsync(
+ final String table,
+ final String index,
+ final KeySet keys,
+ final Iterable columns,
+ final ReadOption... options) {
+ Options readOptions = Options.fromReadOptions(options);
+ final int bufferRows =
+ readOptions.hasBufferRows()
+ ? readOptions.bufferRows()
+ : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
+ return new AutoClosingReadContextAsyncResultSetImpl(
+ sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
+ wrap(
+ new CachedResultSetSupplier() {
+ @Override
+ ResultSet load() {
+ return getReadContextDelegate()
+ .readUsingIndex(table, index, keys, columns, options);
+ }
+ }),
+ bufferRows);
+ }
+
@Override
@Nullable
public Struct readRow(String table, Key key, Iterable columns) {
try {
while (true) {
try {
- session.markUsed();
- return readContextDelegate.readRow(table, key, columns);
+ synchronized (lock) {
+ session.get().markUsed();
+ }
+ return getReadContextDelegate().readRow(table, key, columns);
} catch (SessionNotFoundException e) {
replaceSessionIfPossible(e);
}
}
} finally {
- sessionUsedForQuery = true;
+ synchronized (lock) {
+ sessionUsedForQuery = true;
+ }
if (isSingleUse) {
close();
}
}
}
+ @Override
+ public ApiFuture readRowAsync(String table, Key key, Iterable columns) {
+ try (AsyncResultSet rs = readAsync(table, KeySet.singleKey(key), columns)) {
+ return AbstractReadContext.consumeSingleRowAsync(rs);
+ }
+ }
+
@Override
@Nullable
public Struct readRowUsingIndex(String table, String index, Key key, Iterable columns) {
try {
while (true) {
try {
- session.markUsed();
- return readContextDelegate.readRowUsingIndex(table, index, key, columns);
+ synchronized (lock) {
+ session.get().markUsed();
+ }
+ return getReadContextDelegate().readRowUsingIndex(table, index, key, columns);
} catch (SessionNotFoundException e) {
replaceSessionIfPossible(e);
}
}
} finally {
- sessionUsedForQuery = true;
+ synchronized (lock) {
+ sessionUsedForQuery = true;
+ }
if (isSingleUse) {
close();
}
}
}
+ @Override
+ public ApiFuture readRowUsingIndexAsync(
+ String table, String index, Key key, Iterable columns) {
+ try (AsyncResultSet rs = readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
+ return AbstractReadContext.consumeSingleRowAsync(rs);
+ }
+ }
+
@Override
public ResultSet executeQuery(final Statement statement, final QueryOption... options) {
return wrap(
- new Supplier() {
+ new CachedResultSetSupplier() {
@Override
- public ResultSet get() {
- return readContextDelegate.executeQuery(statement, options);
+ ResultSet load() {
+ return getReadContextDelegate().executeQuery(statement, options);
}
});
}
+ @Override
+ public AsyncResultSet executeQueryAsync(
+ final Statement statement, final QueryOption... options) {
+ Options queryOptions = Options.fromQueryOptions(options);
+ final int bufferRows =
+ queryOptions.hasBufferRows()
+ ? queryOptions.bufferRows()
+ : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
+ return new AutoClosingReadContextAsyncResultSetImpl(
+ sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
+ wrap(
+ new CachedResultSetSupplier() {
+ @Override
+ ResultSet load() {
+ return getReadContextDelegate().executeQuery(statement, options);
+ }
+ }),
+ bufferRows);
+ }
+
@Override
public ResultSet analyzeQuery(final Statement statement, final QueryAnalyzeMode queryMode) {
return wrap(
- new Supplier() {
+ new CachedResultSetSupplier() {
@Override
- public ResultSet get() {
- return readContextDelegate.analyzeQuery(statement, queryMode);
+ ResultSet load() {
+ return getReadContextDelegate().analyzeQuery(statement, queryMode);
}
});
}
@Override
public void close() {
- if (closed) {
- return;
+ synchronized (lock) {
+ if (closed && delegateClosed) {
+ return;
+ }
+ closed = true;
+ if (asyncOperationsCount.get() == 0) {
+ if (readContextDelegate != null) {
+ readContextDelegate.close();
+ }
+ session.close();
+ delegateClosed = true;
+ }
}
- closed = true;
- readContextDelegate.close();
- session.close();
}
}
@@ -345,9 +533,9 @@ private static class AutoClosingReadTransaction
extends AutoClosingReadContext implements ReadOnlyTransaction {
AutoClosingReadTransaction(
- Function txnSupplier,
+ Function txnSupplier,
SessionPool sessionPool,
- PooledSession session,
+ PooledSessionFuture session,
boolean isSingleUse) {
super(txnSupplier, sessionPool, session, isSingleUse);
}
@@ -394,6 +582,13 @@ public ResultSet read(
return new SessionPoolResultSet(delegate.read(table, keys, columns, options));
}
+ @Override
+ public AsyncResultSet readAsync(
+ String table, KeySet keys, Iterable columns, ReadOption... options) {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.UNIMPLEMENTED, "not yet implemented");
+ }
+
@Override
public ResultSet readUsingIndex(
String table,
@@ -405,6 +600,17 @@ public ResultSet readUsingIndex(
delegate.readUsingIndex(table, index, keys, columns, options));
}
+ @Override
+ public AsyncResultSet readUsingIndexAsync(
+ String table,
+ String index,
+ KeySet keys,
+ Iterable columns,
+ ReadOption... options) {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.UNIMPLEMENTED, "not yet implemented");
+ }
+
@Override
public Struct readRow(String table, Key key, Iterable columns) {
try {
@@ -414,6 +620,13 @@ public Struct readRow(String table, Key key, Iterable columns) {
}
}
+ @Override
+ public ApiFuture readRowAsync(String table, Key key, Iterable columns) {
+ try (AsyncResultSet rs = readAsync(table, KeySet.singleKey(key), columns)) {
+ return AbstractReadContext.consumeSingleRowAsync(rs);
+ }
+ }
+
@Override
public void buffer(Mutation mutation) {
delegate.buffer(mutation);
@@ -429,6 +642,15 @@ public Struct readRowUsingIndex(
}
}
+ @Override
+ public ApiFuture readRowUsingIndexAsync(
+ String table, String index, Key key, Iterable columns) {
+ try (AsyncResultSet rs =
+ readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
+ return AbstractReadContext.consumeSingleRowAsync(rs);
+ }
+ }
+
@Override
public void buffer(Iterable mutations) {
delegate.buffer(mutations);
@@ -443,6 +665,15 @@ public long executeUpdate(Statement statement) {
}
}
+ @Override
+ public ApiFuture executeUpdateAsync(Statement statement) {
+ try {
+ return delegate.executeUpdateAsync(statement);
+ } catch (SessionNotFoundException e) {
+ throw handleSessionNotFound(e);
+ }
+ }
+
@Override
public long[] batchUpdate(Iterable statements) {
try {
@@ -452,11 +683,29 @@ public long[] batchUpdate(Iterable statements) {
}
}
+ @Override
+ public ApiFuture batchUpdateAsync(Iterable statements) {
+ try {
+ return delegate.batchUpdateAsync(statements);
+ } catch (SessionNotFoundException e) {
+ throw handleSessionNotFound(e);
+ }
+ }
+
@Override
public ResultSet executeQuery(Statement statement, QueryOption... options) {
return new SessionPoolResultSet(delegate.executeQuery(statement, options));
}
+ @Override
+ public AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
+ try {
+ return delegate.executeQueryAsync(statement, options);
+ } catch (SessionNotFoundException e) {
+ throw handleSessionNotFound(e);
+ }
+ }
+
@Override
public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) {
return new SessionPoolResultSet(delegate.analyzeQuery(statement, queryMode));
@@ -470,39 +719,40 @@ public void close() {
private TransactionManager delegate;
private final SessionPool sessionPool;
- private PooledSession session;
+ private PooledSessionFuture session;
private boolean closed;
private boolean restartedAfterSessionNotFound;
- AutoClosingTransactionManager(SessionPool sessionPool, PooledSession session) {
+ AutoClosingTransactionManager(SessionPool sessionPool, PooledSessionFuture session) {
this.sessionPool = sessionPool;
this.session = session;
- this.delegate = session.delegate.transactionManager();
}
@Override
public TransactionContext begin() {
+ this.delegate = session.get().transactionManager();
while (true) {
try {
return internalBegin();
} catch (SessionNotFoundException e) {
session = sessionPool.replaceReadWriteSession(e, session);
- delegate = session.delegate.transactionManager();
+ delegate = session.get().delegate.transactionManager();
}
}
}
private TransactionContext internalBegin() {
TransactionContext res = new SessionPoolTransactionContext(delegate.begin());
- session.markUsed();
+ session.get().markUsed();
return res;
}
- private SpannerException handleSessionNotFound(SessionNotFoundException e) {
- session = sessionPool.replaceReadWriteSession(e, session);
- delegate = session.delegate.transactionManager();
+ private SpannerException handleSessionNotFound(SessionNotFoundException notFound) {
+ session = sessionPool.replaceReadWriteSession(notFound, session);
+ delegate = session.get().delegate.transactionManager();
restartedAfterSessionNotFound = true;
- return SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, e.getMessage(), e);
+ return SpannerExceptionFactory.newSpannerException(
+ ErrorCode.ABORTED, notFound.getMessage(), notFound);
}
@Override
@@ -540,7 +790,7 @@ public TransactionContext resetForRetry() {
}
} catch (SessionNotFoundException e) {
session = sessionPool.replaceReadWriteSession(e, session);
- delegate = session.delegate.transactionManager();
+ delegate = session.get().delegate.transactionManager();
restartedAfterSessionNotFound = true;
}
}
@@ -558,7 +808,9 @@ public void close() {
}
closed = true;
try {
- delegate.close();
+ if (delegate != null) {
+ delegate.close();
+ }
} finally {
session.close();
}
@@ -569,7 +821,7 @@ public TransactionState getState() {
if (restartedAfterSessionNotFound) {
return TransactionState.ABORTED;
} else {
- return delegate.getState();
+ return delegate == null ? null : delegate.getState();
}
}
}
@@ -580,13 +832,19 @@ public TransactionState getState() {
*/
private static final class SessionPoolTransactionRunner implements TransactionRunner {
private final SessionPool sessionPool;
- private PooledSession session;
+ private PooledSessionFuture session;
private TransactionRunner runner;
- private SessionPoolTransactionRunner(SessionPool sessionPool, PooledSession session) {
+ private SessionPoolTransactionRunner(SessionPool sessionPool, PooledSessionFuture session) {
this.sessionPool = sessionPool;
this.session = session;
- this.runner = session.delegate.readWriteTransaction();
+ }
+
+ private TransactionRunner getRunner() {
+ if (this.runner == null) {
+ this.runner = session.get().readWriteTransaction();
+ }
+ return runner;
}
@Override
@@ -596,17 +854,17 @@ public T run(TransactionCallable callable) {
T result;
while (true) {
try {
- result = runner.run(callable);
+ result = getRunner().run(callable);
break;
} catch (SessionNotFoundException e) {
session = sessionPool.replaceReadWriteSession(e, session);
- runner = session.delegate.readWriteTransaction();
+ runner = session.get().delegate.readWriteTransaction();
}
}
- session.markUsed();
+ session.get().markUsed();
return result;
} catch (SpannerException e) {
- throw session.lastException = e;
+ throw session.get().lastException = e;
} finally {
session.close();
}
@@ -614,19 +872,86 @@ public T run(TransactionCallable callable) {
@Override
public Timestamp getCommitTimestamp() {
- return runner.getCommitTimestamp();
+ return getRunner().getCommitTimestamp();
}
@Override
public TransactionRunner allowNestedTransaction() {
- runner.allowNestedTransaction();
+ getRunner().allowNestedTransaction();
return this;
}
}
+ private static class SessionPoolAsyncRunner implements AsyncRunner {
+ private final SessionPool sessionPool;
+ private volatile PooledSessionFuture session;
+ private final SettableApiFuture commitTimestamp = SettableApiFuture.create();
+
+ private SessionPoolAsyncRunner(SessionPool sessionPool, PooledSessionFuture session) {
+ this.sessionPool = sessionPool;
+ this.session = session;
+ }
+
+ @Override
+ public ApiFuture runAsync(final AsyncWork work, Executor executor) {
+ final SettableApiFuture res = SettableApiFuture.create();
+ executor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ SpannerException se = null;
+ R r = null;
+ AsyncRunner runner = null;
+ while (true) {
+ try {
+ runner = session.get().runAsync();
+ r = runner.runAsync(work, MoreExecutors.directExecutor()).get();
+ break;
+ } catch (ExecutionException e) {
+ se = SpannerExceptionFactory.newSpannerException(e.getCause());
+ } catch (InterruptedException e) {
+ se = SpannerExceptionFactory.propagateInterrupt(e);
+ } catch (Throwable t) {
+ se = SpannerExceptionFactory.newSpannerException(t);
+ } finally {
+ if (se != null && se instanceof SessionNotFoundException) {
+ session =
+ sessionPool.replaceReadWriteSession((SessionNotFoundException) se, session);
+ } else {
+ break;
+ }
+ }
+ }
+ session.get().markUsed();
+ session.close();
+ setCommitTimestamp(runner);
+ if (se != null) {
+ res.setException(se);
+ } else {
+ res.set(r);
+ }
+ }
+ });
+ return res;
+ }
+
+ private void setCommitTimestamp(AsyncRunner delegate) {
+ try {
+ commitTimestamp.set(delegate.getCommitTimestamp().get());
+ } catch (Throwable t) {
+ commitTimestamp.setException(t);
+ }
+ }
+
+ @Override
+ public ApiFuture getCommitTimestamp() {
+ return commitTimestamp;
+ }
+ }
+
// Exception class used just to track the stack trace at the point when a session was handed out
// from the pool.
- private final class LeakedSessionException extends RuntimeException {
+ final class LeakedSessionException extends RuntimeException {
private static final long serialVersionUID = 1451131180314064914L;
private LeakedSessionException() {
@@ -640,25 +965,124 @@ private enum SessionState {
CLOSING,
}
- final class PooledSession implements Session {
- @VisibleForTesting SessionImpl delegate;
- private volatile Instant lastUseTime;
- private volatile SpannerException lastException;
- private volatile LeakedSessionException leakedException;
- private volatile boolean allowReplacing = true;
+ /**
+ * Forwarding future that will return a {@link PooledSession}. If {@link #inProcessPrepare} has
+ * been set to true, the returned session will be prepared with a read/write session using the
+ * thread of the caller to {@link #get()}. This ensures that the executor that is responsible for
+ * background preparing of read/write transactions is not overwhelmed by requests in case of a
+ * large burst of write requests. Instead of filling up the queue of the background executor, the
+ * caller threads will be used for the BeginTransaction call.
+ */
+ private final class ForwardingListenablePooledSessionFuture
+ extends SimpleForwardingListenableFuture {
+ private final boolean inProcessPrepare;
+ private final Span span;
+ private volatile boolean initialized = false;
+ private final Object prepareLock = new Object();
+ private volatile PooledSession result;
+ private volatile SpannerException error;
+
+ private ForwardingListenablePooledSessionFuture(
+ ListenableFuture delegate, boolean inProcessPrepare, Span span) {
+ super(delegate);
+ this.inProcessPrepare = inProcessPrepare;
+ this.span = span;
+ }
- @GuardedBy("lock")
- private SessionState state;
+ @Override
+ public PooledSession get() throws InterruptedException, ExecutionException {
+ try {
+ return initialize(super.get());
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause());
+ } catch (InterruptedException e) {
+ throw SpannerExceptionFactory.propagateInterrupt(e);
+ }
+ }
- private PooledSession(SessionImpl delegate) {
- this.delegate = delegate;
- this.state = SessionState.AVAILABLE;
- this.lastUseTime = clock.instant();
+ @Override
+ public PooledSession get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return initialize(super.get(timeout, unit));
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause());
+ } catch (InterruptedException e) {
+ throw SpannerExceptionFactory.propagateInterrupt(e);
+ } catch (TimeoutException e) {
+ throw SpannerExceptionFactory.propagateTimeout(e);
+ }
}
- @VisibleForTesting
- void setAllowReplacing(boolean allowReplacing) {
- this.allowReplacing = allowReplacing;
+ private PooledSession initialize(PooledSession sess) {
+ if (!initialized) {
+ synchronized (prepareLock) {
+ if (!initialized) {
+ try {
+ result = prepare(sess);
+ } catch (Throwable t) {
+ error = SpannerExceptionFactory.newSpannerException(t);
+ } finally {
+ initialized = true;
+ }
+ }
+ }
+ }
+ if (error != null) {
+ throw error;
+ }
+ return result;
+ }
+
+ private PooledSession prepare(PooledSession sess) {
+ if (inProcessPrepare && !sess.delegate.hasReadyTransaction()) {
+ while (true) {
+ try {
+ sess.prepareReadWriteTransaction();
+ synchronized (lock) {
+ stopAutomaticPrepare = false;
+ }
+ break;
+ } catch (Throwable t) {
+ if (isClosed()) {
+ span.addAnnotation("Pool has been closed");
+ throw new IllegalStateException("Pool has been closed");
+ }
+ SpannerException e = newSpannerException(t);
+ WaiterFuture waiter = new WaiterFuture();
+ synchronized (lock) {
+ handlePrepareSessionFailure(e, sess, false);
+ if (!isSessionNotFound(e)) {
+ throw e;
+ }
+ readWaiters.add(waiter);
+ }
+ sess = waiter.get();
+ if (sess.delegate.hasReadyTransaction()) {
+ break;
+ }
+ }
+ }
+ }
+ return sess;
+ }
+ }
+
+ private PooledSessionFuture createPooledSessionFuture(
+ ListenableFuture future, Span span) {
+ return new PooledSessionFuture(future, span);
+ }
+
+ final class PooledSessionFuture extends SimpleForwardingListenableFuture
+ implements Session {
+ private volatile LeakedSessionException leakedException;
+ private volatile AtomicBoolean inUse = new AtomicBoolean();
+ private volatile CountDownLatch initialized = new CountDownLatch(1);
+ private final Span span;
+
+ private PooledSessionFuture(ListenableFuture delegate, Span span) {
+ super(delegate);
+ this.span = span;
}
@VisibleForTesting
@@ -666,34 +1090,14 @@ void clearLeakedException() {
this.leakedException = null;
}
- private void markBusy() {
- this.state = SessionState.BUSY;
+ private void markCheckedOut() {
this.leakedException = new LeakedSessionException();
}
- private void markClosing() {
- this.state = SessionState.CLOSING;
- }
-
@Override
public Timestamp write(Iterable mutations) throws SpannerException {
try {
- markUsed();
- return delegate.write(mutations);
- } catch (SpannerException e) {
- throw lastException = e;
- } finally {
- close();
- }
- }
-
- @Override
- public long executePartitionedUpdate(Statement stmt) throws SpannerException {
- try {
- markUsed();
- return delegate.executePartitionedUpdate(stmt);
- } catch (SpannerException e) {
- throw lastException = e;
+ return get().write(mutations);
} finally {
close();
}
@@ -702,10 +1106,7 @@ public long executePartitionedUpdate(Statement stmt) throws SpannerException {
@Override
public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException {
try {
- markUsed();
- return delegate.writeAtLeastOnce(mutations);
- } catch (SpannerException e) {
- throw lastException = e;
+ return get().writeAtLeastOnce(mutations);
} finally {
close();
}
@@ -715,10 +1116,10 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx
public ReadContext singleUse() {
try {
return new AutoClosingReadContext<>(
- new Function() {
+ new Function() {
@Override
- public ReadContext apply(PooledSession session) {
- return session.delegate.singleUse();
+ public ReadContext apply(PooledSessionFuture session) {
+ return session.get().delegate.singleUse();
}
},
SessionPool.this,
@@ -734,10 +1135,10 @@ public ReadContext apply(PooledSession session) {
public ReadContext singleUse(final TimestampBound bound) {
try {
return new AutoClosingReadContext<>(
- new Function() {
+ new Function() {
@Override
- public ReadContext apply(PooledSession session) {
- return session.delegate.singleUse(bound);
+ public ReadContext apply(PooledSessionFuture session) {
+ return session.get().delegate.singleUse(bound);
}
},
SessionPool.this,
@@ -752,10 +1153,10 @@ public ReadContext apply(PooledSession session) {
@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
return internalReadOnlyTransaction(
- new Function() {
+ new Function() {
@Override
- public ReadOnlyTransaction apply(PooledSession session) {
- return session.delegate.singleUseReadOnlyTransaction();
+ public ReadOnlyTransaction apply(PooledSessionFuture session) {
+ return session.get().delegate.singleUseReadOnlyTransaction();
}
},
true);
@@ -764,10 +1165,10 @@ public ReadOnlyTransaction apply(PooledSession session) {
@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bound) {
return internalReadOnlyTransaction(
- new Function() {
+ new Function() {
@Override
- public ReadOnlyTransaction apply(PooledSession session) {
- return session.delegate.singleUseReadOnlyTransaction(bound);
+ public ReadOnlyTransaction apply(PooledSessionFuture session) {
+ return session.get().delegate.singleUseReadOnlyTransaction(bound);
}
},
true);
@@ -776,10 +1177,10 @@ public ReadOnlyTransaction apply(PooledSession session) {
@Override
public ReadOnlyTransaction readOnlyTransaction() {
return internalReadOnlyTransaction(
- new Function() {
+ new Function() {
@Override
- public ReadOnlyTransaction apply(PooledSession session) {
- return session.delegate.readOnlyTransaction();
+ public ReadOnlyTransaction apply(PooledSessionFuture session) {
+ return session.get().delegate.readOnlyTransaction();
}
},
false);
@@ -788,17 +1189,18 @@ public ReadOnlyTransaction apply(PooledSession session) {
@Override
public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) {
return internalReadOnlyTransaction(
- new Function() {
+ new Function() {
@Override
- public ReadOnlyTransaction apply(PooledSession session) {
- return session.delegate.readOnlyTransaction(bound);
+ public ReadOnlyTransaction apply(PooledSessionFuture session) {
+ return session.get().delegate.readOnlyTransaction(bound);
}
},
false);
}
private ReadOnlyTransaction internalReadOnlyTransaction(
- Function transactionSupplier, boolean isSingleUse) {
+ Function transactionSupplier,
+ boolean isSingleUse) {
try {
return new AutoClosingReadTransaction(
transactionSupplier, SessionPool.this, this, isSingleUse);
@@ -813,6 +1215,188 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}
+ @Override
+ public TransactionManager transactionManager() {
+ return new AutoClosingTransactionManager(SessionPool.this, this);
+ }
+
+ @Override
+ public AsyncRunner runAsync() {
+ return new SessionPoolAsyncRunner(SessionPool.this, this);
+ }
+
+ @Override
+ public AsyncTransactionManager transactionManagerAsync() {
+ return new SessionPoolAsyncTransactionManager(this);
+ }
+
+ @Override
+ public long executePartitionedUpdate(Statement stmt) {
+ try {
+ return get().executePartitionedUpdate(stmt);
+ } finally {
+ close();
+ }
+ }
+
+ @Override
+ public String getName() {
+ return get().getName();
+ }
+
+ @Override
+ public void prepareReadWriteTransaction() {
+ get().prepareReadWriteTransaction();
+ }
+
+ @Override
+ public void close() {
+ synchronized (lock) {
+ leakedException = null;
+ checkedOutSessions.remove(this);
+ }
+ get().close();
+ }
+
+ @Override
+ public ApiFuture asyncClose() {
+ synchronized (lock) {
+ leakedException = null;
+ checkedOutSessions.remove(this);
+ }
+ return get().asyncClose();
+ }
+
+ @Override
+ public PooledSession get() {
+ if (inUse.compareAndSet(false, true)) {
+ PooledSession res = null;
+ try {
+ res = super.get();
+ } catch (Throwable e) {
+ // ignore the exception as it will be handled by the call to super.get() below.
+ }
+ if (res != null) {
+ res.markBusy(span);
+ span.addAnnotation(sessionAnnotation(res));
+ synchronized (lock) {
+ incrementNumSessionsInUse();
+ checkedOutSessions.add(this);
+ }
+ }
+ initialized.countDown();
+ }
+ try {
+ initialized.await();
+ return super.get();
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause());
+ } catch (InterruptedException e) {
+ throw SpannerExceptionFactory.propagateInterrupt(e);
+ }
+ }
+ }
+
+ final class PooledSession implements Session {
+ @VisibleForTesting SessionImpl delegate;
+ private volatile Instant lastUseTime;
+ private volatile SpannerException lastException;
+ private volatile boolean allowReplacing = true;
+
+ @GuardedBy("lock")
+ private SessionState state;
+
+ private PooledSession(SessionImpl delegate) {
+ this.delegate = delegate;
+ this.state = SessionState.AVAILABLE;
+ this.lastUseTime = clock.instant();
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @VisibleForTesting
+ void setAllowReplacing(boolean allowReplacing) {
+ this.allowReplacing = allowReplacing;
+ }
+
+ @Override
+ public Timestamp write(Iterable mutations) throws SpannerException {
+ try {
+ markUsed();
+ return delegate.write(mutations);
+ } catch (SpannerException e) {
+ throw lastException = e;
+ }
+ }
+
+ @Override
+ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException {
+ try {
+ markUsed();
+ return delegate.writeAtLeastOnce(mutations);
+ } catch (SpannerException e) {
+ throw lastException = e;
+ }
+ }
+
+ @Override
+ public long executePartitionedUpdate(Statement stmt) throws SpannerException {
+ try {
+ markUsed();
+ return delegate.executePartitionedUpdate(stmt);
+ } catch (SpannerException e) {
+ throw lastException = e;
+ }
+ }
+
+ @Override
+ public ReadContext singleUse() {
+ return delegate.singleUse();
+ }
+
+ @Override
+ public ReadContext singleUse(TimestampBound bound) {
+ return delegate.singleUse(bound);
+ }
+
+ @Override
+ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
+ return delegate.singleUseReadOnlyTransaction();
+ }
+
+ @Override
+ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
+ return delegate.singleUseReadOnlyTransaction(bound);
+ }
+
+ @Override
+ public ReadOnlyTransaction readOnlyTransaction() {
+ return delegate.readOnlyTransaction();
+ }
+
+ @Override
+ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
+ return delegate.readOnlyTransaction(bound);
+ }
+
+ @Override
+ public TransactionRunner readWriteTransaction() {
+ return delegate.readWriteTransaction();
+ }
+
+ @Override
+ public AsyncRunner runAsync() {
+ return delegate.runAsync();
+ }
+
+ @Override
+ public AsyncTransactionManagerImpl transactionManagerAsync() {
+ return delegate.transactionManagerAsync();
+ }
+
@Override
public ApiFuture asyncClose() {
close();
@@ -825,7 +1409,6 @@ public void close() {
numSessionsInUse--;
numSessionsReleased++;
}
- leakedException = null;
if (lastException != null && isSessionNotFound(lastException)) {
invalidateSession(this);
} else {
@@ -868,59 +1451,56 @@ private void keepAlive() {
}
}
+ private void markBusy(Span span) {
+ this.delegate.setCurrentSpan(span);
+ this.state = SessionState.BUSY;
+ }
+
+ private void markClosing() {
+ this.state = SessionState.CLOSING;
+ }
+
void markUsed() {
lastUseTime = clock.instant();
}
@Override
public TransactionManager transactionManager() {
- return new AutoClosingTransactionManager(SessionPool.this, this);
+ return delegate.transactionManager();
}
}
- private static final class SessionOrError {
- private final PooledSession session;
- private final SpannerException e;
-
- SessionOrError(PooledSession session) {
- this.session = session;
- this.e = null;
- }
+ private final class WaiterFuture extends ForwardingListenableFuture {
+ private static final long MAX_SESSION_WAIT_TIMEOUT = 240_000L;
+ private final SettableFuture waiter = SettableFuture.create();
- SessionOrError(SpannerException e) {
- this.session = null;
- this.e = e;
+ @Override
+ protected ListenableFuture extends PooledSession> delegate() {
+ return waiter;
}
- }
-
- private final class Waiter {
- private static final long MAX_SESSION_WAIT_TIMEOUT = 240_000L;
- private final SynchronousQueue waiter = new SynchronousQueue<>();
private void put(PooledSession session) {
- Uninterruptibles.putUninterruptibly(waiter, new SessionOrError(session));
+ waiter.set(session);
}
private void put(SpannerException e) {
- Uninterruptibles.putUninterruptibly(waiter, new SessionOrError(e));
+ waiter.setException(e);
}
- private PooledSession take() throws SpannerException {
+ @Override
+ public PooledSession get() {
long currentTimeout = options.getInitialWaitForSessionTimeoutMillis();
while (true) {
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
try (Scope waitScope = tracer.withSpan(span)) {
- SessionOrError s = pollUninterruptiblyWithTimeout(currentTimeout);
+ PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
tracer.getCurrentSpan().setStatus(Status.DEADLINE_EXCEEDED);
currentTimeout = Math.min(currentTimeout * 2, MAX_SESSION_WAIT_TIMEOUT);
} else {
- if (s.e != null) {
- throw newSpannerException(s.e);
- }
- return s.session;
+ return s;
}
} catch (Exception e) {
TraceUtil.setWithFailure(span, e);
@@ -931,14 +1511,18 @@ private PooledSession take() throws SpannerException {
}
}
- private SessionOrError pollUninterruptiblyWithTimeout(long timeoutMillis) {
+ private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
boolean interrupted = false;
try {
while (true) {
try {
- return waiter.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
interrupted = true;
+ } catch (TimeoutException e) {
+ return null;
+ } catch (ExecutionException e) {
+ throw SpannerExceptionFactory.newSpannerException(e.getCause());
}
}
} finally {
@@ -1118,6 +1702,7 @@ private static enum Position {
private final ScheduledExecutorService executor;
private final ExecutorFactory executorFactory;
private final ScheduledExecutorService prepareExecutor;
+
private final int prepareThreadPoolSize;
final PoolMaintainer poolMaintainer;
private final Clock clock;
@@ -1146,10 +1731,10 @@ private static enum Position {
private final LinkedList writePreparedSessions = new LinkedList<>();
@GuardedBy("lock")
- private final Queue readWaiters = new LinkedList<>();
+ private final Queue readWaiters = new LinkedList<>();
@GuardedBy("lock")
- private final Queue readWriteWaiters = new LinkedList<>();
+ private final Queue readWriteWaiters = new LinkedList<>();
@GuardedBy("lock")
private int numSessionsBeingPrepared = 0;
@@ -1183,6 +1768,9 @@ private static enum Position {
@GuardedBy("lock")
private final Set allSessions = new HashSet<>();
+ @GuardedBy("lock")
+ private final Set checkedOutSessions = new HashSet<>();
+
private final SessionConsumer sessionConsumer = new SessionConsumerImpl();
@VisibleForTesting Function idleSessionRemovedListener;
@@ -1275,6 +1863,12 @@ private SessionPool(
}
@VisibleForTesting
+ int getNumberOfSessionsInUse() {
+ synchronized (lock) {
+ return numSessionsInUse;
+ }
+ }
+
long getNumberOfSessionsInProcessPrepared() {
synchronized (lock) {
return numSessionsInProcessPrepared;
@@ -1297,9 +1891,9 @@ void removeFromPool(PooledSession session) {
session.markClosing();
allSessions.remove(session);
numIdleSessionsRemoved++;
- if (idleSessionRemovedListener != null) {
- idleSessionRemovedListener.apply(session);
- }
+ }
+ if (idleSessionRemovedListener != null) {
+ idleSessionRemovedListener.apply(session);
}
}
@@ -1437,10 +2031,10 @@ boolean isValid() {
* session being returned to the pool or a new session being created.
*
*/
- PooledSession getReadSession() throws SpannerException {
+ PooledSessionFuture getReadSession() throws SpannerException {
Span span = Tracing.getTracer().getCurrentSpan();
span.addAnnotation("Acquiring session");
- Waiter waiter = null;
+ WaiterFuture waiter = null;
PooledSession sess = null;
synchronized (lock) {
if (closureFuture != null) {
@@ -1462,7 +2056,7 @@ PooledSession getReadSession() throws SpannerException {
if (sess == null) {
span.addAnnotation("No session available");
maybeCreateSession();
- waiter = new Waiter();
+ waiter = new WaiterFuture();
readWaiters.add(waiter);
} else {
span.addAnnotation("Acquired read write session");
@@ -1470,18 +2064,8 @@ PooledSession getReadSession() throws SpannerException {
} else {
span.addAnnotation("Acquired read only session");
}
+ return checkoutSession(span, sess, waiter, false, false);
}
- if (waiter != null) {
- logger.log(
- Level.FINE,
- "No session available in the pool. Blocking for one to become available/created");
- span.addAnnotation("Waiting for read only session to be available");
- sess = waiter.take();
- }
- sess.markBusy();
- incrementNumSessionsInUse();
- span.addAnnotation(sessionAnnotation(sess));
- return sess;
}
/**
@@ -1502,129 +2086,123 @@ PooledSession getReadSession() throws SpannerException {
* to the pool which is then write prepared.
*
*/
- PooledSession getReadWriteSession() {
+ PooledSessionFuture getReadWriteSession() {
Span span = Tracing.getTracer().getCurrentSpan();
span.addAnnotation("Acquiring read write session");
PooledSession sess = null;
- // Loop to retry SessionNotFoundExceptions that might occur during in-process prepare of a
- // session.
- while (true) {
- Waiter waiter = null;
- boolean inProcessPrepare = stopAutomaticPrepare;
- synchronized (lock) {
- if (closureFuture != null) {
- span.addAnnotation("Pool has been closed");
- throw new IllegalStateException("Pool has been closed", closedException);
- }
- if (resourceNotFoundException != null) {
- span.addAnnotation("Database has been deleted");
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.NOT_FOUND,
- String.format(
- "The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
- resourceNotFoundException.getMessage()),
- resourceNotFoundException);
- }
- sess = writePreparedSessions.poll();
- if (sess == null) {
- if (!inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize) {
- if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
- PooledSession readSession = readSessions.poll();
- if (readSession != null) {
- span.addAnnotation(
- "Acquired read only session. Preparing for read write transaction");
- prepareSession(readSession);
- } else {
- span.addAnnotation("No session available");
- maybeCreateSession();
- }
- }
- } else {
- inProcessPrepare = true;
- numSessionsInProcessPrepared++;
+ WaiterFuture waiter = null;
+ boolean inProcessPrepare = stopAutomaticPrepare;
+ synchronized (lock) {
+ if (closureFuture != null) {
+ span.addAnnotation("Pool has been closed");
+ throw new IllegalStateException("Pool has been closed", closedException);
+ }
+ if (resourceNotFoundException != null) {
+ span.addAnnotation("Database has been deleted");
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.NOT_FOUND,
+ String.format(
+ "The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
+ resourceNotFoundException.getMessage()),
+ resourceNotFoundException);
+ }
+ sess = writePreparedSessions.poll();
+ if (sess == null) {
+ if (!inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize) {
+ if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
PooledSession readSession = readSessions.poll();
if (readSession != null) {
- // Create a read/write transaction in-process if there is already a queue for prepared
- // sessions. This is more efficient than doing it asynchronously, as it scales with
- // the number of user threads. The thread pool for asynchronously preparing sessions
- // is fixed.
span.addAnnotation(
- "Acquired read only session. Preparing in-process for read write transaction");
- sess = readSession;
+ "Acquired read only session. Preparing for read write transaction");
+ prepareSession(readSession);
} else {
span.addAnnotation("No session available");
maybeCreateSession();
}
}
- if (sess == null) {
- waiter = new Waiter();
- if (inProcessPrepare) {
- // inProcessPrepare=true means that we have already determined that the queue for
- // preparing read/write sessions is larger than the number of threads in the prepare
- // thread pool, and that it's more efficient to do the prepare in-process. We will
- // therefore create a waiter for a read-only session, even though a read/write session
- // has been requested.
- readWaiters.add(waiter);
- } else {
- readWriteWaiters.add(waiter);
- }
- }
} else {
- span.addAnnotation("Acquired read write session");
- }
- }
- if (waiter != null) {
- logger.log(
- Level.FINE,
- "No session available in the pool. Blocking for one to become available/created");
- span.addAnnotation("Waiting for read write session to be available");
- sess = waiter.take();
- }
- if (inProcessPrepare) {
- try {
- sess.prepareReadWriteTransaction();
- // Session prepare succeeded, restart automatic prepare if it had been stopped.
- synchronized (lock) {
- stopAutomaticPrepare = false;
- }
- } catch (Throwable t) {
- SpannerException e = newSpannerException(t);
- if (!isClosed()) {
- handlePrepareSessionFailure(e, sess, false);
+ inProcessPrepare = true;
+ numSessionsInProcessPrepared++;
+ PooledSession readSession = readSessions.poll();
+ if (readSession != null) {
+ // Create a read/write transaction in-process if there is already a queue for prepared
+ // sessions. This is more efficient than doing it asynchronously, as it scales with
+ // the number of user threads. The thread pool for asynchronously preparing sessions
+ // is fixed.
+ span.addAnnotation(
+ "Acquired read only session. Preparing in-process for read write transaction");
+ sess = readSession;
+ } else {
+ span.addAnnotation("No session available");
+ maybeCreateSession();
}
- sess = null;
- if (!isSessionNotFound(e)) {
- throw e;
+ }
+ if (sess == null) {
+ waiter = new WaiterFuture();
+ if (inProcessPrepare) {
+ // inProcessPrepare=true means that we have already determined that the queue for
+ // preparing read/write sessions is larger than the number of threads in the prepare
+ // thread pool, and that it's more efficient to do the prepare in-process. We will
+ // therefore create a waiter for a read-only session, even though a read/write session
+ // has been requested.
+ readWaiters.add(waiter);
+ } else {
+ readWriteWaiters.add(waiter);
}
}
+ } else {
+ span.addAnnotation("Acquired read write session");
}
- if (sess != null) {
- break;
- }
+ return checkoutSession(span, sess, waiter, true, inProcessPrepare);
}
- sess.markBusy();
- incrementNumSessionsInUse();
- span.addAnnotation(sessionAnnotation(sess));
- return sess;
}
- PooledSession replaceReadSession(SessionNotFoundException e, PooledSession session) {
+ private PooledSessionFuture checkoutSession(
+ final Span span,
+ final PooledSession readySession,
+ WaiterFuture waiter,
+ boolean write,
+ final boolean inProcessPrepare) {
+ ListenableFuture sessionFuture;
+ if (waiter != null) {
+ logger.log(
+ Level.FINE,
+ "No session available in the pool. Blocking for one to become available/created");
+ span.addAnnotation(
+ String.format(
+ "Waiting for %s session to be available", write ? "read write" : "read only"));
+ sessionFuture = waiter;
+ } else {
+ SettableFuture fut = SettableFuture.create();
+ fut.set(readySession);
+ sessionFuture = fut;
+ }
+ ForwardingListenablePooledSessionFuture forwardingFuture =
+ new ForwardingListenablePooledSessionFuture(sessionFuture, inProcessPrepare, span);
+ PooledSessionFuture res = createPooledSessionFuture(forwardingFuture, span);
+ res.markCheckedOut();
+ return res;
+ }
+
+ PooledSessionFuture replaceReadSession(SessionNotFoundException e, PooledSessionFuture session) {
return replaceSession(e, session, false);
}
- PooledSession replaceReadWriteSession(SessionNotFoundException e, PooledSession session) {
+ PooledSessionFuture replaceReadWriteSession(
+ SessionNotFoundException e, PooledSessionFuture session) {
return replaceSession(e, session, true);
}
- private PooledSession replaceSession(
- SessionNotFoundException e, PooledSession session, boolean write) {
- if (!options.isFailIfSessionNotFound() && session.allowReplacing) {
+ private PooledSessionFuture replaceSession(
+ SessionNotFoundException e, PooledSessionFuture session, boolean write) {
+ if (!options.isFailIfSessionNotFound() && session.get().allowReplacing) {
synchronized (lock) {
numSessionsInUse--;
numSessionsReleased++;
+ checkedOutSessions.remove(session);
}
session.leakedException = null;
- invalidateSession(session);
+ invalidateSession(session.get());
return write ? getReadWriteSession() : getReadSession();
} else {
throw e;
@@ -1787,7 +2365,7 @@ ListenableFuture closeAsync(ClosedException closedException) {
}
this.closedException = closedException;
// Fail all pending waiters.
- Waiter waiter = readWaiters.poll();
+ WaiterFuture waiter = readWaiters.poll();
while (waiter != null) {
waiter.put(newSpannerException(ErrorCode.INTERNAL, "Client has been closed"));
waiter = readWaiters.poll();
@@ -1821,10 +2399,16 @@ public void run() {
}
}
});
- for (final PooledSession session : ImmutableList.copyOf(allSessions)) {
+ for (PooledSessionFuture session : checkedOutSessions) {
if (session.leakedException != null) {
- logger.log(Level.WARNING, "Leaked session", session.leakedException);
+ if (options.isFailOnSessionLeak()) {
+ throw session.leakedException;
+ } else {
+ logger.log(Level.WARNING, "Leaked session", session.leakedException);
+ }
}
+ }
+ for (final PooledSession session : ImmutableList.copyOf(allSessions)) {
if (session.state != SessionState.CLOSING) {
closeSessionAsync(session);
}
@@ -1894,7 +2478,7 @@ public void run() {
}
}
},
- executor);
+ MoreExecutors.directExecutor());
return res;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java
new file mode 100644
index 0000000000..55b6102a27
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.ApiAsyncFunction;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
+import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
+import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
+import com.google.cloud.spanner.TransactionManager.TransactionState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import javax.annotation.concurrent.GuardedBy;
+
+class SessionPoolAsyncTransactionManager implements CommittableAsyncTransactionManager {
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private TransactionState txnState;
+
+ private volatile PooledSessionFuture session;
+ private final SettableApiFuture delegate =
+ SettableApiFuture.create();
+
+ SessionPoolAsyncTransactionManager(PooledSessionFuture session) {
+ this.session = session;
+ this.session.addListener(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ delegate.set(
+ SessionPoolAsyncTransactionManager.this.session.get().transactionManagerAsync());
+ } catch (Throwable t) {
+ delegate.setException(t);
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public void close() {
+ delegate.addListener(
+ new Runnable() {
+ @Override
+ public void run() {
+ session.close();
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public TransactionContextFuture beginAsync() {
+ synchronized (lock) {
+ Preconditions.checkState(txnState == null, "begin can only be called once");
+ txnState = TransactionState.STARTED;
+ }
+ final SettableApiFuture delegateTxnFuture = SettableApiFuture.create();
+ ApiFutures.addCallback(
+ delegate,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ delegateTxnFuture.setException(t);
+ }
+
+ @Override
+ public void onSuccess(AsyncTransactionManagerImpl result) {
+ ApiFutures.addCallback(
+ result.beginAsync(),
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ delegateTxnFuture.setException(t);
+ }
+
+ @Override
+ public void onSuccess(TransactionContext result) {
+ delegateTxnFuture.set(result);
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+ },
+ MoreExecutors.directExecutor());
+ return new TransactionContextFutureImpl(this, delegateTxnFuture);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (t instanceof AbortedException) {
+ synchronized (lock) {
+ txnState = TransactionState.ABORTED;
+ }
+ }
+ }
+
+ @Override
+ public ApiFuture commitAsync() {
+ synchronized (lock) {
+ Preconditions.checkState(
+ txnState == TransactionState.STARTED,
+ "commit can only be invoked if the transaction is in progress. Current state: "
+ + txnState);
+ txnState = TransactionState.COMMITTED;
+ }
+ return ApiFutures.transformAsync(
+ delegate,
+ new ApiAsyncFunction() {
+ @Override
+ public ApiFuture apply(AsyncTransactionManagerImpl input) throws Exception {
+ final SettableApiFuture res = SettableApiFuture.create();
+ ApiFutures.addCallback(
+ input.commitAsync(),
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ synchronized (lock) {
+ if (t instanceof AbortedException) {
+ txnState = TransactionState.ABORTED;
+ } else {
+ txnState = TransactionState.COMMIT_FAILED;
+ }
+ }
+ res.setException(t);
+ }
+
+ @Override
+ public void onSuccess(Timestamp result) {
+ res.set(result);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return res;
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public ApiFuture rollbackAsync() {
+ synchronized (lock) {
+ Preconditions.checkState(
+ txnState == TransactionState.STARTED,
+ "rollback can only be called if the transaction is in progress");
+ txnState = TransactionState.ROLLED_BACK;
+ }
+ return ApiFutures.transformAsync(
+ delegate,
+ new ApiAsyncFunction() {
+ @Override
+ public ApiFuture apply(AsyncTransactionManagerImpl input) throws Exception {
+ ApiFuture res = input.rollbackAsync();
+ res.addListener(
+ new Runnable() {
+ @Override
+ public void run() {
+ session.close();
+ }
+ },
+ MoreExecutors.directExecutor());
+ return res;
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public TransactionContextFuture resetForRetryAsync() {
+ synchronized (lock) {
+ Preconditions.checkState(
+ txnState == TransactionState.ABORTED,
+ "resetForRetry can only be called after the transaction aborted.");
+ txnState = TransactionState.STARTED;
+ }
+ return new TransactionContextFutureImpl(
+ this,
+ ApiFutures.transformAsync(
+ delegate,
+ new ApiAsyncFunction() {
+ @Override
+ public ApiFuture apply(AsyncTransactionManagerImpl input)
+ throws Exception {
+ return input.resetForRetryAsync();
+ }
+ },
+ MoreExecutors.directExecutor()));
+ }
+
+ @Override
+ public TransactionState getState() {
+ synchronized (lock) {
+ return txnState;
+ }
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
index 17295a38ab..57dbd4debd 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
@@ -37,6 +37,7 @@ public class SessionPoolOptions {
private final int keepAliveIntervalMinutes;
private final Duration removeInactiveSessionAfter;
private final ActionOnSessionNotFound actionOnSessionNotFound;
+ private final ActionOnSessionLeak actionOnSessionLeak;
private final long initialWaitForSessionTimeoutMillis;
private SessionPoolOptions(Builder builder) {
@@ -50,6 +51,7 @@ private SessionPoolOptions(Builder builder) {
this.writeSessionsFraction = builder.writeSessionsFraction;
this.actionOnExhaustion = builder.actionOnExhaustion;
this.actionOnSessionNotFound = builder.actionOnSessionNotFound;
+ this.actionOnSessionLeak = builder.actionOnSessionLeak;
this.initialWaitForSessionTimeoutMillis = builder.initialWaitForSessionTimeoutMillis;
this.loopFrequency = builder.loopFrequency;
this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes;
@@ -106,6 +108,11 @@ boolean isFailIfSessionNotFound() {
return actionOnSessionNotFound == ActionOnSessionNotFound.FAIL;
}
+ @VisibleForTesting
+ boolean isFailOnSessionLeak() {
+ return actionOnSessionLeak == ActionOnSessionLeak.FAIL;
+ }
+
public static Builder newBuilder() {
return new Builder();
}
@@ -120,6 +127,11 @@ private static enum ActionOnSessionNotFound {
FAIL;
}
+ private static enum ActionOnSessionLeak {
+ WARN,
+ FAIL;
+ }
+
/** Builder for creating SessionPoolOptions. */
public static class Builder {
private boolean minSessionsSet = false;
@@ -131,6 +143,7 @@ public static class Builder {
private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION;
private long initialWaitForSessionTimeoutMillis = 30_000L;
private ActionOnSessionNotFound actionOnSessionNotFound = ActionOnSessionNotFound.RETRY;
+ private ActionOnSessionLeak actionOnSessionLeak = ActionOnSessionLeak.WARN;
private long loopFrequency = 10 * 1000L;
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
@@ -240,6 +253,12 @@ Builder setFailIfSessionNotFound() {
return this;
}
+ @VisibleForTesting
+ Builder setFailOnSessionLeak() {
+ this.actionOnSessionLeak = ActionOnSessionLeak.FAIL;
+ return this;
+ }
+
/**
* Fraction of sessions to be kept prepared for write transactions. This is an optimisation to
* avoid the cost of sending a BeginTransaction() rpc. If all such sessions are in use and a
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
index 0c6bec4ea8..52c35cb713 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Service;
/**
@@ -108,4 +109,7 @@ public interface Spanner extends Service, AutoCloseable {
/** @return true
if this {@link Spanner} object is closed. */
boolean isClosed();
+
+ /** @return the {@link ExecutorProvider} that is used for asynchronous queries and operations. */
+ ExecutorProvider getAsyncExecutorProvider();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
index 4e937459cf..2d034eda88 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.paging.Page;
import com.google.cloud.BaseService;
@@ -23,9 +24,11 @@
import com.google.cloud.PageImpl.NextPageFetcher;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SessionClient.SessionId;
+import com.google.cloud.spanner.SpannerOptions.CloseableExecutorProvider;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -86,6 +89,8 @@ private static String nextDatabaseClientId(DatabaseId databaseId) {
@GuardedBy("this")
private final Map dbClients = new HashMap<>();
+ private final CloseableExecutorProvider asyncExecutorProvider;
+
@GuardedBy("this")
private final List invalidatedDbClients = new ArrayList<>();
@@ -116,6 +121,10 @@ static final class ClosedException extends RuntimeException {
SpannerImpl(SpannerRpc gapicRpc, SpannerOptions options) {
super(options);
this.gapicRpc = gapicRpc;
+ this.asyncExecutorProvider =
+ MoreObjects.firstNonNull(
+ options.getAsyncExecutorProvider(),
+ SpannerOptions.createDefaultAsyncExecutorProvider());
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc);
this.instanceClient =
new InstanceAdminClientImpl(options.getProjectId(), gapicRpc, dbAdminClient);
@@ -140,6 +149,13 @@ QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
return getOptions().getDefaultQueryOptions(databaseId);
}
+ /**
+ * Returns the {@link ExecutorProvider} to use for async methods that need a background executor.
+ */
+ public ExecutorProvider getAsyncExecutorProvider() {
+ return asyncExecutorProvider;
+ }
+
SessionImpl sessionWithId(String name) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "name is null or empty");
SessionId id = SessionId.of(name);
@@ -251,6 +267,7 @@ void close(long timeout, TimeUnit unit) {
sessionClient.close();
}
sessionClients.clear();
+ asyncExecutorProvider.close();
try {
gapicRpc.shutdown();
} catch (RuntimeException e) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index edeadb7b90..35a288530f 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
@@ -17,6 +17,7 @@
package com.google.cloud.spanner;
import com.google.api.core.ApiFunction;
+import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
@@ -40,10 +41,12 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.SpannerSettings;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.admin.database.v1.CreateBackupRequest;
import com.google.spanner.admin.database.v1.CreateDatabaseRequest;
import com.google.spanner.admin.database.v1.RestoreDatabaseRequest;
@@ -59,6 +62,11 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
@@ -107,6 +115,7 @@ public class SpannerOptions extends ServiceOptions