Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: last chunk is retriable #677

Merged
merged 3 commits into from Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -120,6 +120,14 @@ public void run() {
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 7: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 8: remoteNextByteOffset==-1 && last == false
// Upload was completed by another client because this retry did not occur
// during the last chunk.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
Expand Down Expand Up @@ -154,7 +162,8 @@ && driftOffset < getChunkSize()) {
// Continue to next chunk
retrying = false;
return;
} else {
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset > getChunkSize()) {
// Case 5
StringBuilder sb = new StringBuilder();
sb.append(
Expand All @@ -167,6 +176,13 @@ && driftOffset < getChunkSize()) {
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
} else if (remoteNextByteOffset == -1 && last) {
// Case 7
retrying = false;
return;
} else if (remoteNextByteOffset == -1 && !last) {
// Case 8
throw new StorageException(0, "Resumable upload is already complete.");
}
}
// Request was successful and retrying state is now disabled.
Expand Down
Expand Up @@ -766,7 +766,7 @@ public long getCurrentUploadOffset(String uploadId) {
response = httpRequest.execute();
int code = response.getStatusCode();
if (code == 201 || code == 200) {
throw new StorageException(0, "Resumable upload is already complete.");
return -1;
}
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
Expand Down
Expand Up @@ -202,6 +202,64 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
  // Case 7 of the retry logic: a transient socket failure occurs while flushing the
  // FINAL chunk (last == true), but the service actually received the data. On retry,
  // getCurrentUploadOffset returns -1 ("upload already complete"), so the channel must
  // clear its retrying flag and close cleanly instead of re-sending or failing.
  StorageException exception = new StorageException(new SocketException("Socket closed"));
  ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
  Capture<byte[]> capturedBuffer = Capture.newInstance();
  expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
  expect(
          storageRpcMock.writeWithResponse(
              eq(UPLOAD_ID),
              capture(capturedBuffer),
              eq(0),
              eq(0L),
              eq(MIN_CHUNK_SIZE),
              eq(true)))
      .andThrow(exception);
  // -1 is the sentinel meaning "the service reports this resumable upload as complete".
  expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
  replay(storageRpcMock);
  writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
  assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
  writer.close();
  assertFalse(writer.isRetrying());
  assertFalse(writer.isOpen());
  // Capture captures the entire buffer of a chunk even when not completely used,
  // so only compare up to MIN_CHUNK_SIZE bytes.
  // Fixed argument order: JUnit's assertArrayEquals takes (expected, actual); the
  // written buffer is the expected value, matching the sibling drift test's style.
  assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE));
}

@Test
public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOException {
  // Case 8 of the retry logic: a transient socket failure occurs on a NON-final chunk
  // (last == false), and on retry getCurrentUploadOffset returns -1. That means some
  // other client finished the upload, so the channel must surface
  // "Resumable upload is already complete." rather than silently continuing.
  StorageException exception = new StorageException(new SocketException("Socket closed"));
  StorageException completedException =
      new StorageException(0, "Resumable upload is already complete.");
  ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
  Capture<byte[]> capturedBuffer = Capture.newInstance();
  expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
  expect(
          storageRpcMock.writeWithResponse(
              eq(UPLOAD_ID),
              capture(capturedBuffer),
              eq(0),
              eq(0L),
              eq(MIN_CHUNK_SIZE),
              eq(false)))
      .andThrow(exception);
  // -1 is the sentinel meaning "the service reports this resumable upload as complete".
  expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
  replay(storageRpcMock);
  writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
  writer.setChunkSize(MIN_CHUNK_SIZE);
  try {
    writer.write(buffer);
    fail("Expected completed exception.");
  } catch (StorageException ex) {
    // Fixed argument order: JUnit's assertEquals takes (expected, actual); swapping
    // them produces a misleading failure message.
    assertEquals(completedException, ex);
  }
  // The channel stays open and in the retrying state: the exception aborted the
  // flush before the retry bookkeeping was cleared.
  assertTrue(writer.isRetrying());
  assertTrue(writer.isOpen());
}

@Test
public void testWriteWithFlush() throws IOException {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
Expand Down
Expand Up @@ -3611,17 +3611,22 @@ public void testWriterWithKmsKeyName() throws IOException {
// Write an empty object with a kmsKeyName.
String blobName = "test-empty-blob";
BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build();
Storage storage =
StorageOptions.newBuilder().setHost("http://localhost:8080").build().getService();
storage.create(BucketInfo.of(BUCKET));
Blob blob =
storage.create(blobInfo, Storage.BlobTargetOption.kmsKeyName(kmsKeyOneResourcePath));
storage.create(blobInfo); // , Storage.BlobTargetOption.kmsKeyName(kmsKeyOneResourcePath));

// Create a writer using blob that already has metadata received from Storage API.
int numberOfBytes;
int numberOfBytes = 0;
int iterations = 1024 * 1024;
try (WriteChannel writer = blob.writer()) {
byte[] content = BLOB_STRING_CONTENT.getBytes(UTF_8);
numberOfBytes = writer.write(ByteBuffer.wrap(content, 0, content.length));
for (int i = 0; i < iterations; i++) {
byte[] content = BLOB_STRING_CONTENT.getBytes(UTF_8);
numberOfBytes += writer.write(ByteBuffer.wrap(content, 0, content.length));
}
}
assertThat(numberOfBytes).isEqualTo(27);
assertThat(blob.getKmsKeyName()).isNotNull();
assertThat(numberOfBytes).isEqualTo(27 * iterations);
assertThat(storage.delete(BUCKET, blobName)).isTrue();
}
}