New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: connection closed prematurely in BlobReadChannel & ConnectionReset #173
Changes from 5 commits
3803ac9
0baa6aa
9c01f15
c5938b9
1246212
42d95a4
fb31431
ffd7b1f
5663daf
e918305
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,7 @@ | |
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.math.BigInteger; | ||
import java.net.SocketException; | ||
import java.util.ArrayList; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
|
@@ -704,6 +705,10 @@ public Tuple<String, byte[]> read( | |
StorageException serviceException = translate(ex); | ||
if (serviceException.getCode() == SC_REQUESTED_RANGE_NOT_SATISFIABLE) { | ||
return Tuple.of(null, new byte[0]); | ||
} else if (serviceException.getMessage().contains("Connection closed prematurely")) { | ||
serviceException = new StorageException(new SocketException(serviceException.getMessage())); | ||
} else if (serviceException.getMessage().contains("Connection reset")) { | ||
serviceException = new StorageException(new SocketException(serviceException.getMessage())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This loses the stack trace; you should include the servieException too |
||
} | ||
throw serviceException; | ||
} finally { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,17 +28,18 @@ | |
|
||
import com.google.cloud.ReadChannel; | ||
import com.google.cloud.RestorableState; | ||
import com.google.cloud.ServiceOptions; | ||
import com.google.cloud.Tuple; | ||
import com.google.cloud.storage.spi.StorageRpcFactory; | ||
import com.google.cloud.storage.spi.v1.StorageRpc; | ||
import com.google.common.collect.ImmutableMap; | ||
import java.io.IOException; | ||
import java.net.SocketException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
import java.util.Random; | ||
import javax.net.ssl.SSLException; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
@@ -68,7 +69,6 @@ public void setUp() { | |
StorageOptions.newBuilder() | ||
.setProjectId("projectId") | ||
.setServiceRpcFactory(rpcFactoryMock) | ||
.setRetrySettings(ServiceOptions.getNoRetrySettings()) | ||
.build(); | ||
} | ||
|
||
|
@@ -84,6 +84,85 @@ public void testCreate() { | |
assertTrue(reader.isOpen()); | ||
} | ||
|
||
@Test | ||
public void testCreateRetryableErrorPrematureClosure() throws IOException { | ||
byte[] arr = {0x0, 0xd, 0xa}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. array (no abbreviations per Google style) |
||
Tuple<String, byte[]> test = Tuple.of("etag", arr); | ||
StorageException exception = | ||
new StorageException( | ||
new SocketException( | ||
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760")); | ||
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); | ||
ByteBuffer byteBuffer = ByteBuffer.allocate(400); | ||
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andThrow(exception); | ||
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andReturn(test); | ||
replay(storageRpcMock); | ||
assertTrue(reader.read(byteBuffer) > 0); | ||
} | ||
|
||
@Test | ||
public void testCreateNonRetryableErrorPrematureClosure() throws IOException { | ||
byte[] arr = {0x0, 0xd, 0xa}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. array |
||
Tuple<String, byte[]> test = Tuple.of("etag", arr); | ||
StorageException exception = | ||
new StorageException( | ||
new IOException( | ||
"Connection closed prematurely: bytesRead = 1114112, Content-Length = 10485760")); | ||
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); | ||
ByteBuffer byteBuffer = ByteBuffer.allocate(400); | ||
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, 2097152)).andThrow(exception); | ||
replay(storageRpcMock); | ||
try { | ||
assertTrue(reader.read(byteBuffer) > 0); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't both assertTrue and fail |
||
fail("StorageException was expected"); | ||
} catch (StorageException e) { | ||
// Throw expected. | ||
} | ||
} | ||
|
||
@Test | ||
public void testCreateRetryableErrorConnectionReset() throws IOException { | ||
byte[] arr = {0x0, 0xd, 0xa}; | ||
Tuple<String, byte[]> test = Tuple.of("etag", arr); | ||
StorageException exception = | ||
new StorageException( | ||
new SocketException( | ||
new IOException( | ||
"Connection has been shutdown: " | ||
+ new SSLException(new SocketException("Connection reset"))) | ||
.toString())); | ||
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); | ||
ByteBuffer byteBuffer = ByteBuffer.allocate(400); | ||
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) | ||
.andThrow(exception); | ||
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) | ||
.andReturn(test); | ||
replay(storageRpcMock); | ||
assertTrue(reader.read(byteBuffer) > 0); | ||
} | ||
|
||
@Test | ||
public void testCreateNonRetryableErrorConnectionReset() throws IOException { | ||
byte[] arr = {0x0, 0xd, 0xa}; | ||
Tuple<String, byte[]> test = Tuple.of("etag", arr); | ||
StorageException exception = | ||
new StorageException( | ||
new IOException( | ||
"Connection has been shutdown: " | ||
+ new SSLException(new SocketException("Connection reset")))); | ||
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); | ||
ByteBuffer byteBuffer = ByteBuffer.allocate(400); | ||
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) | ||
.andThrow(exception); | ||
replay(storageRpcMock); | ||
try { | ||
assertTrue(reader.read(byteBuffer) > 0); | ||
fail("StorageException was expected"); | ||
} catch (StorageException e) { | ||
// Expected to throw | ||
} | ||
} | ||
|
||
@Test | ||
public void testReadBuffered() throws IOException { | ||
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very weird. I don't think I've ever seen this double wrapping in one throw clause done before. Can you explain the purpose here and how it works?