Skip to content

Commit

Permalink
fix: UNAVAILABLE error on first query could cause transaction to get …
Browse files Browse the repository at this point in the history
…stuck (#807)

If the first query or read operation of a read/write transaction would return UNAVAILABLE for
the first element of the result stream, the transaction could get stuck. This was caused by the
internal retry mechanism that would wait for the initial attempt to return a transaction, which
was never returned as the UNAVAILABLE exception was internally handled by the result stream
iterator.

Fixes #799
  • Loading branch information
olavloite committed Jan 17, 2021
1 parent 557e761 commit c7dc6e6
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 13 deletions.
Expand Up @@ -558,7 +558,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options) {
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
Expand All @@ -572,9 +572,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
if (withTransactionSelector) {
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
Expand Down Expand Up @@ -619,18 +621,26 @@ ResultSet executeQueryInternalWithOptions(
beforeReadOrQuery();
final int prefetchChunks =
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(
statement, queryMode, options, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode, options);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
TransactionSelector selector = null;
if (resumeToken != null) {
request.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!request.hasTransaction()) {
selector = getTransactionSelector();
}
if (selector != null) {
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
Expand Down Expand Up @@ -738,10 +748,13 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!builder.hasTransaction()) {
selector = getTransactionSelector();
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
Expand Down
Expand Up @@ -1080,6 +1080,7 @@ protected PartialResultSet computeNext() {
backoffSleep(context, backOff);
}
}

continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
Expand Down
Expand Up @@ -583,7 +583,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
Expand All @@ -608,7 +611,10 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down
Expand Up @@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() {
ExecuteSqlRequest request =
context
.getExecuteSqlRequestBuilder(
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions())
Statement.of("SELECT FOO FROM BAR"),
QueryMode.NORMAL,
Options.fromQueryOptions(),
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
Expand All @@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
.build(),
QueryMode.NORMAL,
Options.fromQueryOptions())
Options.fromQueryOptions(),
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");
Expand Down
Expand Up @@ -257,6 +257,130 @@ public Long run(TransactionContext transaction) throws Exception {
assertThat(countTransactionsStarted()).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstUpdateAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
UPDATE_STATEMENT,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
}
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
SELECT1,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.query(SELECT1, SELECT1_RESULTSET));
}
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(updateCount).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginTxWithQuery() {
DatabaseClient client =
Expand Down Expand Up @@ -285,8 +409,7 @@ public Long run(TransactionContext transaction) throws Exception {

@Test
public void testInlinedBeginTxWithRead() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
Expand Down

0 comments on commit c7dc6e6

Please sign in to comment.