Skip to content

Commit

Permalink
feat: add inline begin for async runner
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Jul 17, 2020
1 parent ec5b49e commit 436833b
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 81 deletions.
Expand Up @@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
}

/**
Expand Down Expand Up @@ -685,7 +686,7 @@ public void close() {
public void onTransactionMetadata(Transaction transaction) {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
Expand Down Expand Up @@ -746,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
GrpcResultSet resultSet =
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
return resultSet;
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ interface Listener {
void onTransactionMetadata(Transaction transaction) throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e);
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
Expand All @@ -91,14 +91,17 @@ interface Listener {
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
private final GrpcValueIterator iterator;
private final Listener listener;
private final boolean beginTransaction;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.beginTransaction = beginTransaction;
}

@Override
Expand Down Expand Up @@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e);
throw yieldError(e, beginTransaction && currRow == null);
}
}

Expand All @@ -149,9 +152,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e) {
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e);
listener.onError(e, beginTransaction);
throw e;
}
}
Expand Down
Expand Up @@ -231,6 +231,10 @@ private TransactionManager inlinedTransactionManager() {

@Override
public AsyncRunner runAsync() {
return inlineBeginReadWriteTransactions ? inlinedRunAsync() : preparedRunAsync();
}

private AsyncRunner preparedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
Expand All @@ -240,6 +244,16 @@ public AsyncRunner runAsync() {
}
}

private AsyncRunner inlinedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncTransactionManager transactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
Expand Down
Expand Up @@ -238,7 +238,10 @@ public AsyncRunner runAsync() {
return new AsyncRunnerImpl(
setActive(
new TransactionRunnerImpl(
this, spanner.getRpc(), spanner.getDefaultPrefetchChunks(), false)));
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction())));
}

@Override
Expand Down
Expand Up @@ -55,10 +55,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -158,7 +158,8 @@ public void removeListener(Runnable listener) {
* been created, the lock is released and concurrent requests can be executed on the
* transaction.
*/
private final ReentrantLock transactionLock = new ReentrantLock();
// private final ReentrantLock transactionLock = new ReentrantLock();
private volatile CountDownLatch transactionLatch = new CountDownLatch(0);

private volatile ByteString transactionId;
private Timestamp commitTimestamp;
Expand Down Expand Up @@ -333,7 +334,7 @@ public void run() {
}
span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e));
TraceUtil.endSpanWithFailure(opSpan, e);
onError((SpannerException) e);
onError((SpannerException) e, false);
res.setException(e);
}
}
Expand Down Expand Up @@ -401,20 +402,38 @@ TransactionSelector getTransactionSelector() {
try {
// Wait if another request is already beginning, committing or rolling back the
// transaction.
transactionLock.lockInterruptibly();
// Check again if a transactionId is now available. It could be that the thread that was
// holding the lock and that had sent a statement with a BeginTransaction request caused
// an error and did not return a transaction.
if (transactionId == null) {
// Return a TransactionSelector that will start a new transaction as part of the
// statement that is being executed.
return TransactionSelector.newBuilder()
.setBegin(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
} else {
transactionLock.unlock();

// transactionLock.lockInterruptibly();
while (true) {
CountDownLatch latch;
synchronized (lock) {
latch = transactionLatch;
}
latch.await();

synchronized (lock) {
if (transactionLatch.getCount() > 0L) {
continue;
}
// Check again if a transactionId is now available. It could be that the thread that
// was
// holding the lock and that had sent a statement with a BeginTransaction request
// caused
// an error and did not return a transaction.
if (transactionId == null) {
transactionLatch = new CountDownLatch(1);
// Return a TransactionSelector that will start a new transaction as part of the
// statement that is being executed.
return TransactionSelector.newBuilder()
.setBegin(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
} else {
// transactionLock.unlock();
break;
}
}
}
} catch (InterruptedException e) {
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
Expand All @@ -430,18 +449,24 @@ public void onTransactionMetadata(Transaction transaction) {
// transaction on this instance and release the lock to allow other statements to proceed.
if (this.transactionId == null && transaction != null && transaction.getId() != null) {
this.transactionId = transaction.getId();
transactionLock.unlock();
transactionLatch.countDown();
// transactionLock.unlock();
}
}

@Override
public void onError(SpannerException e) {
public void onError(SpannerException e, boolean withBeginTransaction) {
// Release the transactionLock if that is being held by this thread. That would mean that the
// statement that was trying to start a transaction caused an error. The next statement should
// in that case also include a BeginTransaction option.
if (transactionLock.isHeldByCurrentThread()) {
transactionLock.unlock();

// if (transactionLock.isHeldByCurrentThread()) {
// transactionLock.unlock();
// }
if (withBeginTransaction) {
transactionLatch.countDown();
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
long delay = -1L;
if (e instanceof AbortedException) {
Expand Down Expand Up @@ -494,7 +519,7 @@ public long executeUpdate(Statement statement) {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (SpannerException e) {
onError(e);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
}
}
Expand All @@ -504,7 +529,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
// commit.
Expand Down Expand Up @@ -538,7 +563,7 @@ public Long apply(ResultSet input) {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
onError(e);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
}
},
Expand All @@ -547,6 +572,14 @@ public Long apply(Throwable input) {
new Runnable() {
@Override
public void run() {
try {
if (resultSet.get().getMetadata().hasTransaction()) {
onTransactionMetadata(resultSet.get().getMetadata().getTransaction());
}
} catch (ExecutionException | InterruptedException e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
}
decreaseAsyncOperations();
}
},
Expand Down Expand Up @@ -582,7 +615,7 @@ public long[] batchUpdate(Iterable<Statement> statements) {
}
return results;
} catch (SpannerException e) {
onError(e);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
}
}
Expand Down Expand Up @@ -610,6 +643,9 @@ public long[] apply(ExecuteBatchDmlResponse input) {
long[] results = new long[input.getResultSetsCount()];
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(input.getResultSets(i).getMetadata().getTransaction());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
Expand All @@ -633,9 +669,13 @@ public void run() {
try {
updateCounts.get();
} catch (ExecutionException e) {
onError(SpannerExceptionFactory.newSpannerException(e.getCause()));
onError(
SpannerExceptionFactory.newSpannerException(e.getCause()),
builder.hasTransaction() && builder.getTransaction().hasBegin());
} catch (InterruptedException e) {
onError(SpannerExceptionFactory.propagateInterrupt(e));
onError(
SpannerExceptionFactory.propagateInterrupt(e),
builder.hasTransaction() && builder.getTransaction().hasBegin());
} finally {
decreaseAsyncOperations();
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ private static class NoOpListener implements AbstractResultSet.Listener {
public void onTransactionMetadata(Transaction transaction) throws SpannerException {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
Expand All @@ -76,11 +76,11 @@ public void cancel(@Nullable String message) {}
public void request(int numMessages) {}
});
consumer = stream.consumer();
resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener(), false);
}

public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) {
return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener(), false);
}

@Test
Expand Down Expand Up @@ -641,7 +641,7 @@ public com.google.protobuf.Value apply(@Nullable Value input) {

private void verifySerialization(
Function<Value, com.google.protobuf.Value> protoFn, Value... values) {
resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener(), false);
PartialResultSet.Builder builder = PartialResultSet.newBuilder();
List<Type.StructField> types = new ArrayList<>();
for (Value value : values) {
Expand Down

0 comments on commit 436833b

Please sign in to comment.