From a53d7369bb2a8640ab42e409632b352decbdbf5e Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Mon, 27 Jul 2020 09:04:40 +1000 Subject: [PATCH] fix: retry pdml transaction on EOS internal error (#360) * fix: retries PDML transactions on EOS errors It is possible to have the stream closed with an EOS internal error. This should be retried by the client. --- .../spanner/IsRetryableInternalError.java | 53 +++++++ .../spanner/IsSslHandshakeException.java | 28 ++++ .../spanner/PartitionedDmlTransaction.java | 14 +- .../spanner/SpannerExceptionFactory.java | 36 +---- .../spanner/IsRetryableInternalErrorTest.java | 135 ++++++++++++++++++ .../spanner/IsSslHandshakeExceptionTest.java | 53 +++++++ .../PartitionedDmlTransactionTest.java | 81 +++++++++-- 7 files changed, 357 insertions(+), 43 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsSslHandshakeException.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsSslHandshakeExceptionTest.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java new file mode 100644 index 0000000000..cb048cb673 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.rpc.InternalException; +import com.google.common.base.Predicate; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; + +public class IsRetryableInternalError implements Predicate { + + private static final String HTTP2_ERROR_MESSAGE = "HTTP/2 error code: INTERNAL_ERROR"; + private static final String CONNECTION_CLOSED_ERROR_MESSAGE = + "Connection closed with unknown cause"; + private static final String EOS_ERROR_MESSAGE = + "Received unexpected EOS on DATA frame from server"; + + @Override + public boolean apply(Throwable cause) { + if (isInternalError(cause)) { + if (cause.getMessage().contains(HTTP2_ERROR_MESSAGE)) { + // See b/25451313. + return true; + } else if (cause.getMessage().contains(CONNECTION_CLOSED_ERROR_MESSAGE)) { + // See b/27794742. + return true; + } else if (cause.getMessage().contains(EOS_ERROR_MESSAGE)) { + return true; + } + } + return false; + } + + private boolean isInternalError(Throwable cause) { + return (cause instanceof InternalException) + || (cause instanceof StatusRuntimeException + && ((StatusRuntimeException) cause).getStatus().getCode() == Status.Code.INTERNAL); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsSslHandshakeException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsSslHandshakeException.java new file mode 100644 index 0000000000..53ff151a83 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsSslHandshakeException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.common.base.Predicate; +import javax.net.ssl.SSLHandshakeException; + +public class IsSslHandshakeException implements Predicate { + + @Override + public boolean apply(Throwable input) { + return input instanceof SSLHandshakeException; + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 4d2b01146e..da92b6f9c6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google LLC + * Copyright 2020 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -43,17 +44,20 @@ import org.threeten.bp.temporal.ChronoUnit; public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction { + private static final Logger LOGGER = Logger.getLogger(PartitionedDmlTransaction.class.getName()); private final SessionImpl session; private final SpannerRpc rpc; private final Ticker ticker; + private final IsRetryableInternalError isRetryableInternalErrorPredicate; private volatile boolean isValid = true; PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) { this.session = session; this.rpc = rpc; this.ticker = ticker; + this.isRetryableInternalErrorPredicate = new IsRetryableInternalError(); } /** @@ -95,6 +99,14 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration LOGGER.log( Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e); request = resumeOrRestartRequest(resumeToken, statement, request); + } catch (InternalException e) { + if (!isRetryableInternalErrorPredicate.apply(e)) { + throw e; + } + + LOGGER.log( + Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e); + request = resumeOrRestartRequest(resumeToken, statement, request); } catch (AbortedException e) { LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e); resumeToken = ByteString.EMPTY; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java index 78cd23d064..3fa756875b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java @@ -26,12 +26,10 @@ import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.protobuf.ProtoUtils; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; -import javax.net.ssl.SSLHandshakeException; /** * A factory for creating instances of {@link SpannerException} and its subtypes. All creation of @@ -40,6 +38,7 @@ * ErrorCode#ABORTED} are always represented by {@link AbortedException}. */ public final class SpannerExceptionFactory { + static final String SESSION_RESOURCE_TYPE = "type.googleapis.com/google.spanner.v1.Session"; static final String DATABASE_RESOURCE_TYPE = "type.googleapis.com/google.spanner.admin.database.v1.Database"; @@ -257,35 +256,8 @@ private static boolean hasCauseMatching( } private static class Matchers { - static final Predicate isRetryableInternalError = - new Predicate() { - @Override - public boolean apply(Throwable cause) { - if (cause instanceof StatusRuntimeException - && ((StatusRuntimeException) cause).getStatus().getCode() == Status.Code.INTERNAL) { - if (cause.getMessage().contains("HTTP/2 error code: INTERNAL_ERROR")) { - // See b/25451313. - return true; - } - if (cause.getMessage().contains("Connection closed with unknown cause")) { - // See b/27794742. - return true; - } - if (cause - .getMessage() - .contains("Received unexpected EOS on DATA frame from server")) { - return true; - } - } - return false; - } - }; - static final Predicate isSSLHandshakeException = - new Predicate() { - @Override - public boolean apply(Throwable input) { - return input instanceof SSLHandshakeException; - } - }; + + static final Predicate isRetryableInternalError = new IsRetryableInternalError(); + static final Predicate isSSLHandshakeException = new IsSslHandshakeException(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java new file mode 100644 index 0000000000..0f76da6692 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java @@ -0,0 +1,135 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.InternalException; +import com.google.common.base.Predicate; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@SuppressWarnings("unchecked") +@RunWith(JUnit4.class) +public class IsRetryableInternalErrorTest { + + private Predicate predicate; + + @Before + public void setUp() { + predicate = new IsRetryableInternalError(); + } + + @Test + public void http2ErrorStatusRuntimeExceptionIsRetryable() { + final StatusRuntimeException e = + new StatusRuntimeException( + Status.fromCode(Code.INTERNAL) + .withDescription("INTERNAL: HTTP/2 error code: INTERNAL_ERROR.")); + + assertThat(predicate.apply(e)).isTrue(); + } + + @Test + public void http2ErrorInternalExceptionIsRetryable() { + final InternalException e = + new InternalException( + "INTERNAL: HTTP/2 error code: INTERNAL_ERROR.", + null, + GrpcStatusCode.of(Code.INTERNAL), + false); + + assertThat(predicate.apply(e)).isTrue(); + } + + @Test + public void connectionClosedStatusRuntimeExceptionIsRetryable() { + final StatusRuntimeException e = + new StatusRuntimeException( + Status.fromCode(Code.INTERNAL) + .withDescription("INTERNAL: Connection closed with unknown cause.")); + + assertThat(predicate.apply(e)).isTrue(); + } + + @Test + public void connectionClosedInternalExceptionIsRetryable() { + final InternalException e = + new InternalException( + "INTERNAL: Connection closed with unknown cause.", + null, + GrpcStatusCode.of(Code.INTERNAL), + false); + + assertThat(predicate.apply(e)).isTrue(); + } + + @Test + public void eosStatusRuntimeExceptionIsRetryable() { + final StatusRuntimeException e = + new StatusRuntimeException( + Status.fromCode(Code.INTERNAL) + .withDescription("INTERNAL: Received unexpected EOS on DATA frame from server.")); + + assertThat(predicate.apply(e)).isTrue(); + } + + @Test + public void eosInternalExceptionIsRetryable() { + final InternalException e = + new InternalException( + "INTERNAL: Received unexpected EOS on DATA frame from server.", + null, + GrpcStatusCode.of(Code.INTERNAL), + false); + + assertThat(predicate.apply(e)).isTrue(); + } + + @Test + public void genericInternalStatusRuntimeExceptionIsRetryable() { + final StatusRuntimeException e = + new StatusRuntimeException( + Status.fromCode(Code.INTERNAL).withDescription("INTERNAL: Generic.")); + + assertThat(predicate.apply(e)).isFalse(); + } + + @Test + public void genericInternalExceptionIsNotRetryable() { + final InternalException e = + new InternalException("INTERNAL: Generic.", null, GrpcStatusCode.of(Code.INTERNAL), false); + + assertThat(predicate.apply(e)).isFalse(); + } + + @Test + public void nullIsNotRetryable() { + assertThat(predicate.apply(null)).isFalse(); + } + + @Test + public void genericExceptionIsNotRetryable() { + assertThat(predicate.apply(new Exception())).isFalse(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsSslHandshakeExceptionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsSslHandshakeExceptionTest.java new file mode 100644 index 0000000000..b73c2d6ef1 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsSslHandshakeExceptionTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.base.Predicate; +import javax.net.ssl.SSLHandshakeException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@SuppressWarnings("unchecked") +@RunWith(JUnit4.class) +public class IsSslHandshakeExceptionTest { + + private Predicate predicate; + + @Before + public void setUp() { + predicate = new IsSslHandshakeException(); + } + + @Test + public void sslHandshakeExceptionIsTrue() { + assertThat(predicate.apply(new SSLHandshakeException("test"))).isTrue(); + } + + @Test + public void genericExceptionIsNotSslHandshakeException() { + assertThat(predicate.apply(new Exception("test"))).isFalse(); + } + + @Test + public void nullIsNotSslHandshakeException() { + assertThat(predicate.apply(null)).isFalse(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 5315114aac..a38cd71baa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -20,26 +20,19 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyMap; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.AbortedException; +import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; -import com.google.spanner.v1.BeginTransactionRequest; -import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.*; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; -import com.google.spanner.v1.PartialResultSet; -import com.google.spanner.v1.ResultSetStats; -import com.google.spanner.v1.Transaction; -import com.google.spanner.v1.TransactionSelector; import io.grpc.Status.Code; import java.util.Collections; import java.util.Iterator; @@ -274,4 +267,72 @@ public Long answer(InvocationOnMock invocation) throws Throwable { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); } } + + @Test + public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { + ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build(); + PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build(); + PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build(); + ServerStream stream1 = mock(ServerStream.class); + Iterator iterator = mock(Iterator.class); + when(iterator.hasNext()).thenReturn(true, true, false); + when(iterator.next()) + .thenReturn(p1) + .thenThrow( + new InternalException( + "INTERNAL: Received unexpected EOS on DATA frame from server.", + null, + GrpcStatusCode.of(Code.INTERNAL), + true)); + when(stream1.iterator()).thenReturn(iterator); + ServerStream stream2 = mock(ServerStream.class); + when(stream2.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator()); + when(rpc.executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) + .thenReturn(stream1); + when(rpc.executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) + .thenReturn(stream2); + + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); + + assertThat(count).isEqualTo(1000L); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc) + .executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); + verify(rpc) + .executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)); + } + + @Test + public void testExecuteStreamingPartitionedUpdateGenericInternalException() { + PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build(); + ServerStream stream1 = mock(ServerStream.class); + Iterator iterator = mock(Iterator.class); + when(iterator.hasNext()).thenReturn(true, true, false); + when(iterator.next()) + .thenReturn(p1) + .thenThrow( + new InternalException( + "INTERNAL: Error", null, GrpcStatusCode.of(Code.INTERNAL), false)); + when(stream1.iterator()).thenReturn(iterator); + when(rpc.executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) + .thenReturn(stream1); + + try { + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); + fail("missing expected INTERNAL exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc) + .executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); + } + } }