Skip to content

Commit

Permalink
feat: keep session checked out until async finishes
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 26, 2020
1 parent 973abe6 commit 768b4ae
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 42 deletions.
Expand Up @@ -154,6 +154,13 @@ public void close() {
}
}

/**
* Called when no more rows will be read from the underlying {@link ResultSet}, either because all
* rows have been read, or because {@link ReadyCallback#cursorReady(AsyncResultSet)} returned
* {@link CallbackResponse#DONE}.
*/
void onFinished() {}

/**
* Tries to advance this {@link AsyncResultSet} to the next row. This method may only be called
* from within a {@link ReadyCallback}.
Expand Down Expand Up @@ -331,6 +338,8 @@ public Void call() throws Exception {
try {
delegateResultSet.close();
} catch (Throwable t) {
} finally {
onFinished();
}

// Ensure that the callback has been called at least once, even if the result set was
Expand Down
Expand Up @@ -59,6 +59,11 @@ public static ReadAndQueryOption prefetchChunks(int prefetchChunks) {
return new FlowControlOption(prefetchChunks);
}

public static ReadAndQueryOption bufferRows(int bufferRows) {
Preconditions.checkArgument(bufferRows > 0, "bufferRows should be greater than 0");
return new BufferRowsOption(bufferRows);
}

/**
* Specifying this will cause the list operations to fetch at most this many records in a page.
*/
Expand Down Expand Up @@ -115,8 +120,22 @@ void appendToOptions(Options options) {
}
}

static final class BufferRowsOption extends InternalOption implements ReadAndQueryOption {
final int bufferRows;

BufferRowsOption(int bufferRows) {
this.bufferRows = bufferRows;
}

@Override
void appendToOptions(Options options) {
options.bufferRows = bufferRows;
}
}

private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
private Integer pageSize;
private String pageToken;
private String filter;
Expand All @@ -140,6 +159,14 @@ int prefetchChunks() {
return prefetchChunks;
}

boolean hasBufferRows() {
return bufferRows != null;
}

int bufferRows() {
return bufferRows;
}

boolean hasPageSize() {
return pageSize != null;
}
Expand Down Expand Up @@ -203,6 +230,10 @@ public boolean equals(Object o) {
|| hasPrefetchChunks()
&& that.hasPrefetchChunks()
&& Objects.equals(prefetchChunks(), that.prefetchChunks()))
&& (!hasBufferRows() && !that.hasBufferRows()
|| hasBufferRows()
&& that.hasBufferRows()
&& Objects.equals(bufferRows(), that.bufferRows()))
&& (!hasPageSize() && !that.hasPageSize()
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
&& Objects.equals(pageToken(), that.pageToken())
Expand All @@ -218,6 +249,9 @@ public int hashCode() {
if (prefetchChunks != null) {
result = 31 * result + prefetchChunks.hashCode();
}
if (bufferRows != null) {
result = 31 * result + bufferRows.hashCode();
}
if (pageSize != null) {
result = 31 * result + pageSize.hashCode();
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand Down Expand Up @@ -88,6 +89,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -141,14 +143,51 @@ public ResultSet get() {
* finished, if it is a single use context.
*/
private static class AutoClosingReadContext<T extends ReadContext> implements ReadContext {
private class AutoClosingReadContextAsyncResultSetImpl extends AsyncResultSetImpl {
private AutoClosingReadContextAsyncResultSetImpl(
ExecutorProvider executorProvider, ResultSet delegate, int bufferRows) {
super(executorProvider, delegate, bufferRows);
}

@Override
public void setCallback(Executor exec, ReadyCallback cb) {
asyncOperationsCount.incrementAndGet();
super.setCallback(exec, cb);
}

@Override
void onFinished() {
synchronized (lock) {
if (asyncOperationsCount.decrementAndGet() == 0) {
if (closed) {
// All async operations for this read context have finished.
AutoClosingReadContext.this.close();
}
}
}
}
}

private final Function<PooledSessionFuture, T> readContextDelegateSupplier;
private T readContextDelegate;
private final SessionPool sessionPool;
private PooledSessionFuture session;
private final boolean isSingleUse;
private boolean closed;
private final AtomicInteger asyncOperationsCount = new AtomicInteger();

private Object lock = new Object();

@GuardedBy("lock")
private boolean sessionUsedForQuery = false;

@GuardedBy("lock")
private PooledSessionFuture session;

@GuardedBy("lock")
private boolean closed;

@GuardedBy("lock")
private boolean delegateClosed;

private AutoClosingReadContext(
Function<PooledSessionFuture, T> delegateSupplier,
SessionPool sessionPool,
Expand All @@ -162,12 +201,14 @@ private AutoClosingReadContext(

T getReadContextDelegate() {
if (readContextDelegate == null) {
while (true) {
try {
this.readContextDelegate = readContextDelegateSupplier.apply(this.session);
break;
} catch (SessionNotFoundException e) {
replaceSessionIfPossible(e);
synchronized (lock) {
while (true) {
try {
this.readContextDelegate = readContextDelegateSupplier.apply(this.session);
break;
} catch (SessionNotFoundException e) {
replaceSessionIfPossible(e);
}
}
}
}
Expand Down Expand Up @@ -204,9 +245,11 @@ private boolean internalNext() {
try {
boolean ret = super.next();
if (beforeFirst) {
session.get().markUsed();
beforeFirst = false;
sessionUsedForQuery = true;
synchronized (lock) {
session.get().markUsed();
beforeFirst = false;
sessionUsedForQuery = true;
}
}
if (!ret && isSingleUse) {
close();
Expand All @@ -215,9 +258,11 @@ private boolean internalNext() {
} catch (SessionNotFoundException e) {
throw e;
} catch (SpannerException e) {
if (!closed && isSingleUse) {
session.get().lastException = e;
AutoClosingReadContext.this.close();
synchronized (lock) {
if (!closed && isSingleUse) {
session.get().lastException = e;
AutoClosingReadContext.this.close();
}
}
throw e;
}
Expand All @@ -234,13 +279,15 @@ public void close() {
}

private void replaceSessionIfPossible(SessionNotFoundException notFound) {
if (isSingleUse || !sessionUsedForQuery) {
// This class is only used by read-only transactions, so we know that we only need a
// read-only session.
session = sessionPool.replaceReadSession(notFound, session);
readContextDelegate = readContextDelegateSupplier.apply(session);
} else {
throw notFound;
synchronized (lock) {
if (isSingleUse || !sessionUsedForQuery) {
// This class is only used by read-only transactions, so we know that we only need a
// read-only session.
session = sessionPool.replaceReadSession(notFound, session);
readContextDelegate = readContextDelegateSupplier.apply(session);
} else {
throw notFound;
}
}
}

Expand All @@ -265,15 +312,18 @@ public AsyncResultSet readAsync(
final KeySet keys,
final Iterable<String> columns,
final ReadOption... options) {
return new AsyncResultSetImpl(
Options readOptions = Options.fromReadOptions(options);
final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10;
return new AutoClosingReadContextAsyncResultSetImpl(
sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
wrap(
new CachedResultSetSupplier() {
@Override
ResultSet load() {
return getReadContextDelegate().read(table, keys, columns, options);
}
}));
}),
bufferRows);
}

@Override
Expand All @@ -299,7 +349,9 @@ public AsyncResultSet readUsingIndexAsync(
final KeySet keys,
final Iterable<String> columns,
final ReadOption... options) {
return new AsyncResultSetImpl(
Options readOptions = Options.fromReadOptions(options);
final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10;
return new AutoClosingReadContextAsyncResultSetImpl(
sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
wrap(
new CachedResultSetSupplier() {
Expand All @@ -308,7 +360,8 @@ ResultSet load() {
return getReadContextDelegate()
.readUsingIndex(table, index, keys, columns, options);
}
}));
}),
bufferRows);
}

@Override
Expand All @@ -317,14 +370,18 @@ public Struct readRow(String table, Key key, Iterable<String> columns) {
try {
while (true) {
try {
session.get().markUsed();
synchronized (lock) {
session.get().markUsed();
}
return getReadContextDelegate().readRow(table, key, columns);
} catch (SessionNotFoundException e) {
replaceSessionIfPossible(e);
}
}
} finally {
sessionUsedForQuery = true;
synchronized (lock) {
sessionUsedForQuery = true;
}
if (isSingleUse) {
close();
}
Expand All @@ -346,14 +403,18 @@ public Struct readRowUsingIndex(String table, String index, Key key, Iterable<St
try {
while (true) {
try {
session.get().markUsed();
synchronized (lock) {
session.get().markUsed();
}
return getReadContextDelegate().readRowUsingIndex(table, index, key, columns);
} catch (SessionNotFoundException e) {
replaceSessionIfPossible(e);
}
}
} finally {
sessionUsedForQuery = true;
synchronized (lock) {
sessionUsedForQuery = true;
}
if (isSingleUse) {
close();
}
Expand Down Expand Up @@ -384,15 +445,18 @@ ResultSet load() {
@Override
public AsyncResultSet executeQueryAsync(
final Statement statement, final QueryOption... options) {
return new AsyncResultSetImpl(
Options queryOptions = Options.fromQueryOptions(options);
final int bufferRows = queryOptions.hasBufferRows() ? queryOptions.bufferRows() : 10;
return new AutoClosingReadContextAsyncResultSetImpl(
sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
wrap(
new CachedResultSetSupplier() {
@Override
ResultSet load() {
return getReadContextDelegate().executeQuery(statement, options);
}
}));
}),
bufferRows);
}

@Override
Expand All @@ -408,14 +472,19 @@ ResultSet load() {

@Override
public void close() {
if (closed) {
return;
}
closed = true;
if (readContextDelegate != null) {
readContextDelegate.close();
synchronized (lock) {
if (closed && delegateClosed) {
return;
}
closed = true;
if (asyncOperationsCount.get() == 0) {
if (readContextDelegate != null) {
readContextDelegate.close();
}
session.close();
delegateClosed = true;
}
}
session.close();
}
}

Expand Down Expand Up @@ -1601,6 +1670,13 @@ private SessionPool(
this.initMetricsCollection(metricRegistry, labelValues);
}

@VisibleForTesting
int getNumberOfSessionsInUse() {
synchronized (lock) {
return numSessionsInUse;
}
}

@VisibleForTesting
int getNumberOfAvailableWritePreparedSessions() {
synchronized (lock) {
Expand Down
Expand Up @@ -92,8 +92,7 @@ public class MockSpannerTestUtil {
.build())
.build();
static final Type READ_TABLE_TYPE =
Type.struct(
StructField.of("Key", Type.string()), StructField.of("Value", Type.string()));
Type.struct(StructField.of("Key", Type.string()), StructField.of("Value", Type.string()));
static final com.google.spanner.v1.ResultSet EMPTY_KEY_VALUE_RESULTSET =
com.google.spanner.v1.ResultSet.newBuilder()
.addRows(ListValue.newBuilder().build())
Expand Down

0 comments on commit 768b4ae

Please sign in to comment.