Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: connection closed prematurely in BlobReadChannel & ConnectionReset #173

Merged
merged 10 commits into from Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -77,6 +77,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -704,6 +705,10 @@ public Tuple<String, byte[]> read(
StorageException serviceException = translate(ex);
if (serviceException.getCode() == SC_REQUESTED_RANGE_NOT_SATISFIABLE) {
return Tuple.of(null, new byte[0]);
} else if (serviceException.getMessage().contains("Connection closed prematurely")) {
serviceException = new StorageException(new SocketException(serviceException.getMessage()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very weird. I don't think I've ever seen this double wrapping in one throw clause done before. Can you explain the purpose here and how it works?

} else if (serviceException.getMessage().contains("Connection reset")) {
serviceException = new StorageException(new SocketException(serviceException.getMessage()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loses the stack trace; you should include the servieException too

}
throw serviceException;
} finally {
Expand Down
Expand Up @@ -28,17 +28,18 @@

import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import javax.net.ssl.SSLException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -68,7 +69,6 @@ public void setUp() {
StorageOptions.newBuilder()
.setProjectId("projectId")
.setServiceRpcFactory(rpcFactoryMock)
.setRetrySettings(ServiceOptions.getNoRetrySettings())
.build();
}

Expand All @@ -84,6 +84,85 @@ public void testCreate() {
assertTrue(reader.isOpen());
}

@Test
public void testCreateRetryableErrorPrematureClosure() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

array (no abbreviations per Google style)

Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new SocketException(
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760"));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andThrow(exception);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andReturn(test);
replay(storageRpcMock);
assertTrue(reader.read(byteBuffer) > 0);
}

@Test
public void testCreateNonRetryableErrorPrematureClosure() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

array

Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new IOException(
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760"));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andThrow(exception);
replay(storageRpcMock);
try {
assertTrue(reader.read(byteBuffer) > 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't both assertTrue and fail

fail("StorageException was expected");
} catch (StorageException e) {
// Throw expected.
}
}

@Test
public void testCreateRetryableErrorConnectionReset() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new SocketException(
new IOException(
"Connection has been shutdown: "
+ new SSLException(new SocketException("Connection reset")))
.toString()));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andThrow(exception);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(test);
replay(storageRpcMock);
assertTrue(reader.read(byteBuffer) > 0);
}

@Test
public void testCreateNonRetryableErrorConnectionReset() throws IOException {
byte[] arr = {0x0, 0xd, 0xa};
Tuple<String, byte[]> test = Tuple.of("etag", arr);
StorageException exception =
new StorageException(
new IOException(
"Connection has been shutdown: "
+ new SSLException(new SocketException("Connection reset"))));
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
ByteBuffer byteBuffer = ByteBuffer.allocate(400);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andThrow(exception);
replay(storageRpcMock);
try {
assertTrue(reader.read(byteBuffer) > 0);
fail("StorageException was expected");
} catch (StorageException e) {
// Expected to throw
}
}

@Test
public void testReadBuffered() throws IOException {
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
Expand Down