diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index 02722fac6..85c4887d6 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -31,4 +31,9 @@ long getCurrentUploadOffset(java.lang.String) 7012 + + com/google/cloud/storage/spi/v1/StorageRpc + com.google.api.services.storage.model.StorageObject queryCompletedResumableUpload(java.lang.String, long) + 7012 + diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 2d7a8520f..6207ef662 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -25,7 +25,6 @@ import com.google.cloud.RetryHelper; import com.google.cloud.WriteChannel; import com.google.cloud.storage.spi.v1.StorageRpc; -import com.google.common.collect.Maps; import java.math.BigInteger; import java.net.URL; import java.util.Map; @@ -78,12 +77,6 @@ private long getRemotePosition() { return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId()); } - private StorageObject getRemoteStorageObject() { - return getOptions() - .getStorageRpcV1() - .get(getEntity().toPb(), Maps.newEnumMap(StorageRpc.Option.class)); - } - private static StorageException unrecoverableState( String uploadId, int chunkOffset, @@ -212,8 +205,12 @@ public void run() { if (uploadAlreadyComplete && lastChunk) { // Case 6 // Request object metadata if not available + long totalBytes = getPosition() + length; if (storageObject == null) { - storageObject = getRemoteStorageObject(); + storageObject = + getOptions() + .getStorageRpcV1() + .queryCompletedResumableUpload(getUploadId(), totalBytes); } // the following checks are defined here explicitly to provide a more // informative if either storageObject is unable to be resolved or it's size is @@ -239,7 +236,7 @@ public void run() { remotePosition, lastChunk); } - if (size.longValue() != getPosition() + length) { + if (size.longValue() != totalBytes) { throw unrecoverableState( getUploadId(), chunkOffset, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 11d4b9876..71b952c43 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -810,6 +810,25 @@ public long getCurrentUploadOffset(String uploadId) { } } + @Override + public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) { + try { + GenericUrl url = new GenericUrl(uploadId); + HttpRequest req = storage.getRequestFactory().buildPutRequest(url, new EmptyContent()); + req.getHeaders().setContentRange(String.format("bytes */%s", totalBytes)); + req.setParser(storage.getObjectParser()); + HttpResponse response = req.execute(); + // If the response is 200 + if (response.getStatusCode() == 200) { + return response.parseAs(StorageObject.class); + } else { + throw buildStorageException(response.getStatusCode(), response.getStatusMessage()); + } + } catch (IOException ex) { + throw translate(ex); + } + } + @Override public StorageObject writeWithResponse( String uploadId, @@ -875,10 +894,7 @@ public StorageObject writeWithResponse( if (exception != null) { throw exception; } - GoogleJsonError error = new GoogleJsonError(); - error.setCode(code); - error.setMessage(message); - throw translate(error); + throw buildStorageException(code, message); } } catch (IOException ex) { span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage())); @@ -925,10 +941,7 @@ public String open(StorageObject object, Map options) { setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options); HttpResponse response = httpRequest.execute(); if (response.getStatusCode() != 200) { - GoogleJsonError error = new GoogleJsonError(); - error.setCode(response.getStatusCode()); - error.setMessage(response.getStatusMessage()); - throw translate(error); + throw buildStorageException(response.getStatusCode(), response.getStatusMessage()); } return response.getHeaders().getLocation(); } catch (IOException ex) { @@ -962,10 +975,7 @@ public String open(String signedURL) { HttpResponse response = httpRequest.execute(); if (response.getStatusCode() != 201) { - GoogleJsonError error = new GoogleJsonError(); - error.setCode(response.getStatusCode()); - error.setMessage(response.getStatusMessage()); - throw translate(error); + throw buildStorageException(response.getStatusCode(), response.getStatusMessage()); } return response.getHeaders().getLocation(); } catch (IOException ex) { @@ -1625,4 +1635,11 @@ public ServiceAccount getServiceAccount(String projectId) { span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS); } } + + private static StorageException buildStorageException(int statusCode, String statusMessage) { + GoogleJsonError error = new GoogleJsonError(); + error.setCode(statusCode); + error.setMessage(statusMessage); + return translate(error); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java index b695559b5..201078efa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java @@ -338,6 +338,24 @@ void write( */ long getCurrentUploadOffset(String uploadId); + /** + * Attempts to retrieve the StorageObject from a completed resumable upload. When a resumable + * upload completes, the response will be the up-to-date StorageObject metadata. This up-to-date + * metadata can then be used to validate the total size of the object along with new generation + * and other information. + * + *

If for any reason, the response to the final PUT to a resumable upload is not received, this + * method can be used to query for the up-to-date StorageObject. If the upload is complete, this + * method can be used to access the StorageObject independently from any other liveness or + * conditional criteria requirements that are otherwise applicable when using {@link + * #get(StorageObject, Map)}. + * + * @param uploadId resumable upload ID URL + * @param totalBytes the total number of bytes that should have been written. + * @throws StorageException if the upload is incomplete or does not exist + */ + StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes); + /** * Writes the provided bytes to a storage object at the provided location. If {@code last=true} * returns metadata of the updated object, otherwise returns null. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java index b4479682a..43e29d011 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java @@ -144,6 +144,11 @@ public long getCurrentUploadOffset(String uploadId) { throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public StorageObject writeWithResponse( String uploadId, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index d74e87d64..d25bd64a3 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -40,7 +40,6 @@ import com.google.cloud.storage.spi.StorageRpcFactory; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import java.io.IOException; import java.math.BigInteger; import java.net.MalformedURLException; @@ -334,10 +333,10 @@ public void testWriteWithRetryAndObjectMetadata() throws IOException { .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); - expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class))) + expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); - expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class))) + expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); @@ -487,7 +486,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException { eq(true))) .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); - expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class))) + expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java index 7ed1239b2..f8f51d386 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java @@ -82,6 +82,9 @@ import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRoles; +import com.google.cloud.storage.spi.StorageRpcFactory; +import com.google.cloud.storage.spi.v1.StorageRpc; +import com.google.cloud.storage.spi.v1.StorageRpc.Option; import com.google.cloud.storage.testing.RemoteStorageHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -90,6 +93,8 @@ import com.google.common.collect.Lists; import com.google.common.io.BaseEncoding; import com.google.common.io.ByteStreams; +import com.google.common.reflect.AbstractInvocationHandler; +import com.google.common.reflect.Reflection; import com.google.iam.v1.Binding; import com.google.iam.v1.IAMPolicyGrpc; import com.google.iam.v1.SetIamPolicyRequest; @@ -107,9 +112,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Method; import java.net.URL; import java.net.URLConnection; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.security.Key; @@ -125,6 +132,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPInputStream; @@ -139,7 +147,14 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.threeten.bp.Clock; +import org.threeten.bp.Instant; +import org.threeten.bp.ZoneId; +import org.threeten.bp.ZoneOffset; +import org.threeten.bp.format.DateTimeFormatter; public class ITStorageTest { @@ -200,6 +215,8 @@ public class ITStorageTest { private static final ImmutableList LIFECYCLE_RULES = ImmutableList.of(LIFECYCLE_RULE_1, LIFECYCLE_RULE_2); + @Rule public final TestName testName = new TestName(); + @BeforeClass public static void beforeClass() throws IOException { remoteStorageHelper = RemoteStorageHelper.create(); @@ -3813,4 +3830,125 @@ public void testWriterWithKmsKeyName() throws IOException { assertThat(blob.getKmsKeyName()).isNotNull(); assertThat(storage.delete(BUCKET, blobName)).isTrue(); } + + @Test + public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_multipleChunks() + throws IOException { + int _2MiB = 256 * 1024; + int contentSize = 292_617; + + blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_2MiB, contentSize); + } + + @Test + public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_singleChunk() + throws IOException { + int _4MiB = 256 * 1024 * 2; + int contentSize = 292_617; + + blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_4MiB, contentSize); + } + + private void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent( + int chunkSize, int contentSize) throws IOException { + Instant now = Clock.systemUTC().instant(); + DateTimeFormatter formatter = + DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC)); + String nowString = formatter.format(now); + + String blobPath = String.format("%s/%s/blob", testName.getMethodName(), nowString); + BlobId blobId = BlobId.of(BUCKET, blobPath); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + + Random rand = new Random(1234567890); + String randString = randString(rand, contentSize); + final byte[] randStringBytes = randString.getBytes(StandardCharsets.UTF_8); + Storage storage = StorageOptions.getDefaultInstance().getService(); + WriteChannel ww = storage.writer(blobInfo); + ww.setChunkSize(chunkSize); + ww.write(ByteBuffer.wrap(randStringBytes)); + ww.close(); + + Blob blobGen1 = storage.get(blobId); + + final AtomicBoolean exceptionThrown = new AtomicBoolean(false); + + Storage testStorage = + StorageOptions.newBuilder() + .setServiceRpcFactory( + new StorageRpcFactory() { + /** + * Here we're creating a proxy of StorageRpc where we can delegate all calls to + * the normal implementation, except in the case of {@link + * StorageRpc#writeWithResponse(String, byte[], int, long, int, boolean)} where + * {@code lastChunk == true}. We allow the call to execute, but instead of + * returning the result we throw an IOException to simulate a prematurely close + * connection. This behavior is to ensure appropriate handling of a completed + * upload where the ACK wasn't received. In particular, if an upload is initiated + * against an object where an {@link Option#IF_GENERATION_MATCH} simply calling + * get on an object can result in a 404 because the object that is created while + * the BlobWriteChannel is executing will be a new generation. + */ + @SuppressWarnings("UnstableApiUsage") + @Override + public StorageRpc create(final StorageOptions options) { + return Reflection.newProxy( + StorageRpc.class, + new AbstractInvocationHandler() { + final StorageRpc delegate = + (StorageRpc) StorageOptions.getDefaultInstance().getRpc(); + + @Override + protected Object handleInvocation( + Object proxy, Method method, Object[] args) throws Throwable { + if ("writeWithResponse".equals(method.getName())) { + Object result = method.invoke(delegate, args); + boolean lastChunk = (boolean) args[5]; + // if we're on the lastChunk simulate a connection failure which + // happens after the request was processed but before response could + // be received by the client. + if (lastChunk) { + exceptionThrown.set(true); + throw StorageException.translate( + new IOException("simulated Connection closed prematurely")); + } else { + return result; + } + } + return method.invoke(delegate, args); + } + }); + } + }) + .build() + .getService(); + + try (WriteChannel w = testStorage.writer(blobGen1, BlobWriteOption.generationMatch())) { + w.setChunkSize(chunkSize); + + ByteBuffer buffer = ByteBuffer.wrap(randStringBytes); + w.write(buffer); + } + + assertTrue("Expected an exception to be thrown for the last chunk", exceptionThrown.get()); + + Blob blobGen2 = storage.get(blobId); + assertEquals(contentSize, (long) blobGen2.getSize()); + assertNotEquals(blobInfo.getGeneration(), blobGen2.getGeneration()); + ByteArrayOutputStream actualData = new ByteArrayOutputStream(); + blobGen2.downloadTo(actualData); + assertArrayEquals(randStringBytes, actualData.toByteArray()); + } + + private static String randString(Random rand, int length) { + final StringBuilder sb = new StringBuilder(); + while (sb.length() < length) { + int i = rand.nextInt('z'); + char c = (char) i; + if (Character.isLetter(c) || Character.isDigit(c)) { + sb.append(c); + } + } + return sb.toString(); + } }