diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index ba97a975c..02722fac6 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -26,4 +26,9 @@ com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long) 7013 + + com/google/cloud/storage/spi/v1/StorageRpc + long getCurrentUploadOffset(java.lang.String) + 7012 + diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 0c9520849..2e9c0a380 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -51,6 +51,15 @@ class BlobWriteChannel extends BaseWriteChannel { // Contains metadata of the updated object or null if upload is not completed. private StorageObject storageObject; + // Detect if flushBuffer() is being retried or not. + // TODO: I don't think this is thread safe, and there's probably a better way to detect a retry + // occuring. + private boolean retrying = false; + + boolean isRetrying() { + return retrying; + } + StorageObject getStorageObject() { return storageObject; } @@ -63,11 +72,105 @@ protected void flushBuffer(final int length, final boolean last) { new Runnable() { @Override public void run() { - storageObject = - getOptions() - .getStorageRpcV1() - .writeWithResponse( - getUploadId(), getBuffer(), 0, getPosition(), length, last); + if (!isRetrying()) { + // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset() + retrying = true; + storageObject = + getOptions() + .getStorageRpcV1() + .writeWithResponse( + getUploadId(), getBuffer(), 0, getPosition(), length, last); + } else { + // Retriable interruption occurred. + // Variables: + // chunk = getBuffer() + // localNextByteOffset == getPosition() + // chunkSize = getChunkSize() + // + // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0: + // we are retrying from first chunk start from 0 offset. + // + // Case 2: localNextByteOffset == remoteNextByteOffset: + // Special case of Case 1 when a chunk is retried. + // + // Case 3: localNextByteOffset < remoteNextByteOffset + // && driftOffset < chunkSize: + // Upload progressed and localNextByteOffset is not in-sync with + // remoteNextByteOffset and driftOffset is less than chunkSize. + // driftOffset must be less than chunkSize for it to retry using + // chunk maintained in memory. + // Find the driftOffset by subtracting localNextByteOffset from + // remoteNextByteOffset. + // Use driftOffset to determine where to restart from using the chunk in + // memory. + // + // Case 4: localNextByteOffset < remoteNextByteOffset + // && driftOffset == chunkSize: + // Special case of Case 3. + // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on + // to the next chunk. + // + // Case 5: localNextByteOffset < remoteNextByteOffset + // && driftOffset > chunkSize: + // Throw exception as remoteNextByteOffset has drifted beyond the retriable + // chunk maintained in memory. This is not possible unless there's multiple + // clients uploading to the same resumable upload session. + // + // Case 6: localNextByteOffset > remoteNextByteOffset: + // For completeness, this case is not possible because it would require retrying + // a 400 status code which is not allowed. + // + // Get remote offset from API + long remoteNextByteOffset = + getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId()); + long localNextByteOffset = getPosition(); + int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset); + int retryChunkLength = length - driftOffset; + + if (localNextByteOffset == 0 && remoteNextByteOffset == 0 + || localNextByteOffset == remoteNextByteOffset) { + // Case 1 and 2 + storageObject = + getOptions() + .getStorageRpcV1() + .writeWithResponse( + getUploadId(), getBuffer(), 0, getPosition(), length, last); + } else if (localNextByteOffset < remoteNextByteOffset + && driftOffset < getChunkSize()) { + // Case 3 + storageObject = + getOptions() + .getStorageRpcV1() + .writeWithResponse( + getUploadId(), + getBuffer(), + driftOffset, + remoteNextByteOffset, + retryChunkLength, + last); + } else if (localNextByteOffset < remoteNextByteOffset + && driftOffset == getChunkSize()) { + // Case 4 + // Continue to next chunk + retrying = false; + return; + } else { + // Case 5 + StringBuilder sb = new StringBuilder(); + sb.append( + "Remote offset has progressed beyond starting byte offset of next chunk."); + sb.append( + "This may be a symptom of multiple clients uploading to the same upload session.\n\n"); + sb.append("For debugging purposes:\n"); + sb.append("uploadId: ").append(getUploadId()).append('\n'); + sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n'); + sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n'); + sb.append("driftOffset: ").append(driftOffset).append("\n\n"); + throw new StorageException(0, sb.toString()); + } + } + // Request was successful and retrying state is now disabled. + retrying = false; } }), getOptions().getRetrySettings(), diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 0960f91ff..faba82b1d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -747,6 +747,51 @@ public void write( writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last); } + @Override + public long getCurrentUploadOffset(String uploadId) { + try { + GenericUrl url = new GenericUrl(uploadId); + HttpRequest httpRequest = storage.getRequestFactory().buildPutRequest(url, null); + + httpRequest.getHeaders().setContentRange("bytes */*"); + // Turn off automatic redirects. + // HTTP 308 are returned if upload is incomplete. + // See: https://cloud.google.com/storage/docs/performing-resumable-uploads + httpRequest.setFollowRedirects(false); + + HttpResponse response = null; + try { + response = httpRequest.execute(); + int code = response.getStatusCode(); + String message = response.getStatusMessage(); + if (code == 201 || code == 200) { + throw new StorageException(0, "Resumable upload is already complete."); + } + StringBuilder sb = new StringBuilder(); + sb.append("Not sure what occurred. Here's debugging information:\n"); + sb.append("Response:\n").append(response.toString()).append("\n\n"); + throw new StorageException(0, sb.toString()); + } catch (HttpResponseException ex) { + int code = ex.getStatusCode(); + if (code == 308 && ex.getHeaders().getRange() == null) { + // No progress has been made. + return 0; + } else { + // API returns last byte received offset + String range = ex.getHeaders().getRange(); + // Return next byte offset by adding 1 to last byte received offset + return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1; + } + } finally { + if (response != null) { + response.disconnect(); + } + } + } catch (IOException ex) { + throw translate(ex); + } + } + @Override public StorageObject writeWithResponse( String uploadId, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java index 2088c15be..cc4dd5740 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java @@ -328,6 +328,15 @@ void write( int length, boolean last); + /** + * Requests current byte offset from Cloud Storage API. Used to recover from a failure in some + * bytes were committed successfully to the open resumable session. + * + * @param uploadId resumable upload ID URL + * @throws StorageException upon failure + */ + long getCurrentUploadOffset(String uploadId); + /** * Writes the provided bytes to a storage object at the provided location. If {@code last=true} * returns metadata of the updated object, otherwise returns null. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java index 7733f13eb..b4479682a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java @@ -139,6 +139,11 @@ public void write( throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public long getCurrentUploadOffset(String uploadId) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public StorageObject writeWithResponse( String uploadId, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index a18345be8..b6d7bbf3a 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -134,6 +134,74 @@ public void testWriteWithoutFlush() throws IOException { assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE))); } + @Test + public void testWriteWithFlushRetryChunk() throws IOException { + StorageException exception = new StorageException(new SocketException("Socket closed")); + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(exception); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andReturn(null); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + assertTrue(writer.isOpen()); + assertNull(writer.getStorageObject()); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + } + + @Test + public void testWriteWithFlushRetryChunkWithDrift() throws IOException { + StorageException exception = new StorageException(new SocketException("Socket closed")); + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(exception); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(10), + eq(10L), + eq(MIN_CHUNK_SIZE - 10), + eq(false))) + .andReturn(null); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + assertTrue(writer.isOpen()); + assertNull(writer.getStorageObject()); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + } + @Test public void testWriteWithFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);