diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index cff6e4b76d..5680c0706e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -58,6 +58,18 @@ */ abstract class AbstractReadContext implements ReadContext, AbstractResultSet.Listener, SessionTransaction { + + /** + * {@link AsyncResultSet} that supports adding listeners that are called when all rows from the + * underlying result stream have been fetched. + */ + interface ListenableAsyncResultSet extends AsyncResultSet { + /** Adds a listener to this {@link AsyncResultSet}. */ + void addListener(Runnable listener); + + void removeListener(Runnable listener); + } + /** * A {@code ReadContext} for standalone reads. This can only be used for a single operation, since * each standalone read may see a different timestamp of Cloud Spanner data. @@ -317,10 +329,15 @@ public final ResultSet read( } @Override - public final AsyncResultSet readAsync( + public ListenableAsyncResultSet readAsync( String table, KeySet keys, Iterable columns, ReadOption... options) { + Options readOptions = Options.fromReadOptions(options); + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( - executorProvider, readInternal(table, null, keys, columns, options)); + executorProvider, readInternal(table, null, keys, columns, options), bufferRows); } @Override @@ -330,10 +347,17 @@ public final ResultSet readUsingIndex( } @Override - public final AsyncResultSet readUsingIndexAsync( + public ListenableAsyncResultSet readUsingIndexAsync( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { + Options readOptions = Options.fromReadOptions(options); + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( - executorProvider, readInternal(table, checkNotNull(index), keys, columns, options)); + executorProvider, + readInternal(table, checkNotNull(index), keys, columns, options), + bufferRows); } @Nullable @@ -376,11 +400,17 @@ public final ResultSet executeQuery(Statement statement, QueryOption... options) } @Override - public final AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { + public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { + Options readOptions = Options.fromQueryOptions(options); + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( executorProvider, executeQueryInternal( - statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options)); + statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), + bufferRows); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 21f9094b24..82ed5aab98 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -19,11 +19,14 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.ExecutorProvider; +import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.spanner.v1.ResultSetStats; +import java.util.Collection; +import java.util.LinkedList; import java.util.concurrent.BlockingDeque; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -34,7 +37,8 @@ import java.util.concurrent.ScheduledExecutorService; /** Default implementation for {@link AsyncResultSet}. */ -class AsyncResultSetImpl extends ForwardingStructReader implements AsyncResultSet { +class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet { + /** State of an {@link AsyncResultSetImpl}. */ private enum State { INITIALIZED, @@ -58,7 +62,7 @@ private State(boolean shouldStop) { } } - private static final int DEFAULT_BUFFER_SIZE = 10; + static final int DEFAULT_BUFFER_SIZE = 10; private static final int MAX_WAIT_FOR_BUFFER_CONSUMPTION = 10; private final Object monitor = new Object(); @@ -90,6 +94,12 @@ private State(boolean shouldStop) { private ReadyCallback callback; + /** + * Listeners that will be called when the {@link AsyncResultSetImpl} has finished fetching all + * rows and any underlying transaction or session can be closed. + */ + private Collection listeners = new LinkedList<>(); + private State state = State.INITIALIZED; /** @@ -122,10 +132,6 @@ private State(boolean shouldStop) { */ private volatile CountDownLatch consumingLatch = new CountDownLatch(0); - AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate) { - this(executorProvider, delegate, DEFAULT_BUFFER_SIZE); - } - AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) { super(delegate); this.buffer = new LinkedBlockingDeque<>(bufferSize); @@ -155,11 +161,21 @@ public void close() { } /** - * Called when no more rows will be read from the underlying {@link ResultSet}, either because all - * rows have been read, or because {@link ReadyCallback#cursorReady(AsyncResultSet)} returned - * {@link CallbackResponse#DONE}. + * Adds a listener that will be called when no more rows will be read from the underlying {@link + * ResultSet}, either because all rows have been read, or because {@link + * ReadyCallback#cursorReady(AsyncResultSet)} returned {@link CallbackResponse#DONE}. */ - void onFinished() {} + @Override + public void addListener(Runnable listener) { + Preconditions.checkState(state == State.INITIALIZED); + listeners.add(listener); + } + + @Override + public void removeListener(Runnable listener) { + Preconditions.checkState(state == State.INITIALIZED); + listeners.remove(listener); + } /** * Tries to advance this {@link AsyncResultSet} to the next row. This method may only be called @@ -339,7 +355,9 @@ public Void call() throws Exception { delegateResultSet.close(); } catch (Throwable t) { } finally { - onFinished(); + for (Runnable listener : listeners) { + listener.run(); + } } // Ensure that the callback has been called at least once, even if the result set was diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java new file mode 100644 index 0000000000..c5535bc449 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.core.ApiFuture; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import java.util.concurrent.Executor; + +/** Forwarding implementation of {@link AsyncResultSet} that forwards all calls to a delegate. */ +public class ForwardingAsyncResultSet extends ForwardingResultSet implements AsyncResultSet { + final AsyncResultSet delegate; + + public ForwardingAsyncResultSet(AsyncResultSet delegate) { + super(Preconditions.checkNotNull(delegate)); + this.delegate = delegate; + } + + @Override + public CursorState tryNext() throws SpannerException { + return delegate.tryNext(); + } + + @Override + public void setCallback(Executor exec, ReadyCallback cb) { + delegate.setCallback(exec, cb); + ; + } + + @Override + public void cancel() { + delegate.cancel(); + } + + @Override + public void resume() { + delegate.resume(); + } + + @Override + public ApiFuture> toListAsync( + Function transformer, Executor executor) { + return delegate.toListAsync(transformer, executor); + } + + @Override + public ImmutableList toList(Function transformer) + throws SpannerException { + return delegate.toList(transformer); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index c3b32351ad..a0942cc19f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -81,20 +81,17 @@ import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -151,6 +148,11 @@ public ResultSet get() { * finished, if it is a single use context. */ private static class AutoClosingReadContext implements ReadContext { + /** + * {@link AsyncResultSet} implementation that keeps track of the async operations that are still + * running for this {@link ReadContext} and that should finish before the {@link ReadContext} + * releases its session back into the pool. + */ private class AutoClosingReadContextAsyncResultSetImpl extends AsyncResultSetImpl { private AutoClosingReadContextAsyncResultSetImpl( ExecutorProvider executorProvider, ResultSet delegate, int bufferRows) { @@ -159,19 +161,28 @@ private AutoClosingReadContextAsyncResultSetImpl( @Override public void setCallback(Executor exec, ReadyCallback cb) { - asyncOperationsCount.incrementAndGet(); - super.setCallback(exec, cb); - } - - @Override - void onFinished() { - synchronized (lock) { - if (asyncOperationsCount.decrementAndGet() == 0) { - if (closed) { - // All async operations for this read context have finished. - AutoClosingReadContext.this.close(); - } - } + Runnable listener = + new Runnable() { + @Override + public void run() { + synchronized (lock) { + if (asyncOperationsCount.decrementAndGet() == 0) { + if (closed) { + // All async operations for this read context have finished. + AutoClosingReadContext.this.close(); + } + } + } + } + }; + try { + asyncOperationsCount.incrementAndGet(); + addListener(listener); + super.setCallback(exec, cb); + } catch (Throwable t) { + removeListener(listener); + asyncOperationsCount.decrementAndGet(); + throw t; } } } @@ -321,7 +332,10 @@ public AsyncResultSet readAsync( final Iterable columns, final ReadOption... options) { Options readOptions = Options.fromReadOptions(options); - final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10; + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AutoClosingReadContextAsyncResultSetImpl( sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(), wrap( @@ -358,7 +372,10 @@ public AsyncResultSet readUsingIndexAsync( final Iterable columns, final ReadOption... options) { Options readOptions = Options.fromReadOptions(options); - final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10; + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AutoClosingReadContextAsyncResultSetImpl( sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(), wrap( @@ -454,7 +471,10 @@ ResultSet load() { public AsyncResultSet executeQueryAsync( final Statement statement, final QueryOption... options) { Options queryOptions = Options.fromQueryOptions(options); - final int bufferRows = queryOptions.hasBufferRows() ? queryOptions.bufferRows() : 10; + final int bufferRows = + queryOptions.hasBufferRows() + ? queryOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AutoClosingReadContextAsyncResultSetImpl( sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(), wrap( @@ -1546,6 +1566,9 @@ private static enum Position { private final ScheduledExecutorService executor; private final ExecutorFactory executorFactory; private final ScheduledExecutorService prepareExecutor; + + // TODO(loite): Refactor Waiter to use a SettableFuture that can be set when a session is released + // into the pool, instead of using a thread waiting on a synchronous queue. private final ScheduledExecutorService readWaiterExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -1558,6 +1581,7 @@ private static enum Position { .setDaemon(true) .setNameFormat("session-pool-write-waiter-%d") .build()); + final PoolMaintainer poolMaintainer; private final Clock clock; private final Object lock = new Object(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index f1db46859a..7949ed2a69 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -42,7 +42,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 04e62d905e..f95fbfcec8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -26,6 +26,8 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.core.ExecutorProvider; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.QueryOption; +import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; @@ -49,7 +51,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,6 +67,54 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { + /** + * {@link AsyncResultSet} implementation that keeps track of the async operations that are still + * running for this {@link TransactionContext} and that should finish before the {@link + * TransactionContext} can commit and release its session back into the pool. + */ + private class TransactionContextAsyncResultSetImpl extends ForwardingAsyncResultSet + implements ListenableAsyncResultSet { + private TransactionContextAsyncResultSetImpl(ListenableAsyncResultSet delegate) { + super(delegate); + } + + @Override + public void setCallback(Executor exec, ReadyCallback cb) { + Runnable listener = + new Runnable() { + @Override + public void run() { + finishedAsyncOperations.countDown(); + } + }; + try { + increaseAsynOperations(); + addListener(listener); + super.setCallback(exec, cb); + } catch (Throwable t) { + removeListener(listener); + finishedAsyncOperations.countDown(); + throw t; + } + } + + @Override + public void addListener(Runnable listener) { + ((ListenableAsyncResultSet) this.delegate).addListener(listener); + } + + @Override + public void removeListener(Runnable listener) { + ((ListenableAsyncResultSet) this.delegate).removeListener(listener); + } + } + + @GuardedBy("lock") + private volatile boolean committing; + + @GuardedBy("lock") + private volatile CountDownLatch finishedAsyncOperations = new CountDownLatch(0); + @GuardedBy("lock") private List mutations = new ArrayList<>(); @@ -88,6 +140,12 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans this.span = span; } + private void increaseAsynOperations() { + synchronized (lock) { + finishedAsyncOperations = new CountDownLatch((int) finishedAsyncOperations.getCount() + 1); + } + } + void ensureTxn() { if (transactionId == null || isAborted()) { span.addAnnotation("Creating Transaction"); @@ -118,6 +176,15 @@ void ensureTxn() { } void commit() { + CountDownLatch latch; + synchronized (lock) { + latch = finishedAsyncOperations; + } + try { + latch.await(); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } span.addAnnotation("Starting Commit"); CommitRequest.Builder builder = CommitRequest.newBuilder().setSession(session.getName()).setTransactionId(transactionId); @@ -251,8 +318,16 @@ public ApiFuture executeUpdateAsync(Statement statement) { beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL); - ApiFuture resultSet = - rpc.executeQueryAsync(builder.build(), session.getOptions()); + ApiFuture resultSet; + try { + // Register the update as an async operation that must finish before the transaction may + // commit. + increaseAsynOperations(); + resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions()); + } catch (Throwable t) { + finishedAsyncOperations.countDown(); + throw t; + } final ApiFuture updateCount = ApiFutures.transform( resultSet, @@ -282,6 +357,8 @@ public void run() { onError(SpannerExceptionFactory.newSpannerException(e.getCause())); } catch (InterruptedException e) { onError(SpannerExceptionFactory.propagateInterrupt(e)); + } finally { + finishedAsyncOperations.countDown(); } } }, @@ -318,6 +395,28 @@ public long[] batchUpdate(Iterable statements) { throw e; } } + + private ListenableAsyncResultSet wrap(ListenableAsyncResultSet delegate) { + return new TransactionContextAsyncResultSetImpl(delegate); + } + + @Override + public ListenableAsyncResultSet readAsync( + String table, KeySet keys, Iterable columns, ReadOption... options) { + return wrap(super.readAsync(table, keys, columns, options)); + } + + @Override + public ListenableAsyncResultSet readUsingIndexAsync( + String table, String index, KeySet keys, Iterable columns, ReadOption... options) { + return wrap(super.readUsingIndexAsync(table, index, keys, columns, options)); + } + + @Override + public ListenableAsyncResultSet executeQueryAsync( + final Statement statement, final QueryOption... options) { + return wrap(super.executeQueryAsync(statement, options)); + } } private boolean blockNestedTxn = true; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index cd5588187e..9359dc6694 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -60,7 +60,9 @@ public void setup() { @SuppressWarnings("unchecked") @Test public void close() { - AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, mock(ResultSet.class)); + AsyncResultSetImpl rs = + new AsyncResultSetImpl( + mockedProvider, mock(ResultSet.class), AsyncResultSetImpl.DEFAULT_BUFFER_SIZE); rs.close(); // Closing a second time should be a no-op. rs.close(); @@ -83,7 +85,9 @@ public void close() { } // The following methods are allowed on a closed result set. - AsyncResultSetImpl rs2 = new AsyncResultSetImpl(mockedProvider, mock(ResultSet.class)); + AsyncResultSetImpl rs2 = + new AsyncResultSetImpl( + mockedProvider, mock(ResultSet.class), AsyncResultSetImpl.DEFAULT_BUFFER_SIZE); rs2.setCallback(mock(Executor.class), mock(ReadyCallback.class)); rs2.close(); rs2.cancel(); @@ -92,7 +96,9 @@ public void close() { @Test public void tryNextNotAllowed() { - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, mock(ResultSet.class))) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl( + mockedProvider, mock(ResultSet.class), AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback(mock(Executor.class), mock(ReadyCallback.class)); try { rs.tryNext(); @@ -109,7 +115,8 @@ public void toList() { ResultSet delegate = mock(ResultSet.class); when(delegate.next()).thenReturn(true, true, true, false); when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class)); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { ImmutableList list = rs.toList( new Function() { @@ -129,7 +136,8 @@ public void toListPropagatesError() { .thenThrow( SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, "invalid query")); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.toList( new Function() { @Override @@ -150,7 +158,8 @@ public void toListAsync() throws InterruptedException, ExecutionException { ResultSet delegate = mock(ResultSet.class); when(delegate.next()).thenReturn(true, true, true, false); when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class)); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { ApiFuture> future = rs.toListAsync( new Function() { @@ -173,7 +182,8 @@ public void toListAsyncPropagatesError() throws InterruptedException { .thenThrow( SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, "invalid query")); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.toListAsync( new Function() { @Override @@ -202,7 +212,8 @@ public void withCallback() throws InterruptedException { final AtomicInteger callbackCounter = new AtomicInteger(); final AtomicInteger rowCounter = new AtomicInteger(); final CountDownLatch finishedLatch = new CountDownLatch(1); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback( executor, new ReadyCallback() { @@ -236,7 +247,8 @@ public void callbackReceivesError() throws InterruptedException { SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, "invalid query")); final BlockingDeque receivedErr = new LinkedBlockingDeque<>(1); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback( executor, new ReadyCallback() { @@ -271,7 +283,8 @@ public void callbackReceivesErrorHalfwayThrough() throws InterruptedException { when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class)); final AtomicInteger rowCount = new AtomicInteger(); final BlockingDeque receivedErr = new LinkedBlockingDeque<>(1); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback( executor, new ReadyCallback() { @@ -306,7 +319,8 @@ public void pauseResume() throws InterruptedException { final AtomicInteger callbackCounter = new AtomicInteger(); final BlockingDeque queue = new LinkedBlockingDeque<>(1); final AtomicBoolean finished = new AtomicBoolean(false); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback( executor, new ReadyCallback() { @@ -350,7 +364,8 @@ public void cancel() throws InterruptedException { final AtomicInteger callbackCounter = new AtomicInteger(); final BlockingDeque queue = new LinkedBlockingDeque<>(1); final AtomicBoolean finished = new AtomicBoolean(false); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback( executor, new ReadyCallback() { @@ -404,7 +419,8 @@ public void callbackReturnsError() throws InterruptedException { when(delegate.next()).thenReturn(true, true, true, false); when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class)); final AtomicInteger callbackCounter = new AtomicInteger(); - try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate)) { + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { rs.setCallback( executor, new ReadyCallback() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java index 5dbdd1092f..d8267dcb4e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java @@ -35,6 +35,10 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchCreateSessionsRequest; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ExecuteSqlRequest; import io.grpc.Server; import io.grpc.Status; import io.grpc.inprocess.InProcessServerBuilder; @@ -66,8 +70,6 @@ public class AsyncRunnerTest { private Spanner spanner; private Spanner spannerWithEmptySessionPool; - private DatabaseClient client; - private DatabaseClient clientWithEmptySessionPool; @BeforeClass public static void setup() throws Exception { @@ -113,7 +115,6 @@ public void before() { .setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build()) .build() .getService(); - client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); spannerWithEmptySessionPool = spanner .getOptions() @@ -122,9 +123,15 @@ public void before() { SessionPoolOptions.newBuilder().setFailOnSessionLeak().setMinSessions(0).build()) .build() .getService(); - clientWithEmptySessionPool = - spannerWithEmptySessionPool.getDatabaseClient( - DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + } + + private DatabaseClient client() { + return spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + } + + private DatabaseClient clientWithEmptySessionPool() { + return spannerWithEmptySessionPool.getDatabaseClient( + DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); } @After @@ -132,11 +139,12 @@ public void after() { spanner.close(); spannerWithEmptySessionPool.close(); mockSpanner.removeAllExecutionTimes(); + mockSpanner.reset(); } @Test public void asyncRunnerUpdate() throws Exception { - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( new AsyncWork() { @@ -152,12 +160,13 @@ public ApiFuture doWorkAsync(TransactionContext txn) { @Test public void asyncRunnerIsNonBlocking() throws Exception { mockSpanner.freeze(); - AsyncRunner runner = clientWithEmptySessionPool.runAsync(); + AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture res = runner.runAsync( new AsyncWork() { @Override public ApiFuture doWorkAsync(TransactionContext txn) { + txn.executeUpdateAsync(UPDATE_STATEMENT); return ApiFutures.immediateFuture(null); } }, @@ -170,7 +179,7 @@ public ApiFuture doWorkAsync(TransactionContext txn) { @Test public void asyncRunnerInvalidUpdate() throws Exception { - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( new AsyncWork() { @@ -191,13 +200,29 @@ public ApiFuture doWorkAsync(TransactionContext txn) { } } + @Test + public void asyncRunnerFireAndForgetInvalidUpdate() throws Exception { + AsyncRunner runner = client().runAsync(); + ApiFuture res = + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT); + return txn.executeUpdateAsync(UPDATE_STATEMENT); + } + }, + executor); + assertThat(res.get()).isEqualTo(UPDATE_COUNT); + } + @Test public void asyncRunnerUpdateAborted() throws Exception { try { // Temporarily set the result of the update to 2 rows. mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT + 1L)); final AtomicInteger attempt = new AtomicInteger(); - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( new AsyncWork() { @@ -227,7 +252,7 @@ public void asyncRunnerCommitAborted() throws Exception { // Temporarily set the result of the update to 2 rows. mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT + 1L)); final AtomicInteger attempt = new AtomicInteger(); - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( new AsyncWork() { @@ -255,7 +280,7 @@ public ApiFuture doWorkAsync(TransactionContext txn) { @Test public void asyncRunnerUpdateAbortedWithoutGettingResult() throws Exception { final AtomicInteger attempt = new AtomicInteger(); - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture result = runner.runAsync( new AsyncWork() { @@ -277,6 +302,15 @@ public ApiFuture doWorkAsync(TransactionContext txn) { executor); assertThat(result.get()).isNull(); assertThat(attempt.get()).isEqualTo(2); + assertThat(mockSpanner.getRequestTypes()) + .containsExactly( + BatchCreateSessionsRequest.class, + BeginTransactionRequest.class, + ExecuteSqlRequest.class, + CommitRequest.class, + BeginTransactionRequest.class, + ExecuteSqlRequest.class, + CommitRequest.class); } @Test @@ -286,7 +320,7 @@ public void asyncRunnerCommitFails() throws Exception { Status.RESOURCE_EXHAUSTED .withDescription("mutation limit exceeded") .asRuntimeException())); - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( new AsyncWork() { @@ -311,65 +345,76 @@ public ApiFuture doWorkAsync(TransactionContext txn) { } @Test - public void asyncRunnerWaitsUntilAsyncUpdateHasFinished() { - AsyncRunner runner = client.runAsync(); - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.executeUpdateAsync(UPDATE_STATEMENT); - return ApiFutures.immediateFuture(null); - } - }, - executor); + public void asyncRunnerWaitsUntilAsyncUpdateHasFinished() throws Exception { + AsyncRunner runner = clientWithEmptySessionPool().runAsync(); + ApiFuture res = + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + txn.executeUpdateAsync(UPDATE_STATEMENT); + return ApiFutures.immediateFuture(null); + } + }, + executor); + res.get(); + assertThat(mockSpanner.getRequestTypes()) + .containsExactly( + BatchCreateSessionsRequest.class, + BeginTransactionRequest.class, + ExecuteSqlRequest.class, + CommitRequest.class); } + @Test public void closeTransactionBeforeEndOfAsyncQuery() throws Exception { final BlockingQueue results = new SynchronousQueue<>(); final SettableApiFuture finished = SettableApiFuture.create(); - DatabaseClientImpl clientImpl = (DatabaseClientImpl) client; + DatabaseClientImpl clientImpl = (DatabaseClientImpl) client(); // There should currently not be any sessions checked out of the pool. assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(0); - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = clientImpl.runAsync(); final CountDownLatch dataReceived = new CountDownLatch(1); - ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - try (AsyncResultSet rs = - txn.readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES, Options.bufferRows(1))) { - rs.setCallback( - Executors.newSingleThreadExecutor(), - new ReadyCallback() { - @Override - public CallbackResponse cursorReady(AsyncResultSet resultSet) { - try { - while (true) { - switch (resultSet.tryNext()) { - case DONE: - finished.set(true); - return CallbackResponse.DONE; - case NOT_READY: - return CallbackResponse.CONTINUE; - case OK: - dataReceived.countDown(); - results.put(resultSet.getString(0)); + ApiFuture res = + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + try (AsyncResultSet rs = + txn.readAsync( + READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES, Options.bufferRows(1))) { + rs.setCallback( + Executors.newSingleThreadExecutor(), + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + finished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + dataReceived.countDown(); + results.put(resultSet.getString(0)); + } + } + } catch (Throwable t) { + finished.setException(t); + dataReceived.countDown(); + return CallbackResponse.DONE; } } - } catch (Throwable t) { - finished.setException(t); - dataReceived.countDown(); - return CallbackResponse.DONE; - } - } - }); - } - return ApiFutures.immediateFuture(null); - } - }, - executor); + }); + } + return ApiFutures.immediateFuture(null); + } + }, + executor); // Wait until at least one row has been fetched. At that moment there should be one session // checked out. dataReceived.await(); @@ -388,7 +433,7 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) { @Test public void asyncRunnerReadRow() throws Exception { - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture val = runner.runAsync( new AsyncWork() { @@ -411,7 +456,7 @@ public String apply(Struct input) { @Test public void asyncRunnerRead() throws Exception { - AsyncRunner runner = client.runAsync(); + AsyncRunner runner = client().runAsync(); ApiFuture> val = runner.runAsync( new AsyncWork>() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 8e0375bd27..7cb7567321 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1634,6 +1634,24 @@ public List getRequests() { return new ArrayList<>(this.requests); } + public Iterable> getRequestTypes() { + List> res = new LinkedList<>(); + for (AbstractMessage m : this.requests) { + res.add(m.getClass()); + } + return res; + } + + public int countRequestsOfType(Class type) { + int c = 0; + for (AbstractMessage m : this.requests) { + if (m.getClass().equals(type)) { + c++; + } + } + return c; + } + @Override public void addResponse(AbstractMessage response) { throw new UnsupportedOperationException(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java index 0caab4f574..ff211c4e83 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java @@ -302,29 +302,27 @@ public Void run(TransactionContext transaction) throws Exception { @Test public void transactionRunnerWithError() { - for (int i = 0; i < 1000; i++) { - TransactionRunner runner = client.readWriteTransaction(); - try { - runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - return null; - } - }); - fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - - Map spans = failOnOverkillTraceComponent.getSpans(); - assertThat(spans.size()).isEqualTo(5); - assertThat(spans).containsEntry("CloudSpanner.ReadWriteTransaction", true); - assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessions", true); - assertThat(spans).containsEntry("SessionPool.WaitForSession", true); - assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessionsRequest", true); - assertThat(spans).containsEntry("CloudSpannerOperation.BeginTransaction", true); + TransactionRunner runner = client.readWriteTransaction(); + try { + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.executeUpdate(INVALID_UPDATE_STATEMENT); + return null; + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertThat(spans.size()).isEqualTo(5); + assertThat(spans).containsEntry("CloudSpanner.ReadWriteTransaction", true); + assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessions", true); + assertThat(spans).containsEntry("SessionPool.WaitForSession", true); + assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessionsRequest", true); + assertThat(spans).containsEntry("CloudSpannerOperation.BeginTransaction", true); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java index bc46ff8b0f..721536cb6b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java @@ -23,6 +23,8 @@ import com.google.cloud.spanner.AsyncResultSet; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; +import com.google.cloud.spanner.AsyncRunner; +import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; @@ -34,8 +36,10 @@ import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Type.StructField; import com.google.cloud.spanner.testing.RemoteSpannerHelper; @@ -271,4 +275,31 @@ public void columnNotFound() throws Exception { assertThat(se.getMessage()).contains("BadColumnName"); } } + + @Test + public void asyncRunnerFireAndForgetInvalidUpdate() throws Exception { + try { + assertThat(client.singleUse().readRow("TestTable", Key.of("k999"), ALL_COLUMNS)).isNull(); + AsyncRunner runner = client.runAsync(); + ApiFuture res = + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + // The error returned by this update statement will not bubble up and fail the + // transaction. + txn.executeUpdateAsync(Statement.of("UPDATE BadTableName SET FOO=1 WHERE ID=2")); + return txn.executeUpdateAsync( + Statement.of( + "INSERT INTO TestTable (Key, StringValue) VALUES ('k999', 'v999')")); + } + }, + executor); + assertThat(res.get()).isEqualTo(1L); + assertThat(client.singleUse().readRow("TestTable", Key.of("k999"), ALL_COLUMNS)).isNotNull(); + } finally { + client.writeAtLeastOnce(Arrays.asList(Mutation.delete("TestTable", Key.of("k999")))); + assertThat(client.singleUse().readRow("TestTable", Key.of("k999"), ALL_COLUMNS)).isNull(); + } + } }