Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retrying get remote offset and recover from last chunk failures. #726

Merged
merged 7 commits into from Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occurring.
// Set to true once a flush attempt has started and reset to false when the attempt finishes, so
// that a re-run of the flush logic can tell it is a retry and query the remote offset.
private boolean retrying = false;
// Set to true the first time the remote offset reports a completed upload (-1) while flushing a
// chunk that is not the last one; a later occurrence is treated as an unrecoverable state.
private boolean checkingForLastChunk = false;

boolean isRetrying() {
return retrying;
Expand All @@ -64,129 +65,141 @@ StorageObject getStorageObject() {
return storageObject;
}

/**
 * Writes a slice of the in-memory buffer to the resumable upload session identified by
 * getUploadId().
 *
 * @param chunkOffset offset into the buffer, passed as the source offset of the write
 * @param chunkLength number of bytes to write
 * @param position destination offset passed to the write call
 * @param last flag passed through to the write call to mark the final chunk
 * @return whatever writeWithResponse returns for this write
 */
private StorageObject transmitChunk(
    int chunkOffset, int chunkLength, long position, boolean last) {
  final StorageObject writeResult =
      getOptions()
          .getStorageRpcV1()
          .writeWithResponse(getUploadId(), getBuffer(), chunkOffset, position, chunkLength, last);
  return writeResult;
}

/**
 * Asks the service for the current offset of this upload session.
 *
 * @return the value reported by getCurrentUploadOffset: -1 when the upload has already
 *     completed, 0 when no progress has been made, otherwise the next byte offset expected by
 *     the service
 */
private long getRemotePosition() {
  final long remoteOffset = getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
  return remoteOffset;
}

/**
 * Fetches the object described by this channel's entity from the service.
 *
 * @return the object returned by the RPC layer's get call
 */
private StorageObject getRemoteStorageObject() {
  // NOTE(review): the second argument (presumably the request options) is passed as null.
  // Confirm that the RPC implementation tolerates a null value here; an empty options map may
  // be the safer choice.
  final StorageObject remoteObject =
      getOptions().getStorageRpcV1().get(getEntity().toPb(), null);
  return remoteObject;
}

/**
 * Builds (but does not throw) the exception describing an upload state that cannot be resumed
 * from the chunk held in memory. The message lists the upload id and the supplied values for
 * debugging.
 *
 * @param chunkOffset offset into the in-memory chunk computed for the attempted write
 * @param chunkLength number of bytes computed for the attempted write
 * @param localPosition the client's view of the upload position
 * @param remotePosition the position reported by the service
 * @param last whether the chunk being flushed was flagged as the last one
 * @return a StorageException with code 0 carrying the diagnostic message
 */
private StorageException unrecoverableState(
    int chunkOffset, int chunkLength, long localPosition, long remotePosition, boolean last) {
  final String diagnostics =
      "Unable to recover in upload.\n"
          + "This may be a symptom of multiple clients uploading to the same upload session.\n\n"
          + "For debugging purposes:\n"
          + "uploadId: " + getUploadId() + '\n'
          + "chunkOffset: " + chunkOffset + '\n'
          + "chunkLength: " + chunkLength + '\n'
          + "localOffset: " + localPosition + '\n'
          + "remoteOffset: " + remotePosition + '\n'
          + "lastChunk: " + last + "\n\n";
  return new StorageException(0, diagnostics);
}

// Retriable interruption occurred.
// Variables:
// chunk = getBuffer()
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == remoteNextByteOffset:
// Retrying the entire chunk
//
// Case 2: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
// driftOffset must be less than chunkSize for it to retry using
// chunk maintained in memory.
// Find the driftOffset by subtracting localNextByteOffset from
// remoteNextByteOffset.
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 2.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there are multiple
// clients uploading to the same resumable upload session.
//
// Case 5: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 6: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk
// Not the last chunk and checkingForLastChunk is not yet set: set it and allow
// the client to catch up to the final chunk, which is then handled by
// Case 6.
//
// Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk
// Not last chunk and checkingForLastChunk means this is the second time we
// hit this case, meaning the upload was completed by a different client.
//
// Case 9: Any remaining state. It would require the client's local offset to
// advance beyond the remote offset, which should not happen.
//
@Override
protected void flushBuffer(final int length, final boolean last) {
protected void flushBuffer(final int length, final boolean lastChunk) {
try {
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
// Get remote offset from API
final long localPosition = getPosition();
// For each request it should be possible to retry from its location in this code
final long remotePosition = isRetrying() ? getRemotePosition() : getPosition();
final int chunkOffset = (int) (remotePosition - localPosition);
final int chunkLength = length - chunkOffset;
final boolean uploadAlreadyComplete = remotePosition == -1;
// Enable isRetrying state to reduce number of calls to getRemotePosition()
if (!isRetrying()) {
// Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
retrying = true;
}
if (uploadAlreadyComplete && lastChunk) {
// Case 6
// Request object metadata if not available
if (storageObject == null) {
storageObject = getRemoteStorageObject();
}
// Verify that with the final chunk we match the blob length
if (storageObject.getSize().longValue() != getPosition() + length) {
throw unrecoverableState(
chunkOffset, chunkLength, localPosition, remotePosition, lastChunk);
}
retrying = false;
} else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk) {
// Case 7
// Make sure this is the second to last chunk.
checkingForLastChunk = true;
// Continue onto next chunk in case this is the last chunk
} else if (localPosition <= remotePosition && chunkOffset < getChunkSize()) {
// Case 1 && Case 2
// We are in a position to send a chunk
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
transmitChunk(chunkOffset, chunkLength, remotePosition, lastChunk);
retrying = false;
} else if (localPosition < remotePosition && chunkOffset == getChunkSize()) {
// Case 3
// Continue to next chunk to catch up with remotePosition we are one chunk
// behind
retrying = false;
} else {
// Retriable interruption occurred.
// Variables:
// chunk = getBuffer()
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
// we are retrying from first chunk start from 0 offset.
//
// Case 2: localNextByteOffset == remoteNextByteOffset:
// Special case of Case 1 when a chunk is retried.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
// driftOffset must be less than chunkSize for it to retry using
// chunk maintained in memory.
// Find the driftOffset by subtracting localNextByteOffset from
// remoteNextByteOffset.
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 3.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 5: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there's multiple
// clients uploading to the same resumable upload session.
//
// Case 6: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 7: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 8: remoteNextByteOffset==-1 && last == false
// Upload was completed by another client because this retry did not occur
// during the last chunk.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
long localNextByteOffset = getPosition();
int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
int retryChunkLength = length - driftOffset;

if (localNextByteOffset == 0 && remoteNextByteOffset == 0
|| localNextByteOffset == remoteNextByteOffset) {
// Case 1 and 2
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset < getChunkSize()) {
// Case 3
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(),
getBuffer(),
driftOffset,
remoteNextByteOffset,
retryChunkLength,
last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset == getChunkSize()) {
// Case 4
// Continue to next chunk
retrying = false;
return;
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset > getChunkSize()) {
// Case 5
StringBuilder sb = new StringBuilder();
sb.append(
"Remote offset has progressed beyond starting byte offset of next chunk.");
sb.append(
"This may be a symptom of multiple clients uploading to the same upload session.\n\n");
sb.append("For debugging purposes:\n");
sb.append("uploadId: ").append(getUploadId()).append('\n');
sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
} else if (remoteNextByteOffset == -1 && last) {
// Case 7
retrying = false;
return;
} else if (remoteNextByteOffset == -1 && !last) {
// Case 8
throw new StorageException(0, "Resumable upload is already complete.");
}
// Case 4 && Case 8 && Case 9
throw unrecoverableState(
chunkOffset, chunkLength, localPosition, remotePosition, lastChunk);
}
// Request was successful and retrying state is now disabled.
retrying = false;
}
}),
getOptions().getRetrySettings(),
Expand Down
Expand Up @@ -32,6 +32,7 @@
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.InputStreamContent;
import com.google.api.client.http.json.JsonHttpContent;
Expand Down Expand Up @@ -765,7 +766,8 @@ public long getCurrentUploadOffset(String uploadId) {
try {
response = httpRequest.execute();
int code = response.getStatusCode();
if (code == 201 || code == 200) {
if (HttpStatusCodes.isSuccess(code)) {
// Upload completed successfully
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: create constants for compared values instead of commenting

201 -> STATUS_CREATED
200 -> STATUS_OK

return -1;
}
StringBuilder sb = new StringBuilder();
Expand All @@ -774,20 +776,18 @@ public long getCurrentUploadOffset(String uploadId) {
throw new StorageException(0, sb.toString());
} catch (HttpResponseException ex) {
int code = ex.getStatusCode();
if (code == 308 && ex.getHeaders().getRange() == null) {
// No progress has been made.
return 0;
} else if (code == 308 && ex.getHeaders().getRange() != null) {
if (code == 308) {
if (ex.getHeaders().getRange() == null) {
// No progress has been made.
return 0;
}
// API returns last byte received offset
String range = ex.getHeaders().getRange();
// Return next byte offset by adding 1 to last byte received offset
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
} else {
// Not certain what went wrong
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
sb.append("Response:\n").append(ex.toString()).append("\n\n");
throw new StorageException(0, sb.toString());
// Something else occurred like a 5xx so translate and throw.
throw translate(ex);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to keep the information in the exception message?

}
} finally {
if (response != null) {
Expand Down