Skip to content

Commit

Permalink
fix: make async runner wait for async operations
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 28, 2020
1 parent a6f65e8 commit 80d023a
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 139 deletions.
Expand Up @@ -58,6 +58,18 @@
*/
abstract class AbstractReadContext
implements ReadContext, AbstractResultSet.Listener, SessionTransaction {

/**
* {@link AsyncResultSet} that supports adding listeners that are called when all rows from the
* underlying result stream have been fetched.
*/
interface ListenableAsyncResultSet extends AsyncResultSet {
/** Adds a listener to this {@link AsyncResultSet}. */
void addListener(Runnable listener);

void removeListener(Runnable listener);
}

/**
* A {@code ReadContext} for standalone reads. This can only be used for a single operation, since
* each standalone read may see a different timestamp of Cloud Spanner data.
Expand Down Expand Up @@ -317,10 +329,15 @@ public final ResultSet read(
}

@Override
public final AsyncResultSet readAsync(
public ListenableAsyncResultSet readAsync(
String table, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, readInternal(table, null, keys, columns, options));
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
}

@Override
Expand All @@ -330,10 +347,17 @@ public final ResultSet readUsingIndex(
}

@Override
public final AsyncResultSet readUsingIndexAsync(
public ListenableAsyncResultSet readUsingIndexAsync(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, readInternal(table, checkNotNull(index), keys, columns, options));
executorProvider,
readInternal(table, checkNotNull(index), keys, columns, options),
bufferRows);
}

@Nullable
Expand Down Expand Up @@ -376,11 +400,17 @@ public final ResultSet executeQuery(Statement statement, QueryOption... options)
}

@Override
public final AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options));
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
bufferRows);
}

@Override
Expand Down
Expand Up @@ -19,11 +19,14 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ResultSetStats;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand All @@ -34,7 +37,8 @@
import java.util.concurrent.ScheduledExecutorService;

/** Default implementation for {@link AsyncResultSet}. */
class AsyncResultSetImpl extends ForwardingStructReader implements AsyncResultSet {
class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {

/** State of an {@link AsyncResultSetImpl}. */
private enum State {
INITIALIZED,
Expand All @@ -58,7 +62,7 @@ private State(boolean shouldStop) {
}
}

private static final int DEFAULT_BUFFER_SIZE = 10;
static final int DEFAULT_BUFFER_SIZE = 10;
private static final int MAX_WAIT_FOR_BUFFER_CONSUMPTION = 10;

private final Object monitor = new Object();
Expand Down Expand Up @@ -90,6 +94,12 @@ private State(boolean shouldStop) {

private ReadyCallback callback;

/**
* Listeners that will be called when the {@link AsyncResultSetImpl} has finished fetching all
* rows and any underlying transaction or session can be closed.
*/
private Collection<Runnable> listeners = new LinkedList<>();

private State state = State.INITIALIZED;

/**
Expand Down Expand Up @@ -122,10 +132,6 @@ private State(boolean shouldStop) {
*/
private volatile CountDownLatch consumingLatch = new CountDownLatch(0);

AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate) {
this(executorProvider, delegate, DEFAULT_BUFFER_SIZE);
}

AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) {
super(delegate);
this.buffer = new LinkedBlockingDeque<>(bufferSize);
Expand Down Expand Up @@ -155,11 +161,21 @@ public void close() {
}

/**
* Called when no more rows will be read from the underlying {@link ResultSet}, either because all
* rows have been read, or because {@link ReadyCallback#cursorReady(AsyncResultSet)} returned
* {@link CallbackResponse#DONE}.
* Adds a listener that will be called when no more rows will be read from the underlying {@link
* ResultSet}, either because all rows have been read, or because {@link
* ReadyCallback#cursorReady(AsyncResultSet)} returned {@link CallbackResponse#DONE}.
*/
void onFinished() {}
@Override
public void addListener(Runnable listener) {
Preconditions.checkState(state == State.INITIALIZED);
listeners.add(listener);
}

@Override
public void removeListener(Runnable listener) {
Preconditions.checkState(state == State.INITIALIZED);
listeners.remove(listener);
}

/**
* Tries to advance this {@link AsyncResultSet} to the next row. This method may only be called
Expand Down Expand Up @@ -339,7 +355,9 @@ public Void call() throws Exception {
delegateResultSet.close();
} catch (Throwable t) {
} finally {
onFinished();
for (Runnable listener : listeners) {
listener.run();
}
}

// Ensure that the callback has been called at least once, even if the result set was
Expand Down
@@ -0,0 +1,66 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.concurrent.Executor;

/** Forwarding implementation of {@link AsyncResultSet} that forwards all calls to a delegate. */
public class ForwardingAsyncResultSet extends ForwardingResultSet implements AsyncResultSet {
final AsyncResultSet delegate;

public ForwardingAsyncResultSet(AsyncResultSet delegate) {
super(Preconditions.checkNotNull(delegate));
this.delegate = delegate;
}

@Override
public CursorState tryNext() throws SpannerException {
return delegate.tryNext();
}

@Override
public void setCallback(Executor exec, ReadyCallback cb) {
delegate.setCallback(exec, cb);
;
}

@Override
public void cancel() {
delegate.cancel();
}

@Override
public void resume() {
delegate.resume();
}

@Override
public <T> ApiFuture<ImmutableList<T>> toListAsync(
Function<StructReader, T> transformer, Executor executor) {
return delegate.toListAsync(transformer, executor);
}

@Override
public <T> ImmutableList<T> toList(Function<StructReader, T> transformer)
throws SpannerException {
return delegate.toList(transformer);
}
}
Expand Up @@ -81,20 +81,17 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -151,6 +148,11 @@ public ResultSet get() {
* finished, if it is a single use context.
*/
private static class AutoClosingReadContext<T extends ReadContext> implements ReadContext {
/**
* {@link AsyncResultSet} implementation that keeps track of the async operations that are still
* running for this {@link ReadContext} and that should finish before the {@link ReadContext}
* releases its session back into the pool.
*/
private class AutoClosingReadContextAsyncResultSetImpl extends AsyncResultSetImpl {
private AutoClosingReadContextAsyncResultSetImpl(
ExecutorProvider executorProvider, ResultSet delegate, int bufferRows) {
Expand All @@ -159,19 +161,28 @@ private AutoClosingReadContextAsyncResultSetImpl(

@Override
public void setCallback(Executor exec, ReadyCallback cb) {
asyncOperationsCount.incrementAndGet();
super.setCallback(exec, cb);
}

@Override
void onFinished() {
synchronized (lock) {
if (asyncOperationsCount.decrementAndGet() == 0) {
if (closed) {
// All async operations for this read context have finished.
AutoClosingReadContext.this.close();
}
}
Runnable listener =
new Runnable() {
@Override
public void run() {
synchronized (lock) {
if (asyncOperationsCount.decrementAndGet() == 0) {
if (closed) {
// All async operations for this read context have finished.
AutoClosingReadContext.this.close();
}
}
}
}
};
try {
asyncOperationsCount.incrementAndGet();
addListener(listener);
super.setCallback(exec, cb);
} catch (Throwable t) {
removeListener(listener);
asyncOperationsCount.decrementAndGet();
throw t;
}
}
}
Expand Down Expand Up @@ -321,7 +332,10 @@ public AsyncResultSet readAsync(
final Iterable<String> columns,
final ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10;
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AutoClosingReadContextAsyncResultSetImpl(
sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
wrap(
Expand Down Expand Up @@ -358,7 +372,10 @@ public AsyncResultSet readUsingIndexAsync(
final Iterable<String> columns,
final ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows = readOptions.hasBufferRows() ? readOptions.bufferRows() : 10;
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AutoClosingReadContextAsyncResultSetImpl(
sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
wrap(
Expand Down Expand Up @@ -454,7 +471,10 @@ ResultSet load() {
public AsyncResultSet executeQueryAsync(
final Statement statement, final QueryOption... options) {
Options queryOptions = Options.fromQueryOptions(options);
final int bufferRows = queryOptions.hasBufferRows() ? queryOptions.bufferRows() : 10;
final int bufferRows =
queryOptions.hasBufferRows()
? queryOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AutoClosingReadContextAsyncResultSetImpl(
sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
wrap(
Expand Down Expand Up @@ -1546,6 +1566,9 @@ private static enum Position {
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService prepareExecutor;

// TODO(loite): Refactor Waiter to use a SettableFuture that can be set when a session is released
// into the pool, instead of using a thread waiting on a synchronous queue.
private final ScheduledExecutorService readWaiterExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
Expand All @@ -1558,6 +1581,7 @@ private static enum Position {
.setDaemon(true)
.setNameFormat("session-pool-write-waiter-%d")
.build());

final PoolMaintainer poolMaintainer;
private final Clock clock;
private final Object lock = new Object();
Expand Down
Expand Up @@ -42,7 +42,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down

0 comments on commit 80d023a

Please sign in to comment.