Skip to content

Commit

Permalink
fix(3.1.x): UNAVAILABLE error on first query could cause transaction …
Browse files Browse the repository at this point in the history
…to get stuck (#856)

* fix: UNAVAILABLE error on first query could cause transaction to get stuck (#807)

If the first query or read operation of a read/write transaction would return UNAVAILABLE for
the first element of the result stream, the transaction could get stuck. This was caused by the
internal retry mechanism that would wait for the initial attempt to return a transaction, which
was never returned as the UNAVAILABLE exception was internally handled by the result stream
iterator.

Fixes #799

* chore: re-formats source files

To fix lint errors

* fix: removes unrelated changes

Co-authored-by: Knut Olav Løite <koloite@gmail.com>
  • Loading branch information
thiagotnunes and olavloite committed Feb 9, 2021
1 parent 8626a85 commit a346eb3
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 13 deletions.
Expand Up @@ -554,7 +554,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, boolean withTransactionSelector) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
Expand All @@ -568,9 +569,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
if (withTransactionSelector) {
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
Expand Down Expand Up @@ -614,18 +617,25 @@ ResultSet executeQueryInternalWithOptions(
beforeReadOrQuery();
final int prefetchChunks =
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
TransactionSelector selector = null;
if (resumeToken != null) {
request.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!request.hasTransaction()) {
selector = getTransactionSelector();
}
if (selector != null) {
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
Expand Down Expand Up @@ -733,10 +743,13 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!builder.hasTransaction()) {
selector = getTransactionSelector();
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
Expand Down
Expand Up @@ -515,7 +515,8 @@ public void buffer(Iterable<Mutation> mutations) {
public long executeUpdate(Statement statement) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, /* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
Expand All @@ -538,7 +539,8 @@ public long executeUpdate(Statement statement) {
public ApiFuture<Long> executeUpdateAsync(Statement statement) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, /* withTransactionSelector = */ true);
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down
Expand Up @@ -89,7 +89,8 @@ public void setup() {
public void executeSqlRequestBuilderWithoutQueryOptions() {
ExecuteSqlRequest request =
context
.getExecuteSqlRequestBuilder(Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL)
.getExecuteSqlRequestBuilder(
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
Expand All @@ -103,7 +104,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
Statement.newBuilder("SELECT FOO FROM BAR")
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
.build(),
QueryMode.NORMAL)
QueryMode.NORMAL,
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");
Expand Down
Expand Up @@ -251,6 +251,130 @@ public Long run(TransactionContext transaction) throws Exception {
assertThat(countTransactionsStarted()).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstUpdateAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
UPDATE_STATEMENT,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
}
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
SELECT1,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.query(SELECT1, SELECT1_RESULTSET));
}
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(updateCount).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginTxWithQuery() {
DatabaseClient client =
Expand Down Expand Up @@ -279,8 +403,7 @@ public Long run(TransactionContext transaction) throws Exception {

@Test
public void testInlinedBeginTxWithRead() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
Expand Down

0 comments on commit a346eb3

Please sign in to comment.