fix: correct lastChunk retry logic in BlobWriteChannel #918

Merged
merged 3 commits on Jul 13, 2021
Changes from 1 commit
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
@@ -31,4 +31,9 @@
<method>long getCurrentUploadOffset(java.lang.String)</method>
<differenceType>7012</differenceType>
</difference>
<difference>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>com.google.api.services.storage.model.StorageObject queryResumableUpload(java.lang.String, long)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
@@ -25,7 +25,6 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.Maps;
import java.math.BigInteger;
import java.net.URL;
import java.util.Map;
@@ -78,12 +77,6 @@ private long getRemotePosition() {
return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
}

private StorageObject getRemoteStorageObject() {
return getOptions()
.getStorageRpcV1()
.get(getEntity().toPb(), Maps.newEnumMap(StorageRpc.Option.class));
}

private static StorageException unrecoverableState(
String uploadId,
int chunkOffset,
@@ -212,8 +205,12 @@ public void run() {
if (uploadAlreadyComplete && lastChunk) {
// Case 6
// Request object metadata if not available
long totalBytes = getPosition() + length;
if (storageObject == null) {
storageObject = getRemoteStorageObject();
storageObject =
getOptions()
.getStorageRpcV1()
.queryResumableUpload(getUploadId(), totalBytes);
}
// the following checks are defined here explicitly to provide a more
// informative if either storageObject is unable to be resolved or its size is
@@ -239,7 +236,7 @@ public void run() {
remotePosition,
lastChunk);
}
if (size.longValue() != getPosition() + length) {
if (size.longValue() != totalBytes) {
throw unrecoverableState(
getUploadId(),
chunkOffset,
@@ -810,6 +810,27 @@ public long getCurrentUploadOffset(String uploadId) {
}
}

@Override
public StorageObject queryResumableUpload(String uploadId, long totalBytes) {
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest req = storage.getRequestFactory().buildPutRequest(url, new EmptyContent());
req.getHeaders().setContentRange(String.format("bytes */%s", totalBytes));
req.setParser(storage.getObjectParser());
HttpResponse response = req.execute();
// A 200 response means the resumable session is complete; parse the object metadata.
if (response.getStatusCode() == 200) {
return response.parseAs(StorageObject.class);
} else {
} else {
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
Contributor

If I understand correctly, this means that the caller would have made a request to do an upload, but they get an error from the request to check the upload status which is a separate RPC. This might be confusing-- is there a way to wrap this in an error that corresponds to the method they originally called?

Also, if the response is a 308, I would assume that instead of failing we should retry from the last offset, no?

(Ignore me if I'm just missing how this works).

Collaborator Author

Normally, when a resumable upload completes, the shallow Object is returned in the response. If the response is not received for any reason (usually a network failure), this method allows querying for that shallow Object. This method is used as part of the retry evaluation logic to determine whether a final chunk must be retransmitted. It will only return cleanly if the resumable upload returns 200; if a 308 is returned due to an incomplete upload, a StorageException will be thrown here to signal that fact. In the case of BlobWriteChannel, which uses this method, the exception would be caught by the retry handler and evaluated there.

I've added a javadoc to the method on the interface with an explanation of why it's there.
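The behavior described here can be sketched as a tiny, self-contained model (the class and method names below are hypothetical, not the library's API): a 200 yields the shallow object, while any other status, notably 308 for a still-incomplete session, surfaces as an exception for the caller's retry handler to evaluate.

```java
// Illustrative model of the result handling in queryResumableUpload;
// QueryResumableUploadSketch and resolveObject are hypothetical names.
public class QueryResumableUploadSketch {

    static String resolveObject(int statusCode, String parsedObject) {
        if (statusCode == 200) {
            // Session complete: the response body holds the shallow StorageObject.
            return parsedObject;
        }
        // 308 (incomplete session) and genuine errors both become an exception,
        // which BlobWriteChannel's retry handler then evaluates.
        throw new IllegalStateException("resumable session not complete: " + statusCode);
    }

    public static void main(String[] args) {
        System.out.println(resolveObject(200, "blob#generation1"));
        try {
            resolveObject(308, null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```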

Contributor

Cool, the javadoc really helps, thank you.

So, in the 308 case-- would we then (if in BlobWriteChannel) make another identical PUT request to query the correct offset to retry at? If so, is there a way to preserve the information from the 308 response here instead?

Collaborator Author

In the case of BlobWriteChannel, it performs a pre-check to try to determine the remote offset. If the remote offset indicates that the data has been written but for some reason the object metadata wasn't received as part of the final PUT, it will then use this method to try to get that object metadata.

Collaborator Author

If a 308 happens, that would land us in the normal retry flow, where right now unfortunately we don't have the ability to rewind and retry a chunk to try and recover.

In practice, with BlobWriteChannel any call to this new method is guarded by a check to see if the resumable session is complete or not (in fact, I would have preferred to make this check an internal implementation detail, but unfortunately BlobWriteChannel does not have the means of accessing the HTTP client directly in order to make this call). So, for all intents and purposes, we sidestep the need to be able to rewind a chunk.
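That guard can be modeled in isolation (a hypothetical sketch; the real check lives inside BlobWriteChannel's retry path, and -1 is the sentinel the PR's unit tests use for a completed session):

```java
// Simplified model of the guard around queryResumableUpload; the class and
// method names here are hypothetical.
public class LastChunkGuardSketch {

    // getCurrentUploadOffset reporting -1 means the server has already
    // finalized the resumable session.
    static boolean shouldQueryMetadata(boolean lastChunk, long remoteOffset) {
        return lastChunk && remoteOffset == -1;
    }

    public static void main(String[] args) {
        // Final chunk's ACK was lost but the upload finished server-side:
        // query for the object metadata instead of retransmitting.
        System.out.println(shouldQueryMetadata(true, -1)); // true
        // Mid-upload failure: fall back to the normal retry flow instead.
        System.out.println(shouldQueryMetadata(true, 262144)); // false
    }
}
```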

}
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public StorageObject writeWithResponse(
String uploadId,
@@ -875,10 +896,7 @@ public StorageObject writeWithResponse(
if (exception != null) {
throw exception;
}
GoogleJsonError error = new GoogleJsonError();
error.setCode(code);
error.setMessage(message);
throw translate(error);
throw buildStorageException(code, message);
}
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
@@ -925,10 +943,7 @@ public String open(StorageObject object, Map<Option, ?> options) {
setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options);
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 200) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
@@ -958,10 +973,7 @@ public String open(String signedURL) {
requestHeaders.set("x-goog-resumable", "start");
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
@@ -1621,4 +1633,11 @@ public ServiceAccount getServiceAccount(String projectId) {
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

private static StorageException buildStorageException(int statusCode, String statusMessage) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(statusCode);
error.setMessage(statusMessage);
return translate(error);
}
}
@@ -338,6 +338,8 @@ void write(
*/
long getCurrentUploadOffset(String uploadId);

StorageObject queryResumableUpload(String uploadId, long totalBytes);

/**
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
* returns metadata of the updated object, otherwise returns null.
@@ -144,6 +144,11 @@ public long getCurrentUploadOffset(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject queryResumableUpload(String uploadId, long totalBytes) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject writeWithResponse(
String uploadId,
@@ -40,7 +40,6 @@
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.math.BigInteger;
import java.net.MalformedURLException;
@@ -334,10 +333,10 @@ public void testWriteWithRetryAndObjectMetadata() throws IOException {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
@@ -487,7 +486,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
eq(true)))
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
@@ -82,6 +82,9 @@
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.StorageRoles;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.cloud.storage.spi.v1.StorageRpc.Option;
import com.google.cloud.storage.testing.RemoteStorageHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -90,6 +93,8 @@
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import com.google.common.io.ByteStreams;
import com.google.common.reflect.AbstractInvocationHandler;
import com.google.common.reflect.Reflection;
import com.google.iam.v1.Binding;
import com.google.iam.v1.IAMPolicyGrpc;
import com.google.iam.v1.SetIamPolicyRequest;
@@ -107,9 +112,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
@@ -125,6 +132,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
@@ -139,7 +147,14 @@
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.threeten.bp.Clock;
import org.threeten.bp.Instant;
import org.threeten.bp.ZoneId;
import org.threeten.bp.ZoneOffset;
import org.threeten.bp.format.DateTimeFormatter;

public class ITStorageTest {

@@ -200,6 +215,8 @@ public class ITStorageTest {
private static final ImmutableList<LifecycleRule> LIFECYCLE_RULES =
ImmutableList.of(LIFECYCLE_RULE_1, LIFECYCLE_RULE_2);

@Rule public final TestName testName = new TestName();

@BeforeClass
public static void beforeClass() throws IOException {
remoteStorageHelper = RemoteStorageHelper.create();
@@ -3805,4 +3822,125 @@ public void testWriterWithKmsKeyName() throws IOException {
assertThat(blob.getKmsKeyName()).isNotNull();
assertThat(storage.delete(BUCKET, blobName)).isTrue();
}

@Test
public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_multipleChunks()
throws IOException {
int _256KiB = 256 * 1024; // smaller than contentSize, forcing multiple chunks
int contentSize = 292_617;

blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_256KiB, contentSize);
}

@Test
public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_singleChunk()
throws IOException {
int _512KiB = 256 * 1024 * 2; // larger than contentSize, so a single chunk
int contentSize = 292_617;

blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_512KiB, contentSize);
}

private void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(
int chunkSize, int contentSize) throws IOException {
Instant now = Clock.systemUTC().instant();
DateTimeFormatter formatter =
DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
String nowString = formatter.format(now);

String blobPath = String.format("%s/%s/blob", testName.getMethodName(), nowString);
BlobId blobId = BlobId.of(BUCKET, blobPath);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

Random rand = new Random(1234567890);
String randString = randString(rand, contentSize);
final byte[] randStringBytes = randString.getBytes(StandardCharsets.UTF_8);
Storage storage = StorageOptions.getDefaultInstance().getService();
WriteChannel ww = storage.writer(blobInfo);
ww.setChunkSize(chunkSize);
ww.write(ByteBuffer.wrap(randStringBytes));
ww.close();

Blob blobGen1 = storage.get(blobId);

final AtomicBoolean exceptionThrown = new AtomicBoolean(false);

Storage testStorage =
StorageOptions.newBuilder()
.setServiceRpcFactory(
new StorageRpcFactory() {
/**
* Here we're creating a proxy of StorageRpc where we can delegate all calls to
* the normal implementation, except in the case of {@link
* StorageRpc#writeWithResponse(String, byte[], int, long, int, boolean)} where
* {@code lastChunk == true}. We allow the call to execute, but instead of
* returning the result we throw an IOException to simulate a prematurely
* closed connection. This behavior is to ensure appropriate handling of a
* completed upload where the ACK wasn't received. In particular, if an
* upload is initiated against an object with an {@link
* Option#IF_GENERATION_MATCH} precondition, simply calling get on the
* object can result in a 404, because the object created while the
* BlobWriteChannel is executing will be a new generation.
*/
@SuppressWarnings("UnstableApiUsage")
@Override
public StorageRpc create(final StorageOptions options) {
return Reflection.newProxy(
StorageRpc.class,
new AbstractInvocationHandler() {
final StorageRpc delegate =
(StorageRpc) StorageOptions.getDefaultInstance().getRpc();

@Override
protected Object handleInvocation(
Object proxy, Method method, Object[] args) throws Throwable {
if ("writeWithResponse".equals(method.getName())) {
Object result = method.invoke(delegate, args);
boolean lastChunk = (boolean) args[5];
// if we're on the lastChunk simulate a connection failure which
// happens after the request was processed but before response could
// be received by the client.
if (lastChunk) {
exceptionThrown.set(true);
throw StorageException.translate(
new IOException("simulated Connection closed prematurely"));
} else {
return result;
}
}
return method.invoke(delegate, args);
}
});
}
})
.build()
.getService();

try (WriteChannel w = testStorage.writer(blobGen1, BlobWriteOption.generationMatch())) {
w.setChunkSize(chunkSize);

ByteBuffer buffer = ByteBuffer.wrap(randStringBytes);
w.write(buffer);
}

assertTrue("Expected an exception to be thrown for the last chunk", exceptionThrown.get());

Blob blobGen2 = storage.get(blobId);
assertEquals(contentSize, (long) blobGen2.getSize());
assertNotEquals(blobInfo.getGeneration(), blobGen2.getGeneration());
ByteArrayOutputStream actualData = new ByteArrayOutputStream();
blobGen2.downloadTo(actualData);
assertArrayEquals(randStringBytes, actualData.toByteArray());
}

private static String randString(Random rand, int length) {
final StringBuilder sb = new StringBuilder();
while (sb.length() < length) {
int i = rand.nextInt('z');
char c = (char) i;
if (Character.isLetter(c) || Character.isDigit(c)) {
sb.append(c);
}
}
return sb.toString();
}
}