diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 87891f33010..21f9094b24d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -154,6 +154,13 @@ public void close() { } } + /** + * Called when no more rows will be read from the underlying {@link ResultSet}, either because all + * rows have been read, or because {@link ReadyCallback#cursorReady(AsyncResultSet)} returned + * {@link CallbackResponse#DONE}. + */ + void onFinished() {} + /** * Tries to advance this {@link AsyncResultSet} to the next row. This method may only be called * from within a {@link ReadyCallback}. @@ -331,6 +338,8 @@ public Void call() throws Exception { try { delegateResultSet.close(); } catch (Throwable t) { + } finally { + onFinished(); } // Ensure that the callback has been called at least once, even if the result set was diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index e5d1729feff..138b5ace9f2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -59,6 +59,11 @@ public static ReadAndQueryOption prefetchChunks(int prefetchChunks) { return new FlowControlOption(prefetchChunks); } + public static ReadAndQueryOption bufferRows(int bufferRows) { + Preconditions.checkArgument(bufferRows > 0, "bufferRows should be greater than 0"); + return new BufferRowsOption(bufferRows); + } + /** * Specifying this will cause the list operations to fetch at most this many records in a page. */ @@ -115,8 +120,22 @@ void appendToOptions(Options options) { } } + static final class BufferRowsOption extends InternalOption implements ReadAndQueryOption { + final int bufferRows; + + BufferRowsOption(int bufferRows) { + this.bufferRows = bufferRows; + } + + @Override + void appendToOptions(Options options) { + options.bufferRows = bufferRows; + } + } + private Long limit; private Integer prefetchChunks; + private Integer bufferRows; private Integer pageSize; private String pageToken; private String filter; @@ -140,6 +159,14 @@ int prefetchChunks() { return prefetchChunks; } + boolean hasBufferRows() { + return bufferRows != null; + } + + int bufferRows() { + return bufferRows; + } + boolean hasPageSize() { return pageSize != null; } @@ -203,6 +230,10 @@ public boolean equals(Object o) { || hasPrefetchChunks() && that.hasPrefetchChunks() && Objects.equals(prefetchChunks(), that.prefetchChunks())) + && (!hasBufferRows() && !that.hasBufferRows() + || hasBufferRows() + && that.hasBufferRows() + && Objects.equals(bufferRows(), that.bufferRows())) && (!hasPageSize() && !that.hasPageSize() || hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize())) && Objects.equals(pageToken(), that.pageToken()) @@ -218,6 +249,9 @@ public int hashCode() { if (prefetchChunks != null) { result = 31 * result + prefetchChunks.hashCode(); } + if (bufferRows != null) { + result = 31 * result + bufferRows.hashCode(); + } if (pageSize != null) { result = 31 * result + pageSize.hashCode(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 1d7ed980d2a..da1c48f66b8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -32,6 +32,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.ExecutorProvider; import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; @@ -88,6 +89,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -141,14 +143,51 @@ public ResultSet get() { * finished, if it is a single use context. */ private static class AutoClosingReadContext implements ReadContext { + private class AutoClosingReadContextAsyncResultSetImpl extends AsyncResultSetImpl { + private AutoClosingReadContextAsyncResultSetImpl( + ExecutorProvider executorProvider, ResultSet delegate, int bufferRows) { + super(executorProvider, delegate, bufferRows); + } + + @Override + public void setCallback(Executor exec, ReadyCallback cb) { + asyncOperationsCount.incrementAndGet(); + super.setCallback(exec, cb); + } + + @Override + void onFinished() { + synchronized (lock) { + if (asyncOperationsCount.decrementAndGet() == 0) { + if (closed) { + // All async operations for this read context have finished. + AutoClosingReadContext.this.close(); + } + } + } + } + } + private final Function readContextDelegateSupplier; private T readContextDelegate; private final SessionPool sessionPool; - private PooledSessionFuture session; private final boolean isSingleUse; - private boolean closed; + private final AtomicInteger asyncOperationsCount = new AtomicInteger(); + + private Object lock = new Object(); + + @GuardedBy("lock") private boolean sessionUsedForQuery = false; + @GuardedBy("lock") + private PooledSessionFuture session; + + @GuardedBy("lock") + private boolean closed; + + @GuardedBy("lock") + private boolean delegateClosed; + private AutoClosingReadContext( Function delegateSupplier, SessionPool sessionPool, @@ -162,12 +201,14 @@ private AutoClosingReadContext( T getReadContextDelegate() { if (readContextDelegate == null) { - while (true) { - try { - this.readContextDelegate = readContextDelegateSupplier.apply(this.session); - break; - } catch (SessionNotFoundException e) { - replaceSessionIfPossible(e); + synchronized (lock) { + while (true) { + try { + this.readContextDelegate = readContextDelegateSupplier.apply(this.session); + break; + } catch (SessionNotFoundException e) { + replaceSessionIfPossible(e); + } } } } @@ -204,9 +245,11 @@ private boolean internalNext() { try { boolean ret = super.next(); if (beforeFirst) { - session.get().markUsed(); - beforeFirst = false; - sessionUsedForQuery = true; + synchronized (lock) { + session.get().markUsed(); + beforeFirst = false; + sessionUsedForQuery = true; + } } if (!ret && isSingleUse) { close(); @@ -215,9 +258,11 @@ private boolean internalNext() { } catch (SessionNotFoundException e) { throw e; } catch (SpannerException e) { - if (!closed && isSingleUse) { - session.get().lastException = e; - AutoClosingReadContext.this.close(); + synchronized (lock) { + if (!closed && isSingleUse) { + session.get().lastException = e; + AutoClosingReadContext.this.close(); + } } throw e; } @@ -234,13 +279,15 @@ public void close() { } private void replaceSessionIfPossible(SessionNotFoundException notFound) { - if (isSingleUse || !sessionUsedForQuery) { - // This class is only used by read-only transactions, so we know that we only need a - // read-only session. - session = sessionPool.replaceReadSession(notFound, session); - readContextDelegate = readContextDelegateSupplier.apply(session); - } else { - throw notFound; + synchronized (lock) { + if (isSingleUse || !sessionUsedForQuery) { + // This class is only used by read-only transactions, so we know that we only need a + // read-only session. + session = sessionPool.replaceReadSession(notFound, session); + readContextDelegate = readContextDelegateSupplier.apply(session); + } else { + throw notFound; + } } } @@ -265,7 +312,9 @@ public AsyncResultSet readAsync( final KeySet keys, final Iterable columns, final ReadOption... options) { - return new AsyncResultSetImpl( + Options readOptions = Options.fromReadOptions(options); + final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10; + return new AutoClosingReadContextAsyncResultSetImpl( sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(), wrap( new CachedResultSetSupplier() { @@ -273,7 +322,8 @@ public AsyncResultSet readAsync( ResultSet load() { return getReadContextDelegate().read(table, keys, columns, options); } - })); + }), + bufferRows); } @Override @@ -299,7 +349,9 @@ public AsyncResultSet readUsingIndexAsync( final KeySet keys, final Iterable columns, final ReadOption... options) { - return new AsyncResultSetImpl( + Options readOptions = Options.fromReadOptions(options); + final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10; + return new AutoClosingReadContextAsyncResultSetImpl( sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(), wrap( new CachedResultSetSupplier() { @@ -308,7 +360,8 @@ ResultSet load() { return getReadContextDelegate() .readUsingIndex(table, index, keys, columns, options); } - })); + }), + bufferRows); } @Override @@ -317,14 +370,18 @@ public Struct readRow(String table, Key key, Iterable columns) { try { while (true) { try { - session.get().markUsed(); + synchronized (lock) { + session.get().markUsed(); + } return getReadContextDelegate().readRow(table, key, columns); } catch (SessionNotFoundException e) { replaceSessionIfPossible(e); } } } finally { - sessionUsedForQuery = true; + synchronized (lock) { + sessionUsedForQuery = true; + } if (isSingleUse) { close(); } @@ -346,14 +403,18 @@ public Struct readRowUsingIndex(String table, String index, Key key, Iterable results = new SynchronousQueue<>(); + final SettableApiFuture finished = SettableApiFuture.create(); + DatabaseClientImpl clientImpl = (DatabaseClientImpl) client; + + // There should currently not be any sessions checked out of the pool. + assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(0); + + final CountDownLatch dataReceived = new CountDownLatch(1); + try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { + try (AsyncResultSet rs = + tx.readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES, Options.bufferRows(1))) { + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + finished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + dataReceived.countDown(); + results.put(resultSet.getString(0)); + } + } + } catch (Throwable t) { + finished.setException(t); + return CallbackResponse.DONE; + } + } + }); + } + // Wait until at least one row has been fetched. At that moment there should be one session + // checked out. + dataReceived.await(); + assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(1); + } + // The read-only transaction is now closed, but the ready callback will continue to receive + // data. As it tries to put the data into a synchronous queue and the underlying buffer can also + // only hold 1 row, the async result set has not yet finished. The read-only transaction will + // release the session back into the pool when all async statements have finished. The number of + // sessions in use is therefore still 1. + assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(1); + List resultList = new ArrayList<>(); + do { + results.drainTo(resultList); + } while (!finished.isDone() || results.size() > 0); + assertThat(finished.get()).isTrue(); + assertThat(resultList).containsExactly("k1", "k2", "k3"); + // The session will be released back into the pool by the asynchronous result set when it has + // returned all rows. As this is done in the background, it could take a couple of milliseconds. + Thread.sleep(10L); + assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(0); + } }