Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry pdml transaction on EOS internal error #360

Merged
merged 8 commits into from Jul 26, 2020
Expand Up @@ -20,6 +20,7 @@

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
Expand Down Expand Up @@ -55,23 +56,6 @@ class PartitionedDMLTransaction implements SessionTransaction {
this.rpc = rpc;
}

private ByteString initTransaction() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was just moved down (after the public methods).

final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.build();
Transaction txn = rpc.beginTransaction(request, session.getOptions());
if (txn.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
"Failed to init transaction, missing transaction id\n" + session.getName());
}
return txn.getId();
}

/**
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the
Expand Down Expand Up @@ -127,20 +111,24 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration
}
}
break;
} catch (UnavailableException e) {
} catch (UnavailableException | InternalException e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this catch block we retry on EOS exception.

// Retry the stream in the same transaction if the stream breaks with
// UnavailableException and we have a resume token. Otherwise, we just retry the
// entire transaction.
if (!ByteString.EMPTY.equals(resumeToken)) {
log.log(
Level.FINER,
"Retrying PartitionedDml stream using resume token '"
+ resumeToken.toStringUtf8()
+ "' because of broken stream",
e);
if (shouldResumeOrRestartTransaction(e)) {
if (!ByteString.EMPTY.equals(resumeToken)) {
log.log(
Level.FINER,
"Retrying PartitionedDml stream using resume token '"
+ resumeToken.toStringUtf8()
+ "' because of broken stream",
e);
} else {
throw new com.google.api.gax.rpc.AbortedException(
e, GrpcStatusCode.of(Code.ABORTED), true);
}
} else {
throw new com.google.api.gax.rpc.AbortedException(
e, GrpcStatusCode.of(Code.ABORTED), true);
throw e;
}
}
}
Expand Down Expand Up @@ -174,4 +162,27 @@ public void invalidate() {
// No-op method needed to implement SessionTransaction interface.
@Override
public void setSpan(Span span) {}

private ByteString initTransaction() {
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.build();
Transaction txn = rpc.beginTransaction(request, session.getOptions());
if (txn.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
"Failed to init transaction, missing transaction id\n" + session.getName());
}
return txn.getId();
}

private boolean shouldResumeOrRestartTransaction(Exception e) {
return e instanceof UnavailableException
|| (e instanceof InternalException
&& e.getMessage().contains("Received unexpected EOS on DATA frame from server"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hacky, but unfortunately we do not get a specific exception for this error, so we have to proxy through the exception message. This follows the approach taken in the bigquery client: https://github.com/googleapis/java-bigquerystorage/pull/263/files.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same error is also retried for streaming queries, and this check could probably use the existing check in SpannerExceptionFactory:

private static boolean isRetryable(ErrorCode code, @Nullable Throwable cause) {
. It's still just as hacky, but it would keep it confined to one method.

}
}
Expand Up @@ -20,27 +20,20 @@
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.*;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetStats;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status.Code;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -295,4 +288,72 @@ public Long answer(InvocationOnMock invocation) throws Throwable {
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
}
}

@Test
public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build();
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build();
ServerStream<PartialResultSet> stream1 = mock(ServerStream.class);
Iterator<PartialResultSet> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true, true, false);
when(iterator.next())
.thenReturn(p1)
.thenThrow(
new InternalException(
"INTERNAL: Received unexpected EOS on DATA frame from server.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false));
when(stream1.iterator()).thenReturn(iterator);
ServerStream<PartialResultSet> stream2 = mock(ServerStream.class);
when(stream2.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator());
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDMLTransaction tx = new PartitionedDMLTransaction(session, rpc);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap());
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
}

@Test
public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
ServerStream<PartialResultSet> stream1 = mock(ServerStream.class);
Iterator<PartialResultSet> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true, true, false);
when(iterator.next())
.thenReturn(p1)
.thenThrow(
new InternalException(
"INTERNAL: Error", null, GrpcStatusCode.of(Code.INTERNAL), false));
when(stream1.iterator()).thenReturn(iterator);
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);

try {
PartitionedDMLTransaction tx = new PartitionedDMLTransaction(session, rpc);
tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));
fail("missing expected INTERNAL exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap());
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
}
}
}