From 44f49e0a33c3e541d9f8b22622ffff17cc8b8eaa Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 12 Jan 2021 12:52:05 -0800 Subject: [PATCH] fix: last chunk is retriable (#677) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Ensure the tests and linter pass - [x] Appropriate docs were updated (if necessary) Fixes #666 ☕️ --- .../cloud/storage/BlobWriteChannel.java | 18 +++++- .../cloud/storage/spi/v1/HttpStorageRpc.java | 2 +- .../cloud/storage/BlobWriteChannelTest.java | 58 +++++++++++++++++++ 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 2e9c0a380..ca87933c0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -120,6 +120,14 @@ public void run() { // For completeness, this case is not possible because it would require retrying // a 400 status code which is not allowed. // + // Case 7: remoteNextByteOffset==-1 && last == true + // Upload is complete and retry occurred in the "last" chunk. Data sent was + // received by the service. + // + // Case 8: remoteNextByteOffset==-1 && last == false + // Upload was completed by another client because this retry did not occur + // during the last chunk. + // // Get remote offset from API long remoteNextByteOffset = getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId()); @@ -154,7 +162,8 @@ && driftOffset < getChunkSize()) { // Continue to next chunk retrying = false; return; - } else { + } else if (localNextByteOffset < remoteNextByteOffset + && driftOffset > getChunkSize()) { // Case 5 StringBuilder sb = new StringBuilder(); sb.append( @@ -167,6 +176,13 @@ && driftOffset < getChunkSize()) { sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n'); sb.append("driftOffset: ").append(driftOffset).append("\n\n"); throw new StorageException(0, sb.toString()); + } else if (remoteNextByteOffset == -1 && last) { + // Case 7 + retrying = false; + return; + } else if (remoteNextByteOffset == -1 && !last) { + // Case 8 + throw new StorageException(0, "Resumable upload is already complete."); } } // Request was successful and retrying state is now disabled. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index abdfa33ca..37aff2739 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -766,7 +766,7 @@ public long getCurrentUploadOffset(String uploadId) { response = httpRequest.execute(); int code = response.getStatusCode(); if (code == 201 || code == 200) { - throw new StorageException(0, "Resumable upload is already complete."); + return -1; } StringBuilder sb = new StringBuilder(); sb.append("Not sure what occurred. Here's debugging information:\n"); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index b6d7bbf3a..eefee7729 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -202,6 +202,64 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException { assertArrayEquals(buffer.array(), capturedBuffer.getValue()); } + @Test + public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException { + StorageException exception = new StorageException(new SocketException("Socket closed")); + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(true))) + .andThrow(exception); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + writer.close(); + assertFalse(writer.isRetrying()); + assertFalse(writer.isOpen()); + // Capture captures entire buffer of a chunk even when not completely used. + // Making assert selective up to the size of MIN_CHUNK_SIZE + assertArrayEquals(Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE), buffer.array()); + } + + @Test + public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOException { + StorageException exception = new StorageException(new SocketException("Socket closed")); + StorageException completedException = + new StorageException(0, "Resumable upload is already complete."); + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(exception); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + try { + writer.write(buffer); + fail("Expected completed exception."); + } catch (StorageException ex) { + assertEquals(ex, completedException); + } + assertTrue(writer.isRetrying()); + assertTrue(writer.isOpen()); + } + @Test public void testWriteWithFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);