Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry using remote offset #604

Merged
merged 3 commits into from Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Expand Up @@ -26,4 +26,9 @@
<method>com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long)</method>
<differenceType>7013</differenceType>
</difference>
<difference>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>long getCurrentUploadOffset(java.lang.String)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
Expand Up @@ -51,6 +51,15 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
// Contains metadata of the updated object or null if upload is not completed.
private StorageObject storageObject;

// Detect if flushBuffer() is being retried or not.
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occuring.
private boolean retrying = false;

boolean isRetrying() {
return retrying;
}

StorageObject getStorageObject() {
return storageObject;
}
Expand All @@ -63,11 +72,105 @@ protected void flushBuffer(final int length, final boolean last) {
new Runnable() {
@Override
public void run() {
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
if (!isRetrying()) {
// Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
retrying = true;
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else {
// Retriable interruption occurred.
// Variables:
// chunk = getBuffer()
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
// we are retrying from first chunk start from 0 offset.
//
// Case 2: localNextByteOffset == remoteNextByteOffset:
// Special case of Case 1 when a chunk is retried.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
// driftOffset must be less than chunkSize for it to retry using
// chunk maintained in memory.
// Find the driftOffset by subtracting localNextByteOffset from
// remoteNextByteOffset.
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 3.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 5: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there's multiple
// clients uploading to the same resumable upload session.
//
// Case 6: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
long localNextByteOffset = getPosition();
int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
int retryChunkLength = length - driftOffset;

if (localNextByteOffset == 0 && remoteNextByteOffset == 0
|| localNextByteOffset == remoteNextByteOffset) {
// Case 1 and 2
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset < getChunkSize()) {
// Case 3
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(),
getBuffer(),
driftOffset,
remoteNextByteOffset,
retryChunkLength,
last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset == getChunkSize()) {
// Case 4
// Continue to next chunk
retrying = false;
return;
} else {
// Case 5
StringBuilder sb = new StringBuilder();
sb.append(
"Remote offset has progressed beyond starting byte offset of next chunk.");
sb.append(
"This may be a symptom of multiple clients uploading to the same upload session.\n\n");
sb.append("For debugging purposes:\n");
sb.append("uploadId: ").append(getUploadId()).append('\n');
sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
}
}
// Request was successful and retrying state is now disabled.
retrying = false;
}
}),
getOptions().getRetrySettings(),
Expand Down
Expand Up @@ -747,6 +747,51 @@ public void write(
writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last);
}

@Override
public long getCurrentUploadOffset(String uploadId) {
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest = storage.getRequestFactory().buildPutRequest(url, null);

httpRequest.getHeaders().setContentRange("bytes */*");
// Turn off automatic redirects.
// HTTP 308 are returned if upload is incomplete.
// See: https://cloud.google.com/storage/docs/performing-resumable-uploads
httpRequest.setFollowRedirects(false);

HttpResponse response = null;
try {
response = httpRequest.execute();
int code = response.getStatusCode();
String message = response.getStatusMessage();
if (code == 201 || code == 200) {
throw new StorageException(0, "Resumable upload is already complete.");
}
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
sb.append("Response:\n").append(response.toString()).append("\n\n");
throw new StorageException(0, sb.toString());
} catch (HttpResponseException ex) {
int code = ex.getStatusCode();
if (code == 308 && ex.getHeaders().getRange() == null) {
// No progress has been made.
return 0;
} else {
// API returns last byte received offset
String range = ex.getHeaders().getRange();
// Return next byte offset by adding 1 to last byte received offset
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
}
} finally {
if (response != null) {
response.disconnect();
}
}
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down
Expand Up @@ -328,6 +328,15 @@ void write(
int length,
boolean last);

/**
* Requests current byte offset from Cloud Storage API. Used to recover from a failure in some
* bytes were committed successfully to the open resumable session.
*
* @param uploadId resumable upload ID URL
* @throws StorageException upon failure
*/
long getCurrentUploadOffset(String uploadId);

/**
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
* returns metadata of the updated object, otherwise returns null.
Expand Down
Expand Up @@ -139,6 +139,11 @@ public void write(
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public long getCurrentUploadOffset(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down
Expand Up @@ -134,6 +134,74 @@ public void testWriteWithoutFlush() throws IOException {
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}

@Test
public void testWriteWithFlushRetryChunk() throws IOException {
StorageException exception = new StorageException(new SocketException("Socket closed"));
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
.andThrow(exception);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
assertNull(writer.getStorageObject());
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
StorageException exception = new StorageException(new SocketException("Socket closed"));
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
.andThrow(exception);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(10),
eq(10L),
eq(MIN_CHUNK_SIZE - 10),
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
assertNull(writer.getStorageObject());
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithFlush() throws IOException {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
Expand Down