Skip to content

Commit

Permalink
change design of transient retry strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
frankyn committed Mar 12, 2020
1 parent 1246212 commit 42d95a4
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 88 deletions.
Expand Up @@ -33,6 +33,9 @@
*/
@InternalApi
public final class StorageException extends BaseHttpServiceException {
private static final String INTERNAL_ERROR = "internalError";
private static final String CONNECTION_CLOSED_PREMATURELY = "connectionClosedPrematurely";
private static final String CONNECTION_RESET = "connectionReset";

// see: https://cloud.google.com/storage/docs/resumable-uploads-xml#practices
private static final Set<Error> RETRYABLE_ERRORS =
Expand All @@ -43,7 +46,9 @@ public final class StorageException extends BaseHttpServiceException {
new Error(500, null),
new Error(429, null),
new Error(408, null),
new Error(null, "internalError"));
new Error(null, INTERNAL_ERROR),
new Error(null, CONNECTION_CLOSED_PREMATURELY),
new Error(null, CONNECTION_RESET));

private static final long serialVersionUID = -4168430271327813063L;

Expand All @@ -55,6 +60,10 @@ public StorageException(int code, String message, Throwable cause) {
super(code, message, null, true, RETRYABLE_ERRORS, cause);
}

public StorageException(int code, String message, String reason, Throwable cause) {
super(code, message, reason, true, RETRYABLE_ERRORS, cause);
}

public StorageException(IOException exception) {
super(exception, true, RETRYABLE_ERRORS);
}
Expand All @@ -73,4 +82,16 @@ public static StorageException translateAndThrow(RetryHelperException ex) {
BaseServiceException.translate(ex);
throw new StorageException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
}

public static StorageException translate(IOException exception) {
if (exception.getMessage().contains("Connection closed prematurely")) {
return new StorageException(
0, exception.getMessage(), CONNECTION_CLOSED_PREMATURELY, exception);
} else if (exception.getMessage().contains("Connection reset")) {
return new StorageException(0, exception.getMessage(), CONNECTION_RESET, exception);
} else {
// default
return new StorageException(exception);
}
}
}
Expand Up @@ -77,7 +77,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -227,6 +226,7 @@ public void onFailure(GoogleJsonError googleJsonError, HttpHeaders httpHeaders)
}

private static StorageException translate(IOException exception) {

return new StorageException(exception);
}

Expand Down Expand Up @@ -702,13 +702,9 @@ public Tuple<String, byte[]> read(
return Tuple.of(etag, output.toByteArray());
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = translate(ex);
StorageException serviceException = StorageException.translate(ex);
if (serviceException.getCode() == SC_REQUESTED_RANGE_NOT_SATISFIABLE) {
return Tuple.of(null, new byte[0]);
} else if (serviceException.getMessage().contains("Connection closed prematurely")) {
serviceException = new StorageException(new SocketException(serviceException.getMessage()));
} else if (serviceException.getMessage().contains("Connection reset")) {
serviceException = new StorageException(new SocketException(serviceException.getMessage()));
}
throw serviceException;
} finally {
Expand Down
Expand Up @@ -33,13 +33,11 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import javax.net.ssl.SSLException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -84,85 +82,6 @@ public void testCreate() {
assertTrue(reader.isOpen());
}

@Test
public void testCreateRetryableErrorPrematureClosure() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new SocketException(
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760"));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andThrow(exception);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andReturn(test);
replay(storageRpcMock);
assertTrue(reader.read(byteBuffer) > 0);
}

@Test
public void testCreateNonRetryableErrorPrematureClosure() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new IOException(
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760"));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andThrow(exception);
replay(storageRpcMock);
try {
assertTrue(reader.read(byteBuffer) > 0);
fail("StorageException was expected");
} catch (StorageException e) {
// Throw expected.
}
}

@Test
public void testCreateRetryableErrorConnectionReset() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new SocketException(
new IOException(
"Connection has been shutdown: "
+ new SSLException(new SocketException("Connection reset")))
.toString()));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andThrow(exception);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(test);
replay(storageRpcMock);
assertTrue(reader.read(byteBuffer) > 0);
}

@Test
public void testCreateNonRetryableErrorConnectionReset() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new IOException(
"Connection has been shutdown: "
+ new SSLException(new SocketException("Connection reset"))));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andThrow(exception);
replay(storageRpcMock);
try {
assertTrue(reader.read(byteBuffer) > 0);
fail("StorageException was expected");
} catch (StorageException e) {
// Expected to throw
}
}

@Test
public void testReadBuffered() throws IOException {
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
Expand Down
Expand Up @@ -32,7 +32,9 @@
import com.google.cloud.BaseServiceException;
import com.google.cloud.RetryHelper.RetryHelperException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import javax.net.ssl.SSLException;
import org.junit.Test;

public class StorageExceptionTest {
Expand Down Expand Up @@ -135,6 +137,29 @@ public void testStorageException() {
assertTrue(exception.isRetryable());
}

@Test
public void testTranslateConnectionReset() {
StorageException exception =
StorageException.translate(
new IOException(
"Connection has been shutdown: "
+ new SSLException(new SocketException("Connection reset"))));
assertEquals(0, exception.getCode());
assertEquals("connectionReset", exception.getReason());
assertTrue(exception.isRetryable());
}

@Test
public void testTranslateConnectionClosedPrematurely() {
StorageException exception =
StorageException.translate(
new IOException(
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760"));
assertEquals(0, exception.getCode());
assertEquals("connectionClosedPrematurely", exception.getReason());
assertTrue(exception.isRetryable());
}

@Test
public void testTranslateAndThrow() throws Exception {
Exception cause = new StorageException(503, "message");
Expand Down

0 comments on commit 42d95a4

Please sign in to comment.