Skip to content

Commit

Permalink
fix: safeguard against statements errors when requesting tx (#800)
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Jan 14, 2021
1 parent 5b864a1 commit c4776e4
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 72 deletions.
Expand Up @@ -213,7 +213,7 @@ TransactionSelector getTransactionSelector() {
}

@Override
public void onTransactionMetadata(Transaction transaction) {
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
synchronized (lock) {
if (!transaction.hasReadTimestamp()) {
throw newSpannerException(
Expand Down Expand Up @@ -394,6 +394,9 @@ void initTransaction() {
// much more frequently.
private static final int MAX_BUFFERED_CHUNKS = 512;

protected static final String NO_TRANSACTION_RETURNED_MSG =
"The statement did not return a transaction even though one was requested";

AbstractReadContext(Builder<?, ?> builder) {
this.session = builder.session;
this.rpc = builder.rpc;
Expand Down Expand Up @@ -632,7 +635,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
call.request(prefetchChunks);
stream.setCall(call, request.hasTransaction() && request.getTransaction().hasBegin());
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
}
};
Expand Down Expand Up @@ -685,7 +688,7 @@ public void close() {

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction) {}
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
Expand Down
Expand Up @@ -78,7 +78,8 @@ interface Listener {
* Called when transaction metadata is seen. This method may be invoked at most once. If the
* method is invoked, it will precede {@link #onError(SpannerException)} or {@link #onDone()}.
*/
void onTransactionMetadata(Transaction transaction) throws SpannerException;
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e, boolean withBeginTransaction);
Expand Down Expand Up @@ -117,12 +118,12 @@ public boolean next() throws SpannerException {
if (currRow == null) {
ResultSetMetadata metadata = iterator.getMetadata();
if (metadata.hasTransaction()) {
listener.onTransactionMetadata(metadata.getTransaction());
listener.onTransactionMetadata(
metadata.getTransaction(), iterator.isWithBeginTransaction());
} else if (iterator.isWithBeginTransaction()) {
// The query should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Query requested a transaction to be started, but no transaction was returned");
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
Expand Down
Expand Up @@ -463,13 +463,9 @@ TransactionSelector getTransactionSelector() {
// Aborted error if the call that included the BeginTransaction option fails. The
// Aborted error will cause the entire transaction to be retried, and the retry will use
// a separate BeginTransaction RPC.
if (trackTransactionStarter) {
TransactionSelector.newBuilder()
.setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS))
.build();
} else {
TransactionSelector.newBuilder().setId(tx.get()).build();
}
TransactionSelector.newBuilder()
.setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS))
.build();
}
} catch (ExecutionException e) {
if (e.getCause() instanceof AbortedException) {
Expand All @@ -479,11 +475,15 @@ TransactionSelector getTransactionSelector() {
}
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (TimeoutException e) {
// Throw an ABORTED exception to force a retry of the transaction if no transaction
// has been returned by the first statement.
SpannerException se =
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timeout while waiting for a transaction to be returned by another statement. "
+ "See the suppressed exception for the stacktrace of the caller that should return a transaction",
ErrorCode.ABORTED,
"Timeout while waiting for a transaction to be returned by another statement."
+ (trackTransactionStarter
? " See the suppressed exception for the stacktrace of the caller that should return a transaction"
: ""),
e);
if (transactionStarter != null) {
se.addSuppressed(transactionStarter);
Expand All @@ -498,12 +498,20 @@ TransactionSelector getTransactionSelector() {
}

@Override
public void onTransactionMetadata(Transaction transaction) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if (this.transactionId == null && transaction != null && transaction.getId() != null) {
this.transactionId = transaction.getId();
this.transactionIdFuture.set(transaction.getId());
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
Preconditions.checkNotNull(transaction);
if (transaction.getId() != ByteString.EMPTY) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if ((transactionIdFuture == null || !this.transactionIdFuture.isDone())
&& this.transactionId == null) {
this.transactionId = transaction.getId();
this.transactionIdFuture.set(transaction.getId());
}
} else if (shouldIncludeId) {
// The statement should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
}
}

Expand Down Expand Up @@ -580,17 +588,18 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(resultSet.getMetadata().getTransaction());
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
}
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (SpannerException e) {
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
} catch (Throwable t) {
onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
throw t;
}
}

Expand Down Expand Up @@ -621,6 +630,12 @@ public Long apply(ResultSet input) {
ErrorCode.INVALID_ARGUMENT,
"DML response missing stats possibly due to non-DML statement as input");
}
if (builder.getTransaction().hasBegin()
&& !(input.getMetadata().hasTransaction()
&& input.getMetadata().getTransaction().getId() != ByteString.EMPTY)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG);
}
// For standard DML, using the exact row count.
return input.getStats().getRowCountExact();
}
Expand All @@ -633,8 +648,8 @@ public Long apply(ResultSet input) {
new ApiFunction<Throwable, Long>() {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
}
},
Expand All @@ -645,9 +660,11 @@ public Long apply(Throwable input) {
public void run() {
try {
if (resultSet.get().getMetadata().hasTransaction()) {
onTransactionMetadata(resultSet.get().getMetadata().getTransaction());
onTransactionMetadata(
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
} catch (ExecutionException | InterruptedException e) {
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
}
Expand All @@ -670,7 +687,9 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
if (response.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(response.getResultSets(i).getMetadata().getTransaction());
onTransactionMetadata(
response.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}

Expand All @@ -686,8 +705,8 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
results);
}
return results;
} catch (SpannerException e) {
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
} catch (Throwable e) {
onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
throw e;
}
}
Expand Down Expand Up @@ -718,7 +737,9 @@ public long[] apply(ExecuteBatchDmlResponse input) {
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(input.getResultSets(i).getMetadata().getTransaction());
onTransactionMetadata(
input.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
Expand All @@ -743,10 +764,8 @@ public long[] apply(ExecuteBatchDmlResponse input) {
new ApiFunction<Throwable, long[]>() {
@Override
public long[] apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
onError(
SpannerExceptionFactory.newSpannerException(e.getCause()),
builder.hasTransaction() && builder.getTransaction().hasBegin());
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
}
},
Expand Down
Expand Up @@ -56,7 +56,8 @@ public class GrpcResultSetTest {

private static class NoOpListener implements AbstractResultSet.Listener {
@Override
public void onTransactionMetadata(Transaction transaction) throws SpannerException {}
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
Expand Down

0 comments on commit c4776e4

Please sign in to comment.