Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: UNAVAILABLE error on first query could cause transaction to get stuck #807

Merged
merged 1 commit into from Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -555,7 +555,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options) {
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
Expand All @@ -569,9 +569,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
if (withTransactionSelector) {
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
Expand Down Expand Up @@ -616,18 +618,26 @@ ResultSet executeQueryInternalWithOptions(
beforeReadOrQuery();
final int prefetchChunks =
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(
statement, queryMode, options, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode, options);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
TransactionSelector selector = null;
if (resumeToken != null) {
request.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!request.hasTransaction()) {
selector = getTransactionSelector();
}
if (selector != null) {
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
Expand Down Expand Up @@ -735,10 +745,13 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!builder.hasTransaction()) {
selector = getTransactionSelector();
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
Expand Down
Expand Up @@ -1079,6 +1079,7 @@ protected PartialResultSet computeNext() {
backoffSleep(context, backOff);
}
}

continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
Expand Down
Expand Up @@ -575,7 +575,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
Expand All @@ -599,7 +602,10 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down
Expand Up @@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() {
ExecuteSqlRequest request =
context
.getExecuteSqlRequestBuilder(
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions())
Statement.of("SELECT FOO FROM BAR"),
QueryMode.NORMAL,
Options.fromQueryOptions(),
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
Expand All @@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
.build(),
QueryMode.NORMAL,
Options.fromQueryOptions())
Options.fromQueryOptions(),
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");
Expand Down
Expand Up @@ -255,6 +255,130 @@ public Long run(TransactionContext transaction) throws Exception {
assertThat(countTransactionsStarted()).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstUpdateAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
UPDATE_STATEMENT,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
}
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
SELECT1,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.query(SELECT1, SELECT1_RESULTSET));
}
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(updateCount).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginTxWithQuery() {
DatabaseClient client =
Expand Down Expand Up @@ -283,8 +407,7 @@ public Long run(TransactionContext transaction) throws Exception {

@Test
public void testInlinedBeginTxWithRead() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
Expand Down