Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: safeguard against statement errors when requesting a transaction #800

Merged
merged 1 commit into from Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -213,7 +213,7 @@ TransactionSelector getTransactionSelector() {
}

@Override
public void onTransactionMetadata(Transaction transaction) {
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
synchronized (lock) {
if (!transaction.hasReadTimestamp()) {
throw newSpannerException(
Expand Down Expand Up @@ -394,6 +394,9 @@ void initTransaction() {
// much more frequently.
private static final int MAX_BUFFERED_CHUNKS = 512;

protected static final String NO_TRANSACTION_RETURNED_MSG =
"The statement did not return a transaction even though one was requested";

AbstractReadContext(Builder<?, ?> builder) {
this.session = builder.session;
this.rpc = builder.rpc;
Expand Down Expand Up @@ -632,7 +635,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
call.request(prefetchChunks);
stream.setCall(call, request.hasTransaction() && request.getTransaction().hasBegin());
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
}
};
Expand Down Expand Up @@ -685,7 +688,7 @@ public void close() {

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction) {}
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
Expand Down
Expand Up @@ -78,7 +78,8 @@ interface Listener {
* Called when transaction metadata is seen. This method may be invoked at most once. If the
* method is invoked, it will precede {@link #onError(SpannerException)} or {@link #onDone()}.
*/
void onTransactionMetadata(Transaction transaction) throws SpannerException;
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e, boolean withBeginTransaction);
Expand Down Expand Up @@ -117,12 +118,12 @@ public boolean next() throws SpannerException {
if (currRow == null) {
ResultSetMetadata metadata = iterator.getMetadata();
if (metadata.hasTransaction()) {
listener.onTransactionMetadata(metadata.getTransaction());
listener.onTransactionMetadata(
metadata.getTransaction(), iterator.isWithBeginTransaction());
} else if (iterator.isWithBeginTransaction()) {
// The query should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Query requested a transaction to be started, but no transaction was returned");
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
Expand Down
Expand Up @@ -463,13 +463,9 @@ TransactionSelector getTransactionSelector() {
// Aborted error if the call that included the BeginTransaction option fails. The
// Aborted error will cause the entire transaction to be retried, and the retry will use
// a separate BeginTransaction RPC.
if (trackTransactionStarter) {
TransactionSelector.newBuilder()
.setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS))
.build();
} else {
TransactionSelector.newBuilder().setId(tx.get()).build();
}
TransactionSelector.newBuilder()
.setId(tx.get(waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS))
.build();
}
} catch (ExecutionException e) {
if (e.getCause() instanceof AbortedException) {
Expand All @@ -479,11 +475,15 @@ TransactionSelector getTransactionSelector() {
}
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (TimeoutException e) {
// Throw an ABORTED exception to force a retry of the transaction if no transaction
// has been returned by the first statement.
SpannerException se =
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timeout while waiting for a transaction to be returned by another statement. "
+ "See the suppressed exception for the stacktrace of the caller that should return a transaction",
ErrorCode.ABORTED,
"Timeout while waiting for a transaction to be returned by another statement."
+ (trackTransactionStarter
? " See the suppressed exception for the stacktrace of the caller that should return a transaction"
: ""),
e);
if (transactionStarter != null) {
se.addSuppressed(transactionStarter);
Expand All @@ -498,12 +498,20 @@ TransactionSelector getTransactionSelector() {
}

@Override
public void onTransactionMetadata(Transaction transaction) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if (this.transactionId == null && transaction != null && transaction.getId() != null) {
this.transactionId = transaction.getId();
this.transactionIdFuture.set(transaction.getId());
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
Preconditions.checkNotNull(transaction);
if (transaction.getId() != ByteString.EMPTY) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if ((transactionIdFuture == null || !this.transactionIdFuture.isDone())
&& this.transactionId == null) {
this.transactionId = transaction.getId();
this.transactionIdFuture.set(transaction.getId());
}
} else if (shouldIncludeId) {
// The statement should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
}
}

Expand Down Expand Up @@ -580,17 +588,18 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(resultSet.getMetadata().getTransaction());
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
}
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (SpannerException e) {
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
throw e;
} catch (Throwable t) {
onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
throw t;
}
}

Expand Down Expand Up @@ -621,6 +630,12 @@ public Long apply(ResultSet input) {
ErrorCode.INVALID_ARGUMENT,
"DML response missing stats possibly due to non-DML statement as input");
}
if (builder.getTransaction().hasBegin()
&& !(input.getMetadata().hasTransaction()
&& input.getMetadata().getTransaction().getId() != ByteString.EMPTY)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG);
}
// For standard DML, using the exact row count.
return input.getStats().getRowCountExact();
}
Expand All @@ -633,8 +648,8 @@ public Long apply(ResultSet input) {
new ApiFunction<Throwable, Long>() {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
}
},
Expand All @@ -645,9 +660,11 @@ public Long apply(Throwable input) {
public void run() {
try {
if (resultSet.get().getMetadata().hasTransaction()) {
onTransactionMetadata(resultSet.get().getMetadata().getTransaction());
onTransactionMetadata(
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
} catch (ExecutionException | InterruptedException e) {
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
}
Expand All @@ -670,7 +687,9 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
if (response.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(response.getResultSets(i).getMetadata().getTransaction());
onTransactionMetadata(
response.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}

Expand All @@ -686,8 +705,8 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
results);
}
return results;
} catch (SpannerException e) {
onError(e, builder.hasTransaction() && builder.getTransaction().hasBegin());
} catch (Throwable e) {
onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
throw e;
}
}
Expand Down Expand Up @@ -718,7 +737,9 @@ public long[] apply(ExecuteBatchDmlResponse input) {
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(input.getResultSets(i).getMetadata().getTransaction());
onTransactionMetadata(
input.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
Expand All @@ -743,10 +764,8 @@ public long[] apply(ExecuteBatchDmlResponse input) {
new ApiFunction<Throwable, long[]>() {
@Override
public long[] apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.newSpannerException(input);
onError(
SpannerExceptionFactory.newSpannerException(e.getCause()),
builder.hasTransaction() && builder.getTransaction().hasBegin());
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
}
},
Expand Down
Expand Up @@ -56,7 +56,8 @@ public class GrpcResultSetTest {

private static class NoOpListener implements AbstractResultSet.Listener {
@Override
public void onTransactionMetadata(Transaction transaction) throws SpannerException {}
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
Expand Down