Skip to content

Commit

Permalink
fix: retry pdml transaction on EOS internal error (#360)
Browse files Browse the repository at this point in the history
* fix: retries PDML transactions on EOS errors

The server can close the stream with an INTERNAL error ("Received unexpected EOS on DATA frame from server").
When that happens, the client should retry the PDML transaction instead of failing.
  • Loading branch information
thiagotnunes committed Jul 26, 2020
1 parent 6125c7d commit a53d736
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 43 deletions.
@@ -0,0 +1,53 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.rpc.InternalException;
import com.google.common.base.Predicate;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

/**
 * Predicate that reports whether a {@link Throwable} is an INTERNAL error whose message matches
 * one of the known transient failure messages, and which may therefore be retried.
 *
 * <p>An error is considered INTERNAL when it is either a GAX {@link InternalException} or a gRPC
 * {@link StatusRuntimeException} with status code {@link Status.Code#INTERNAL}. Anything else,
 * including {@code null}, is not retryable.
 */
public class IsRetryableInternalError implements Predicate<Throwable> {

  private static final String HTTP2_ERROR_MESSAGE = "HTTP/2 error code: INTERNAL_ERROR";
  private static final String CONNECTION_CLOSED_ERROR_MESSAGE =
      "Connection closed with unknown cause";
  private static final String EOS_ERROR_MESSAGE =
      "Received unexpected EOS on DATA frame from server";

  /**
   * Returns {@code true} when {@code cause} is an INTERNAL error whose message contains one of
   * the known retryable messages.
   *
   * @param cause the error to inspect; may be {@code null}
   * @return {@code true} if the error is a retryable INTERNAL error, {@code false} otherwise
   */
  @Override
  public boolean apply(Throwable cause) {
    if (!isInternalError(cause)) {
      return false;
    }
    // An exception can be created without a message; treat that as "not a known retryable
    // error" instead of failing with a NullPointerException inside the predicate.
    final String message = cause.getMessage();
    if (message == null) {
      return false;
    }
    return message.contains(HTTP2_ERROR_MESSAGE) // See b/25451313.
        || message.contains(CONNECTION_CLOSED_ERROR_MESSAGE) // See b/27794742.
        || message.contains(EOS_ERROR_MESSAGE);
  }

  /**
   * Returns {@code true} when {@code cause} is a GAX {@link InternalException} or a gRPC {@link
   * StatusRuntimeException} carrying the INTERNAL status code. {@code null} yields {@code false}
   * because {@code instanceof} never matches {@code null}.
   */
  private boolean isInternalError(Throwable cause) {
    return (cause instanceof InternalException)
        || (cause instanceof StatusRuntimeException
            && ((StatusRuntimeException) cause).getStatus().getCode() == Status.Code.INTERNAL);
  }
}
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.common.base.Predicate;
import javax.net.ssl.SSLHandshakeException;

/**
 * Predicate that matches any {@link Throwable} that is an {@link SSLHandshakeException} (or a
 * subclass of it). {@code null} never matches.
 */
public class IsSslHandshakeException implements Predicate<Throwable> {

  /**
   * @param input the error to inspect; may be {@code null}
   * @return {@code true} if {@code input} is an instance of {@link SSLHandshakeException}
   */
  @Override
  public boolean apply(Throwable input) {
    // Class#isInstance is the reflective equivalent of instanceof and is false for null.
    return SSLHandshakeException.class.isInstance(input);
  }
}
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Google LLC
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -43,17 +44,20 @@
import org.threeten.bp.temporal.ChronoUnit;

public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction {

private static final Logger LOGGER = Logger.getLogger(PartitionedDmlTransaction.class.getName());

private final SessionImpl session;
private final SpannerRpc rpc;
private final Ticker ticker;
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
private volatile boolean isValid = true;

/**
 * Creates a transaction bound to the given session, RPC layer and ticker, and sets up the
 * predicate used to decide whether an internal error may be retried.
 *
 * @param session the session this transaction stores a reference to
 * @param rpc the Spanner RPC implementation this transaction stores a reference to
 * @param ticker the ticker this transaction stores a reference to
 */
PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
  // The assignments are independent of each other; the predicate has no constructor arguments.
  this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
  this.ticker = ticker;
  this.rpc = rpc;
  this.session = session;
}

/**
Expand Down Expand Up @@ -95,6 +99,14 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
request = resumeOrRestartRequest(resumeToken, statement, request);
} catch (InternalException e) {
if (!isRetryableInternalErrorPredicate.apply(e)) {
throw e;
}

LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
request = resumeOrRestartRequest(resumeToken, statement, request);
} catch (AbortedException e) {
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
resumeToken = ByteString.EMPTY;
Expand Down
Expand Up @@ -26,12 +26,10 @@
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;

/**
* A factory for creating instances of {@link SpannerException} and its subtypes. All creation of
Expand All @@ -40,6 +38,7 @@
* ErrorCode#ABORTED} are always represented by {@link AbortedException}.
*/
public final class SpannerExceptionFactory {

static final String SESSION_RESOURCE_TYPE = "type.googleapis.com/google.spanner.v1.Session";
static final String DATABASE_RESOURCE_TYPE =
"type.googleapis.com/google.spanner.admin.database.v1.Database";
Expand Down Expand Up @@ -257,35 +256,8 @@ private static boolean hasCauseMatching(
}

/**
 * Holder for the shared, stateless predicates used to classify exception causes.
 *
 * <p>Each constant is defined exactly once and delegates to its dedicated predicate class, so the
 * matching rules live in a single place ({@link IsRetryableInternalError} and {@link
 * IsSslHandshakeException}) rather than being duplicated here as anonymous classes.
 */
private static class Matchers {

  /** Matches INTERNAL errors whose message identifies a known retryable failure. */
  static final Predicate<Throwable> isRetryableInternalError = new IsRetryableInternalError();

  /** Matches {@code javax.net.ssl.SSLHandshakeException} instances. */
  static final Predicate<Throwable> isSSLHandshakeException = new IsSslHandshakeException();
}
}
@@ -0,0 +1,135 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InternalException;
import com.google.common.base.Predicate;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Unit tests for {@link IsRetryableInternalError}.
 *
 * <p>Each known retryable message is checked for both exception types the predicate recognises
 * (gRPC {@link StatusRuntimeException} and GAX {@link InternalException}), followed by negative
 * cases: INTERNAL errors with an unrelated message, {@code null}, and a plain {@link Exception}.
 */
@RunWith(JUnit4.class)
public class IsRetryableInternalErrorTest {

  private Predicate<Throwable> predicate;

  @Before
  public void setUp() {
    predicate = new IsRetryableInternalError();
  }

  @Test
  public void http2ErrorStatusRuntimeExceptionIsRetryable() {
    final StatusRuntimeException e =
        new StatusRuntimeException(
            Status.fromCode(Code.INTERNAL)
                .withDescription("INTERNAL: HTTP/2 error code: INTERNAL_ERROR."));

    assertThat(predicate.apply(e)).isTrue();
  }

  @Test
  public void http2ErrorInternalExceptionIsRetryable() {
    final InternalException e =
        new InternalException(
            "INTERNAL: HTTP/2 error code: INTERNAL_ERROR.",
            null,
            GrpcStatusCode.of(Code.INTERNAL),
            false);

    assertThat(predicate.apply(e)).isTrue();
  }

  @Test
  public void connectionClosedStatusRuntimeExceptionIsRetryable() {
    final StatusRuntimeException e =
        new StatusRuntimeException(
            Status.fromCode(Code.INTERNAL)
                .withDescription("INTERNAL: Connection closed with unknown cause."));

    assertThat(predicate.apply(e)).isTrue();
  }

  @Test
  public void connectionClosedInternalExceptionIsRetryable() {
    final InternalException e =
        new InternalException(
            "INTERNAL: Connection closed with unknown cause.",
            null,
            GrpcStatusCode.of(Code.INTERNAL),
            false);

    assertThat(predicate.apply(e)).isTrue();
  }

  @Test
  public void eosStatusRuntimeExceptionIsRetryable() {
    final StatusRuntimeException e =
        new StatusRuntimeException(
            Status.fromCode(Code.INTERNAL)
                .withDescription("INTERNAL: Received unexpected EOS on DATA frame from server."));

    assertThat(predicate.apply(e)).isTrue();
  }

  @Test
  public void eosInternalExceptionIsRetryable() {
    final InternalException e =
        new InternalException(
            "INTERNAL: Received unexpected EOS on DATA frame from server.",
            null,
            GrpcStatusCode.of(Code.INTERNAL),
            false);

    assertThat(predicate.apply(e)).isTrue();
  }

  // Renamed from "...IsRetryable": the assertion below expects the predicate to reject an
  // INTERNAL error whose message is not one of the known retryable messages.
  @Test
  public void genericInternalStatusRuntimeExceptionIsNotRetryable() {
    final StatusRuntimeException e =
        new StatusRuntimeException(
            Status.fromCode(Code.INTERNAL).withDescription("INTERNAL: Generic."));

    assertThat(predicate.apply(e)).isFalse();
  }

  @Test
  public void genericInternalExceptionIsNotRetryable() {
    final InternalException e =
        new InternalException("INTERNAL: Generic.", null, GrpcStatusCode.of(Code.INTERNAL), false);

    assertThat(predicate.apply(e)).isFalse();
  }

  @Test
  public void nullIsNotRetryable() {
    assertThat(predicate.apply(null)).isFalse();
  }

  @Test
  public void genericExceptionIsNotRetryable() {
    assertThat(predicate.apply(new Exception())).isFalse();
  }
}
@@ -0,0 +1,53 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;

import com.google.common.base.Predicate;
import javax.net.ssl.SSLHandshakeException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Unit tests for {@link IsSslHandshakeException}: the predicate must accept an {@link
 * SSLHandshakeException} and reject both unrelated exceptions and {@code null}.
 */
@SuppressWarnings("unchecked")
@RunWith(JUnit4.class)
public class IsSslHandshakeExceptionTest {

  private Predicate<Throwable> matcher;

  /** Creates a fresh predicate before every test. */
  @Before
  public void setUp() {
    matcher = new IsSslHandshakeException();
  }

  @Test
  public void sslHandshakeExceptionIsTrue() {
    final Throwable handshakeFailure = new SSLHandshakeException("test");
    final boolean matched = matcher.apply(handshakeFailure);

    assertThat(matched).isTrue();
  }

  @Test
  public void genericExceptionIsNotSslHandshakeException() {
    final Throwable unrelated = new Exception("test");
    final boolean matched = matcher.apply(unrelated);

    assertThat(matched).isFalse();
  }

  @Test
  public void nullIsNotSslHandshakeException() {
    final boolean matched = matcher.apply(null);

    assertThat(matched).isFalse();
  }
}

0 comments on commit a53d736

Please sign in to comment.