Skip to content

Commit

Permalink
fix: correct lastChunk retry logic in BlobWriteChannel (#918)
Browse files Browse the repository at this point in the history
Add a new method, StorageRpc#queryCompletedResumableUpload, which allows getting a shallow StorageObject for a resumable upload session that is complete.

Update BlobWriteChannel to use StorageRpc#queryCompletedResumableUpload instead of StorageRpc#get when validating the upload size of an object once it determines that the upload is complete and it is on the last chunk.

If a BlobWriteChannel is opened with a conditional like IfGenerationMatch it is not possible to simply get the object, as the object can drift generationally while the resumable upload is being performed.

Related to #839
  • Loading branch information
BenWhitehead committed Jul 13, 2021
1 parent 8b05867 commit ab0228c
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 25 deletions.
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Expand Up @@ -31,4 +31,9 @@
<method>long getCurrentUploadOffset(java.lang.String)</method>
<differenceType>7012</differenceType>
</difference>
<difference>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>com.google.api.services.storage.model.StorageObject queryCompletedResumableUpload(java.lang.String, long)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
Expand Up @@ -25,7 +25,6 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.Maps;
import java.math.BigInteger;
import java.net.URL;
import java.util.Map;
Expand Down Expand Up @@ -78,12 +77,6 @@ private long getRemotePosition() {
return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
}

/**
 * Fetches the current server-side metadata for this channel's entity via an
 * unconditional metadata GET (no request options applied).
 */
private StorageObject getRemoteStorageObject() {
  StorageRpc rpc = getOptions().getStorageRpcV1();
  return rpc.get(getEntity().toPb(), Maps.newEnumMap(StorageRpc.Option.class));
}

private static StorageException unrecoverableState(
String uploadId,
int chunkOffset,
Expand Down Expand Up @@ -212,8 +205,12 @@ public void run() {
if (uploadAlreadyComplete && lastChunk) {
// Case 6
// Request object metadata if not available
long totalBytes = getPosition() + length;
if (storageObject == null) {
storageObject = getRemoteStorageObject();
storageObject =
getOptions()
.getStorageRpcV1()
.queryCompletedResumableUpload(getUploadId(), totalBytes);
}
// the following checks are defined here explicitly to provide a more
// informative if either storageObject is unable to be resolved or it's size is
Expand All @@ -239,7 +236,7 @@ public void run() {
remotePosition,
lastChunk);
}
if (size.longValue() != getPosition() + length) {
if (size.longValue() != totalBytes) {
throw unrecoverableState(
getUploadId(),
chunkOffset,
Expand Down
Expand Up @@ -810,6 +810,25 @@ public long getCurrentUploadOffset(String uploadId) {
}
}

@Override
public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) {
  try {
    // Resumable-upload status query: an empty-body PUT to the session URL with
    // Content-Range "bytes */N" asks the service for the session state. When the
    // upload is complete the service responds 200 with the object metadata.
    GenericUrl url = new GenericUrl(uploadId);
    HttpRequest req = storage.getRequestFactory().buildPutRequest(url, new EmptyContent());
    req.getHeaders().setContentRange(String.format("bytes */%s", totalBytes));
    req.setParser(storage.getObjectParser());
    HttpResponse response = req.execute();
    if (response.getStatusCode() == 200) {
      // parseAs consumes and closes the response content for us.
      return response.parseAs(StorageObject.class);
    }
    // Error path: build the exception first (while the status line is available),
    // then release the connection since we will not parse the body.
    StorageException failure =
        buildStorageException(response.getStatusCode(), response.getStatusMessage());
    response.disconnect();
    throw failure;
  } catch (IOException ex) {
    throw translate(ex);
  }
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down Expand Up @@ -875,10 +894,7 @@ public StorageObject writeWithResponse(
if (exception != null) {
throw exception;
}
GoogleJsonError error = new GoogleJsonError();
error.setCode(code);
error.setMessage(message);
throw translate(error);
throw buildStorageException(code, message);
}
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
Expand Down Expand Up @@ -925,10 +941,7 @@ public String open(StorageObject object, Map<Option, ?> options) {
setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options);
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 200) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
Expand Down Expand Up @@ -962,10 +975,7 @@ public String open(String signedURL) {

HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
Expand Down Expand Up @@ -1625,4 +1635,11 @@ public ServiceAccount getServiceAccount(String projectId) {
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

/**
 * Builds a {@link StorageException} from a bare HTTP status code and message by
 * funneling them through the standard {@code GoogleJsonError} translation path,
 * so callers get the same exception shape as JSON-API error responses.
 */
private static StorageException buildStorageException(int statusCode, String statusMessage) {
  GoogleJsonError jsonError = new GoogleJsonError();
  jsonError.setCode(statusCode);
  jsonError.setMessage(statusMessage);
  return translate(jsonError);
}
}
Expand Up @@ -338,6 +338,24 @@ void write(
*/
long getCurrentUploadOffset(String uploadId);

/**
 * Attempts to retrieve the StorageObject from a completed resumable upload. When a resumable
 * upload completes, the response will be the up-to-date StorageObject metadata. This up-to-date
 * metadata can then be used to validate the total size of the object along with new generation
 * and other information.
 *
 * <p>If for any reason, the response to the final PUT to a resumable upload is not received, this
 * method can be used to query for the up-to-date StorageObject. If the upload is complete, this
 * method can be used to access the StorageObject independently from any other liveness or
 * conditional criteria requirements that are otherwise applicable when using {@link
 * #get(StorageObject, Map)}.
 *
 * @param uploadId resumable upload ID URL
 * @param totalBytes the total number of bytes that should have been written.
 * @return the up-to-date StorageObject metadata for the completed resumable upload session
 * @throws StorageException if the upload is incomplete or does not exist
 */
StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes);

/**
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
* returns metadata of the updated object, otherwise returns null.
Expand Down
Expand Up @@ -144,6 +144,11 @@ public long getCurrentUploadOffset(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

// Test double: resumable-upload status queries are not supported by this fake RPC;
// any test path that reaches this method fails fast.
@Override
public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) {
  throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down
Expand Up @@ -40,7 +40,6 @@
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.math.BigInteger;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -334,10 +333,10 @@ public void testWriteWithRetryAndObjectMetadata() throws IOException {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
Expand Down Expand Up @@ -487,7 +486,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
eq(true)))
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
Expand Down
Expand Up @@ -82,6 +82,9 @@
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.StorageRoles;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.cloud.storage.spi.v1.StorageRpc.Option;
import com.google.cloud.storage.testing.RemoteStorageHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -90,6 +93,8 @@
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import com.google.common.io.ByteStreams;
import com.google.common.reflect.AbstractInvocationHandler;
import com.google.common.reflect.Reflection;
import com.google.iam.v1.Binding;
import com.google.iam.v1.IAMPolicyGrpc;
import com.google.iam.v1.SetIamPolicyRequest;
Expand All @@ -107,9 +112,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
Expand All @@ -125,6 +132,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
Expand All @@ -139,7 +147,14 @@
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.threeten.bp.Clock;
import org.threeten.bp.Instant;
import org.threeten.bp.ZoneId;
import org.threeten.bp.ZoneOffset;
import org.threeten.bp.format.DateTimeFormatter;

public class ITStorageTest {

Expand Down Expand Up @@ -200,6 +215,8 @@ public class ITStorageTest {
private static final ImmutableList<LifecycleRule> LIFECYCLE_RULES =
ImmutableList.of(LIFECYCLE_RULE_1, LIFECYCLE_RULE_2);

@Rule public final TestName testName = new TestName();

@BeforeClass
public static void beforeClass() throws IOException {
remoteStorageHelper = RemoteStorageHelper.create();
Expand Down Expand Up @@ -3813,4 +3830,125 @@ public void testWriterWithKmsKeyName() throws IOException {
assertThat(blob.getKmsKeyName()).isNotNull();
assertThat(storage.delete(BUCKET, blobName)).isTrue();
}

@Test
public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_multipleChunks()
    throws IOException {
  // 256 * 1024 is 256 KiB (not 2 MiB as the old `_2MiB` name claimed); with
  // ~286 KiB of content this forces the upload to span multiple chunks.
  int _256KiB = 256 * 1024;
  int contentSize = 292_617;

  blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_256KiB, contentSize);
}

@Test
public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_singleChunk()
    throws IOException {
  // 256 * 1024 * 2 is 512 KiB (not 4 MiB as the old `_4MiB` name claimed);
  // ~286 KiB of content fits within one 512 KiB chunk, so the first chunk is
  // also the last chunk.
  int _512KiB = 256 * 1024 * 2;
  int contentSize = 292_617;

  blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_512KiB, contentSize);
}

/**
 * Uploads {@code contentSize} random bytes in {@code chunkSize} chunks, then re-uploads the
 * same content through a Storage whose RPC layer simulates a dropped connection on the final
 * chunk, and asserts that BlobWriteChannel recovers: the resulting object must have the full
 * size, a NEW generation, and identical content.
 */
private void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(
    int chunkSize, int contentSize) throws IOException {
  Instant now = Clock.systemUTC().instant();
  DateTimeFormatter formatter =
      DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
  String nowString = formatter.format(now);

  // Unique object path per test-method run so concurrent/repeated runs don't collide.
  String blobPath = String.format("%s/%s/blob", testName.getMethodName(), nowString);
  BlobId blobId = BlobId.of(BUCKET, blobPath);
  BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

  // Fixed seed keeps the content deterministic across runs.
  Random rand = new Random(1234567890);
  String randString = randString(rand, contentSize);
  final byte[] randStringBytes = randString.getBytes(StandardCharsets.UTF_8);
  Storage storage = StorageOptions.getDefaultInstance().getService();
  // Precondition: create generation 1 of the object with a normal upload.
  WriteChannel ww = storage.writer(blobInfo);
  ww.setChunkSize(chunkSize);
  ww.write(ByteBuffer.wrap(randStringBytes));
  ww.close();

  Blob blobGen1 = storage.get(blobId);

  final AtomicBoolean exceptionThrown = new AtomicBoolean(false);

  Storage testStorage =
      StorageOptions.newBuilder()
          .setServiceRpcFactory(
              new StorageRpcFactory() {
                /**
                 * Here we're creating a proxy of StorageRpc where we can delegate all calls to
                 * the normal implementation, except in the case of {@link
                 * StorageRpc#writeWithResponse(String, byte[], int, long, int, boolean)} where
                 * {@code lastChunk == true}. We allow the call to execute, but instead of
                 * returning the result we throw an IOException to simulate a prematurely close
                 * connection. This behavior is to ensure appropriate handling of a completed
                 * upload where the ACK wasn't received. In particular, if an upload is initiated
                 * against an object where an {@link Option#IF_GENERATION_MATCH} simply calling
                 * get on an object can result in a 404 because the object that is created while
                 * the BlobWriteChannel is executing will be a new generation.
                 */
                @SuppressWarnings("UnstableApiUsage")
                @Override
                public StorageRpc create(final StorageOptions options) {
                  return Reflection.newProxy(
                      StorageRpc.class,
                      new AbstractInvocationHandler() {
                        final StorageRpc delegate =
                            (StorageRpc) StorageOptions.getDefaultInstance().getRpc();

                        @Override
                        protected Object handleInvocation(
                            Object proxy, Method method, Object[] args) throws Throwable {
                          if ("writeWithResponse".equals(method.getName())) {
                            Object result = method.invoke(delegate, args);
                            boolean lastChunk = (boolean) args[5];
                            // if we're on the lastChunk simulate a connection failure which
                            // happens after the request was processed but before response could
                            // be received by the client.
                            if (lastChunk) {
                              exceptionThrown.set(true);
                              throw StorageException.translate(
                                  new IOException("simulated Connection closed prematurely"));
                            } else {
                              return result;
                            }
                          }
                          return method.invoke(delegate, args);
                        }
                      });
                }
              })
          .build()
          .getService();

  try (WriteChannel w = testStorage.writer(blobGen1, BlobWriteOption.generationMatch())) {
    w.setChunkSize(chunkSize);

    ByteBuffer buffer = ByteBuffer.wrap(randStringBytes);
    w.write(buffer);
  }

  assertTrue("Expected an exception to be thrown for the last chunk", exceptionThrown.get());

  Blob blobGen2 = storage.get(blobId);
  assertEquals(contentSize, (long) blobGen2.getSize());
  // The recovered upload must have produced a NEW generation. Compare against
  // blobGen1's generation — blobInfo.getGeneration() is null (it was built from a
  // BlobId without a generation), which would make this assertion vacuous.
  assertNotEquals(blobGen1.getGeneration(), blobGen2.getGeneration());
  ByteArrayOutputStream actualData = new ByteArrayOutputStream();
  blobGen2.downloadTo(actualData);
  assertArrayEquals(randStringBytes, actualData.toByteArray());
}

/**
 * Produces a deterministic (for a seeded {@code rand}) alphanumeric string of exactly
 * {@code length} characters by rejection-sampling code points below {@code 'z'} and
 * keeping only letters and digits. A non-positive {@code length} yields the empty string
 * without consuming any randomness.
 */
private static String randString(Random rand, int length) {
  StringBuilder out = new StringBuilder(length);
  while (out.length() < length) {
    char candidate = (char) rand.nextInt('z');
    if (Character.isLetterOrDigit(candidate)) {
      out.append(candidate);
    }
  }
  return out.toString();
}
}

0 comments on commit ab0228c

Please sign in to comment.