Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: retry pdml transaction on EOS internal error #360

Merged
merged 8 commits into from Jul 26, 2020
@@ -0,0 +1,37 @@
package com.google.cloud.spanner;

import com.google.api.gax.rpc.InternalException;
import com.google.common.base.Predicate;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

public class IsRetryableInternalError implements Predicate<Throwable> {

private static final String HTTP2_ERROR_MESSAGE = "HTTP/2 error code: INTERNAL_ERROR";
private static final String CONNECTION_CLOSED_ERROR_MESSAGE = "Connection closed with unknown cause";
private static final String EOS_ERROR_MESSAGE = "Received unexpected EOS on DATA frame from server";

@Override
public boolean apply(@NullableDecl Throwable cause) {
if (isInternalError(cause)) {
if (cause.getMessage().contains(HTTP2_ERROR_MESSAGE)) {
// See b/25451313.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and below, if we could remove the references to internal issues that would be great. I don't know why they were there before. Folks working on this repo are unlikely to have access to those so no point exposing them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, will tackle this in a following PR.

return true;
} else if (cause.getMessage().contains(CONNECTION_CLOSED_ERROR_MESSAGE)) {
// See b/27794742.
return true;
} else if (cause.getMessage().contains(EOS_ERROR_MESSAGE)) {
return true;
}
}
return false;
}

private boolean isInternalError(Throwable cause) {
return (cause instanceof InternalException)
|| (cause instanceof StatusRuntimeException
&& ((StatusRuntimeException) cause).getStatus().getCode()
== Status.Code.INTERNAL);
}
}
@@ -0,0 +1,13 @@
package com.google.cloud.spanner;

import com.google.common.base.Predicate;
import javax.net.ssl.SSLHandshakeException;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

/**
 * Predicate that matches any {@link SSLHandshakeException} (including subclasses). A {@code null}
 * input does not match.
 */
public class IsSslHandshakeException implements Predicate<Throwable> {

  /**
   * @param input the throwable to test; may be {@code null}
   * @return {@code true} if {@code input} is an instance of {@link SSLHandshakeException}
   */
  @Override
  public boolean apply(@NullableDecl Throwable input) {
    // Class#isInstance is the reflective equivalent of instanceof and is false for null.
    final boolean isHandshakeFailure = SSLHandshakeException.class.isInstance(input);
    return isHandshakeFailure;
  }
}
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -43,17 +44,20 @@
import org.threeten.bp.temporal.ChronoUnit;

public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction {

// Logger named after this class; the retry paths of this class log through it at FINER level.
private static final Logger LOGGER = Logger.getLogger(PartitionedDmlTransaction.class.getName());

// Collaborators handed in through the constructor.
private final SessionImpl session;
private final SpannerRpc rpc;
private final Ticker ticker;
// Decides whether a caught InternalException may be retried; instantiated in the constructor.
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
// Starts out true. NOTE(review): presumably cleared by invalidate() — its body is not visible
// in this view, confirm there.
private volatile boolean isValid = true;

/**
 * Stores the supplied collaborators and creates the predicate used to recognise retryable
 * internal errors.
 *
 * @param session stored in the {@code session} field
 * @param rpc stored in the {@code rpc} field
 * @param ticker stored in the {@code ticker} field
 */
PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
  // The predicate has no dependency on the arguments, so it is created first.
  this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
  this.ticker = ticker;
  this.rpc = rpc;
  this.session = session;
}

/**
Expand Down Expand Up @@ -95,6 +99,14 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
request = resumeOrRestartRequest(resumeToken, statement, request);
} catch (InternalException e) {
if (!isRetryableInternalErrorPredicate.apply(e)) {
throw e;
}

LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
request = resumeOrRestartRequest(resumeToken, statement, request);
} catch (AbortedException e) {
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
resumeToken = ByteString.EMPTY;
Expand Down Expand Up @@ -122,7 +134,8 @@ public void invalidate() {

// Intentionally empty: the span is ignored. The method exists only because the
// SessionTransaction interface requires it.
@Override
public void setSpan(Span span) {}

private Duration tryUpdateTimeout(final Duration timeout, final Stopwatch stopwatch) {
final Duration remainingTimeout =
Expand Down
Expand Up @@ -26,12 +26,10 @@
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;

/**
* A factory for creating instances of {@link SpannerException} and its subtypes. All creation of
Expand All @@ -40,6 +38,7 @@
* ErrorCode#ABORTED} are always represented by {@link AbortedException}.
*/
public final class SpannerExceptionFactory {

static final String SESSION_RESOURCE_TYPE = "type.googleapis.com/google.spanner.v1.Session";
static final String DATABASE_RESOURCE_TYPE =
"type.googleapis.com/google.spanner.admin.database.v1.Database";
Expand Down Expand Up @@ -257,35 +256,8 @@ private static boolean hasCauseMatching(
}

private static class Matchers {
static final Predicate<Throwable> isRetryableInternalError =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extracted these predicates into their own classes that are now exposed to the PartitionedDmlTransaction. This was the cleanest / lowest impact way I could find to de-duplicate the EOS retryable logic.

new Predicate<Throwable>() {
@Override
public boolean apply(Throwable cause) {
if (cause instanceof StatusRuntimeException
&& ((StatusRuntimeException) cause).getStatus().getCode() == Status.Code.INTERNAL) {
if (cause.getMessage().contains("HTTP/2 error code: INTERNAL_ERROR")) {
// See b/25451313.
return true;
}
if (cause.getMessage().contains("Connection closed with unknown cause")) {
// See b/27794742.
return true;
}
if (cause
.getMessage()
.contains("Received unexpected EOS on DATA frame from server")) {
return true;
}
}
return false;
}
};
static final Predicate<Throwable> isSSLHandshakeException =
new Predicate<Throwable>() {
@Override
public boolean apply(Throwable input) {
return input instanceof SSLHandshakeException;
}
};

static final Predicate<Throwable> isRetryableInternalError = new IsRetryableInternalError();
static final Predicate<Throwable> isSSLHandshakeException = new IsSslHandshakeException();
}
}
@@ -0,0 +1,128 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InternalException;
import com.google.common.base.Predicate;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@SuppressWarnings("unchecked")
@RunWith(JUnit4.class)
public class IsRetryableInternalErrorTest {

private Predicate<Throwable> predicate;

@Before
public void setUp() {
predicate = new IsRetryableInternalError();
}

@Test
public void http2ErrorStatusRuntimeExceptionIsRetryable() {
final StatusRuntimeException e = new StatusRuntimeException(
Status
.fromCode(Code.INTERNAL)
.withDescription("INTERNAL: HTTP/2 error code: INTERNAL_ERROR.")
);

assertThat(predicate.apply(e)).isTrue();
}

@Test
public void http2ErrorInternalExceptionIsRetryable() {
final InternalException e = new InternalException(
"INTERNAL: HTTP/2 error code: INTERNAL_ERROR.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false
);

assertThat(predicate.apply(e)).isTrue();
}

@Test
public void connectionClosedStatusRuntimeExceptionIsRetryable() {
final StatusRuntimeException e = new StatusRuntimeException(
Status
.fromCode(Code.INTERNAL)
.withDescription("INTERNAL: Connection closed with unknown cause.")
);

assertThat(predicate.apply(e)).isTrue();
}

@Test
public void connectionClosedInternalExceptionIsRetryable() {
final InternalException e = new InternalException(
"INTERNAL: Connection closed with unknown cause.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false
);

assertThat(predicate.apply(e)).isTrue();
}

@Test
public void eosStatusRuntimeExceptionIsRetryable() {
final StatusRuntimeException e = new StatusRuntimeException(
Status
.fromCode(Code.INTERNAL)
.withDescription("INTERNAL: Received unexpected EOS on DATA frame from server.")
);

assertThat(predicate.apply(e)).isTrue();
}

@Test
public void eosInternalExceptionIsRetryable() {
final InternalException e = new InternalException(
"INTERNAL: Received unexpected EOS on DATA frame from server.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false
);

assertThat(predicate.apply(e)).isTrue();
}

@Test
public void genericInternalStatusRuntimeExceptionIsRetryable() {
final StatusRuntimeException e = new StatusRuntimeException(
Status
.fromCode(Code.INTERNAL)
.withDescription("INTERNAL: Generic.")
);

assertThat(predicate.apply(e)).isFalse();
}

@Test
public void genericInternalExceptionIsNotRetryable() {
final InternalException e = new InternalException(
"INTERNAL: Generic.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false
);

assertThat(predicate.apply(e)).isFalse();
}

@Test
public void nullIsNotRetryable() {
assertThat(predicate.apply(null)).isFalse();
}

@Test
public void genericExceptionIsNotRetryable() {
assertThat(predicate.apply(new Exception())).isFalse();
}
}
@@ -0,0 +1,42 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InternalException;
import com.google.common.base.Predicate;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import javax.net.ssl.SSLHandshakeException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Unit tests for {@link IsSslHandshakeException}: the predicate must accept an {@link
 * SSLHandshakeException} and reject both unrelated exceptions and {@code null}.
 */
@SuppressWarnings("unchecked")
@RunWith(JUnit4.class)
public class IsSslHandshakeExceptionTest {

  private Predicate<Throwable> predicate;

  /** Creates a fresh predicate before every test. */
  @Before
  public void setUp() {
    predicate = new IsSslHandshakeException();
  }

  @Test
  public void sslHandshakeExceptionIsTrue() {
    final Throwable handshakeFailure = new SSLHandshakeException("test");

    final boolean matched = predicate.apply(handshakeFailure);

    assertThat(matched).isTrue();
  }

  @Test
  public void genericExceptionIsNotSslHandshakeException() {
    final Throwable unrelated = new Exception("test");

    final boolean matched = predicate.apply(unrelated);

    assertThat(matched).isFalse();
  }

  @Test
  public void nullIsNotSslHandshakeException() {
    final boolean matched = predicate.apply(null);

    assertThat(matched).isFalse();
  }
}