Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retrying get remote offset and recover from last chunk failures. #726

Merged
merged 7 commits into from Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occurring.
private boolean retrying = false;
private boolean checkingForLastChunk = false;

boolean isRetrying() {
return retrying;
Expand Down Expand Up @@ -87,13 +88,10 @@ public void run() {
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
// we are retrying from first chunk start from 0 offset.
// Case 1: localNextByteOffset == remoteNextByteOffset:
// Retrying the entire chunk
//
// Case 2: localNextByteOffset == remoteNextByteOffset:
// Special case of Case 1 when a chunk is retried.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// Case 2: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
Expand All @@ -104,48 +102,54 @@ public void run() {
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 3.
// Special case of Case 2.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 5: localNextByteOffset < remoteNextByteOffset
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there's multiple
// clients uploading to the same resumable upload session.
//
// Case 6: localNextByteOffset > remoteNextByteOffset:
// Case 5: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 7: remoteNextByteOffset==-1 && last == true
// Case 6: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 8: remoteNextByteOffset==-1 && last == false
// Upload was completed by another client because this retry did not occur
// during the last chunk.
// Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk
// Not the last chunk and not yet checkingForLastChunk: allow the client to
// catch up to the final chunk, which is then handled by
// Case 6.
//
// Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk
// Not last chunk and checkingForLastChunk means this is the second time we
// hit this case, meaning the upload was completed by a different client.
//
// Case 9: Only possible if the client local offset continues beyond the remote
// offset which is not possible.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
long localNextByteOffset = getPosition();
int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
int retryChunkLength = length - driftOffset;

if (localNextByteOffset == 0 && remoteNextByteOffset == 0
|| localNextByteOffset == remoteNextByteOffset) {
// Case 1 and 2
if (localNextByteOffset == remoteNextByteOffset) {
// Case 1
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset < getChunkSize()) {
// Case 3
// Case 2
storageObject =
getOptions()
.getStorageRpcV1()
Expand All @@ -158,13 +162,13 @@ && driftOffset < getChunkSize()) {
last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset == getChunkSize()) {
// Case 4
// Case 3
// Continue to next chunk
retrying = false;
return;
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset > getChunkSize()) {
// Case 5
// Case 4
StringBuilder sb = new StringBuilder();
sb.append(
"Remote offset has progressed beyond starting byte offset of next chunk.");
Expand All @@ -177,12 +181,34 @@ && driftOffset > getChunkSize()) {
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
} else if (remoteNextByteOffset == -1 && last) {
// Case 7
// Case 6
if (storageObject == null) {
// Request object metadata
storageObject =
getOptions().getStorageRpcV1().get(getEntity().toPb(), null);
}
retrying = false;
return;
} else if (remoteNextByteOffset == -1 && !last) {
} else if (remoteNextByteOffset == -1 && !last && !checkingForLastChunk) {
// Case 7
// Make sure this is the last chunk.
checkingForLastChunk = true;
// Continue onto next chunk in case this is the last chunk
return;
} else if (remoteNextByteOffset == -1 && !last && checkingForLastChunk) {
// Case 8
throw new StorageException(0, "Resumable upload is already complete.");
} else {
// Case 9
StringBuilder sb = new StringBuilder();
sb.append(
"Local offset has progressed beyond the remote byte offset of next chunk.\n\n");
sb.append("For debugging purposes:\n");
sb.append("uploadId: ").append(getUploadId()).append('\n');
sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the only difference between this block and Case 4 is "Local" vs. "Remote". Can we extract this logic into a method so it's easier to keep the two in sync?

throw new StorageException(0, sb.toString());
}
}
// Request was successful and retrying state is now disabled.
Expand Down
Expand Up @@ -766,6 +766,7 @@ public long getCurrentUploadOffset(String uploadId) {
response = httpRequest.execute();
int code = response.getStatusCode();
if (code == 201 || code == 200) {
// Upload completed successfully
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: create constants for compared values instead of commenting

201 -> STATUS_CREATED
200 -> STATUS_OK

return -1;
}
StringBuilder sb = new StringBuilder();
Expand All @@ -783,11 +784,7 @@ public long getCurrentUploadOffset(String uploadId) {
// Return next byte offset by adding 1 to last byte received offset
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
} else {
// Not certain what went wrong
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
sb.append("Response:\n").append(ex.toString()).append("\n\n");
throw new StorageException(0, sb.toString());
throw translate(ex);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to keep the debugging information in the exception message?

}
} finally {
if (response != null) {
Expand Down