From 447d35d5dabc42e5d61b1bbad0ad7a01a29d83a3 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 28 Sep 2021 16:50:40 -0400 Subject: [PATCH] fix(1.113.14-sp): backport critical bug fixes (#993) * feat: Remove client side validation for lifecycle conditions (#816) * Remove client side validation for lifecycle conditions * fix lint and suggest updating (cherry picked from commit 5ec84cc2935a4787dd14a207d27501878f5849d5) * fix: update BucketInfo translation code to properly handle lifecycle rules (#852) Fixes #850 (cherry picked from commit 3b1df1d00a459b134103bc8738f0294188502a37) * fix: improve error detection and reporting for BlobWriteChannel retry state (#846) Add new checks to ensure a more informative error than NullPointerException is thrown if the StorageObject or its size are unable to be resolved on the last chunk. Fixes #839 (cherry picked from commit d0f2184f4dd2d99a4315f260f35421358d14a2df) * fix: correct lastChunk retry logic in BlobWriteChannel (#918) Add new method StorageRpc#queryCompletedResumableUpload which allows getting a shallow StorageObject for a resumable upload session which is complete. Update BlobWriteChannel to use StorageRpc#queryCompletedResumableUpload instead of StorageRpc#get when attempting to validate the upload size of an object when it determines the upload is complete and is on the last chunk. If a BlobWriteChannel is opened with a conditional like IfGenerationMatch it is not possible to simply get the object, as the object can drift generationally while the resumable upload is being performed. Related to #839 (cherry picked from commit ab0228c95df831d79f4a9c993908e5700dab5aa7) * test: remove error string matching (#861) It looks like the text for this error on the backend has changed (sometimes) from "Precondition Failed" to "At least one of the pre-conditions you specified did not hold". I don't think it's really necessary to check the exact message in any case given that we do check for a code of 412, which implies a precondition failure. 
I added a check of the error Reason instead, which is more standardized. Fixes #853 (cherry picked from commit 146a3d3edeaa0bb8bcf2a03b5586268e743cc913) Co-authored-by: JesseLovelace <43148100+JesseLovelace@users.noreply.github.com> Co-authored-by: Chris Cotter --- .../clirr-ignored-differences.xml | 5 + .../cloud/storage/BlobWriteChannel.java | 101 +++++++++++-- .../com/google/cloud/storage/BucketInfo.java | 81 ++++++---- .../cloud/storage/spi/v1/HttpStorageRpc.java | 41 +++-- .../cloud/storage/spi/v1/StorageRpc.java | 18 +++ .../storage/testing/StorageRpcTestBase.java | 5 + .../cloud/storage/BlobWriteChannelTest.java | 7 +- .../google/cloud/storage/BucketInfoTest.java | 71 +++++++++ .../cloud/storage/it/ITStorageTest.java | 140 +++++++++++++++++- 9 files changed, 408 insertions(+), 61 deletions(-) diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index 02722fac6..85c4887d6 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -31,4 +31,9 @@ long getCurrentUploadOffset(java.lang.String) 7012 + + com/google/cloud/storage/spi/v1/StorageRpc + com.google.api.services.storage.model.StorageObject queryCompletedResumableUpload(java.lang.String, long) + 7012 + diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 09bc17081..6207ef662 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -25,7 +25,7 @@ import com.google.cloud.RetryHelper; import com.google.cloud.WriteChannel; import com.google.cloud.storage.spi.v1.StorageRpc; -import com.google.common.collect.Maps; +import java.math.BigInteger; import java.net.URL; import java.util.Map; import java.util.concurrent.Callable; @@ 
-77,20 +77,52 @@ private long getRemotePosition() { return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId()); } - private StorageObject getRemoteStorageObject() { - return getOptions() - .getStorageRpcV1() - .get(getEntity().toPb(), Maps.newEnumMap(StorageRpc.Option.class)); + private static StorageException unrecoverableState( + String uploadId, + int chunkOffset, + int chunkLength, + long localPosition, + long remotePosition, + boolean last) { + return unrecoverableState( + uploadId, + chunkOffset, + chunkLength, + localPosition, + remotePosition, + last, + "Unable to recover in upload.\nThis may be a symptom of multiple clients uploading to the same upload session."); + } + + private static StorageException errorResolvingMetadataLastChunk( + String uploadId, + int chunkOffset, + int chunkLength, + long localPosition, + long remotePosition, + boolean last) { + return unrecoverableState( + uploadId, + chunkOffset, + chunkLength, + localPosition, + remotePosition, + last, + "Unable to load object metadata to determine if last chunk was successfully written"); } - private StorageException unrecoverableState( - int chunkOffset, int chunkLength, long localPosition, long remotePosition, boolean last) { + private static StorageException unrecoverableState( + String uploadId, + int chunkOffset, + int chunkLength, + long localPosition, + long remotePosition, + boolean last, + String message) { StringBuilder sb = new StringBuilder(); - sb.append("Unable to recover in upload.\n"); - sb.append( - "This may be a symptom of multiple clients uploading to the same upload session.\n\n"); + sb.append(message).append("\n\n"); sb.append("For debugging purposes:\n"); - sb.append("uploadId: ").append(getUploadId()).append('\n'); + sb.append("uploadId: ").append(uploadId).append('\n'); sb.append("chunkOffset: ").append(chunkOffset).append('\n'); sb.append("chunkLength: ").append(chunkLength).append('\n'); sb.append("localOffset: ").append(localPosition).append('\n'); 
@@ -162,7 +194,7 @@ public void run() { // Get remote offset from API final long localPosition = getPosition(); // For each request it should be possible to retry from its location in this code - final long remotePosition = isRetrying() ? getRemotePosition() : getPosition(); + final long remotePosition = isRetrying() ? getRemotePosition() : localPosition; final int chunkOffset = (int) (remotePosition - localPosition); final int chunkLength = length - chunkOffset; final boolean uploadAlreadyComplete = remotePosition == -1; @@ -173,13 +205,45 @@ public void run() { if (uploadAlreadyComplete && lastChunk) { // Case 6 // Request object metadata if not available + long totalBytes = getPosition() + length; if (storageObject == null) { - storageObject = getRemoteStorageObject(); + storageObject = + getOptions() + .getStorageRpcV1() + .queryCompletedResumableUpload(getUploadId(), totalBytes); + } + // the following checks are defined here explicitly to provide a more + // informative error if either storageObject is unable to be resolved or its size is + // unable to be determined. This scenario is a very rare case of failure that + // can arise when packets are lost. 
+ if (storageObject == null) { + throw errorResolvingMetadataLastChunk( + getUploadId(), + chunkOffset, + chunkLength, + localPosition, + remotePosition, + lastChunk); } // Verify that with the final chunk we match the blob length - if (storageObject.getSize().longValue() != getPosition() + length) { + BigInteger size = storageObject.getSize(); + if (size == null) { + throw errorResolvingMetadataLastChunk( + getUploadId(), + chunkOffset, + chunkLength, + localPosition, + remotePosition, + lastChunk); + } + if (size.longValue() != totalBytes) { throw unrecoverableState( - chunkOffset, chunkLength, localPosition, remotePosition, lastChunk); + getUploadId(), + chunkOffset, + chunkLength, + localPosition, + remotePosition, + lastChunk); } retrying = false; } else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk) { @@ -201,7 +265,12 @@ public void run() { } else { // Case 4 && Case 8 && Case 9 throw unrecoverableState( - chunkOffset, chunkLength, localPosition, remotePosition, lastChunk); + getUploadId(), + chunkOffset, + chunkLength, + localPosition, + remotePosition, + lastChunk); } } }), diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketInfo.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketInfo.java index 602be08ac..c13395238 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketInfo.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketInfo.java @@ -43,11 +43,13 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.logging.Logger; /** * Google Storage bucket metadata; @@ -101,6 +103,8 @@ public com.google.api.services.storage.model.Bucket apply(BucketInfo bucketInfo) private final String locationType; private final Logging logging; + 
private static final Logger log = Logger.getLogger(BucketInfo.class.getName()); + /** * The Bucket's IAM Configuration. * @@ -356,9 +360,11 @@ public LifecycleRule(LifecycleAction action, LifecycleCondition condition) { && condition.getNoncurrentTimeBefore() == null && condition.getCustomTimeBefore() == null && condition.getDaysSinceCustomTime() == null) { - throw new IllegalArgumentException( - "You must specify at least one condition to use object lifecycle " - + "management. Please see https://cloud.google.com/storage/docs/lifecycle for details."); + log.warning( + "Creating a lifecycle condition with no supported conditions:\n" + + this + + "\nAttempting to update with this rule may cause errors. Please update " + + " to the latest version of google-cloud-storage"); } this.lifecycleAction = action; @@ -1833,33 +1839,52 @@ public ObjectAccessControl apply(Acl acl) { website.setNotFoundPage(notFoundPage); bucketPb.setWebsite(website); } - Set rules = new HashSet<>(); - if (deleteRules != null) { - rules.addAll( - transform( - deleteRules, - new Function() { - @Override - public Rule apply(DeleteRule deleteRule) { - return deleteRule.toPb(); - } - })); - } - if (lifecycleRules != null) { - rules.addAll( - transform( - lifecycleRules, - new Function() { - @Override - public Rule apply(LifecycleRule lifecycleRule) { - return lifecycleRule.toPb(); - } - })); - } - if (rules != null) { + if (deleteRules != null || lifecycleRules != null) { Lifecycle lifecycle = new Lifecycle(); - lifecycle.setRule(ImmutableList.copyOf(rules)); + + // Here we determine if we need to "clear" any defined Lifecycle rules by explicitly setting + // the Rule list of lifecycle to the empty list. + // In order for us to clear the rules, one of the three following must be true: + // 1. deleteRules is null while lifecycleRules is non-null and empty + // 2. lifecycleRules is null while deleteRules is non-null and empty + // 3. 
lifecycleRules is non-null and empty while deleteRules is non-null and empty + // If none of the above three is true, we will interpret as the Lifecycle rules being + // updated to the defined set of DeleteRule and LifecycleRule. + if ((deleteRules == null && lifecycleRules.isEmpty()) + || (lifecycleRules == null && deleteRules.isEmpty()) + || (deleteRules != null && deleteRules.isEmpty() && lifecycleRules.isEmpty())) { + lifecycle.setRule(Collections.emptyList()); + } else { + Set rules = new HashSet<>(); + if (deleteRules != null) { + rules.addAll( + transform( + deleteRules, + new Function() { + @Override + public Rule apply(DeleteRule deleteRule) { + return deleteRule.toPb(); + } + })); + } + if (lifecycleRules != null) { + rules.addAll( + transform( + lifecycleRules, + new Function() { + @Override + public Rule apply(LifecycleRule lifecycleRule) { + return lifecycleRule.toPb(); + } + })); + } + + if (!rules.isEmpty()) { + lifecycle.setRule(ImmutableList.copyOf(rules)); + } + } + bucketPb.setLifecycle(lifecycle); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 6df86cb6a..46b6f2291 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -799,6 +799,25 @@ public long getCurrentUploadOffset(String uploadId) { } } + @Override + public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) { + try { + GenericUrl url = new GenericUrl(uploadId); + HttpRequest req = storage.getRequestFactory().buildPutRequest(url, new EmptyContent()); + req.getHeaders().setContentRange(String.format("bytes */%s", totalBytes)); + req.setParser(storage.getObjectParser()); + HttpResponse response = req.execute(); + // If the response is 200 + if (response.getStatusCode() == 200) { + 
return response.parseAs(StorageObject.class); + } else { + throw buildStorageException(response.getStatusCode(), response.getStatusMessage()); + } + } catch (IOException ex) { + throw translate(ex); + } + } + @Override public StorageObject writeWithResponse( String uploadId, @@ -864,10 +883,7 @@ public StorageObject writeWithResponse( if (exception != null) { throw exception; } - GoogleJsonError error = new GoogleJsonError(); - error.setCode(code); - error.setMessage(message); - throw translate(error); + throw buildStorageException(code, message); } } catch (IOException ex) { span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage())); @@ -914,10 +930,7 @@ public String open(StorageObject object, Map options) { setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options); HttpResponse response = httpRequest.execute(); if (response.getStatusCode() != 200) { - GoogleJsonError error = new GoogleJsonError(); - error.setCode(response.getStatusCode()); - error.setMessage(response.getStatusMessage()); - throw translate(error); + throw buildStorageException(response.getStatusCode(), response.getStatusMessage()); } return response.getHeaders().getLocation(); } catch (IOException ex) { @@ -947,10 +960,7 @@ public String open(String signedURL) { requestHeaders.set("x-goog-resumable", "start"); HttpResponse response = httpRequest.execute(); if (response.getStatusCode() != 201) { - GoogleJsonError error = new GoogleJsonError(); - error.setCode(response.getStatusCode()); - error.setMessage(response.getStatusMessage()); - throw translate(error); + throw buildStorageException(response.getStatusCode(), response.getStatusMessage()); } return response.getHeaders().getLocation(); } catch (IOException ex) { @@ -1610,4 +1620,11 @@ public ServiceAccount getServiceAccount(String projectId) { span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS); } } + + private static StorageException buildStorageException(int statusCode, String statusMessage) { + GoogleJsonError error = new 
GoogleJsonError(); + error.setCode(statusCode); + error.setMessage(statusMessage); + return translate(error); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java index cc4dd5740..97e190ae8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java @@ -337,6 +337,24 @@ void write( */ long getCurrentUploadOffset(String uploadId); + /** + * Attempts to retrieve the StorageObject from a completed resumable upload. When a resumable + * upload completes, the response will be the up-to-date StorageObject metadata. This up-to-date + * metadata can then be used to validate the total size of the object along with new generation + * and other information. + * + *

If for any reason, the response to the final PUT to a resumable upload is not received, this + * method can be used to query for the up-to-date StorageObject. If the upload is complete, this + * method can be used to access the StorageObject independently from any other liveness or + * conditional criteria requirements that are otherwise applicable when using {@link + * #get(StorageObject, Map)}. + * + * @param uploadId resumable upload ID URL + * @param totalBytes the total number of bytes that should have been written. + * @throws StorageException if the upload is incomplete or does not exist + */ + StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes); + /** * Writes the provided bytes to a storage object at the provided location. If {@code last=true} * returns metadata of the updated object, otherwise returns null. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java index b4479682a..43e29d011 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java @@ -144,6 +144,11 @@ public long getCurrentUploadOffset(String uploadId) { throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public StorageObject writeWithResponse( String uploadId, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index d74e87d64..d25bd64a3 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ 
b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -40,7 +40,6 @@ import com.google.cloud.storage.spi.StorageRpcFactory; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import java.io.IOException; import java.math.BigInteger; import java.net.MalformedURLException; @@ -334,10 +333,10 @@ public void testWriteWithRetryAndObjectMetadata() throws IOException { .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); - expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class))) + expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); - expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class))) + expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); @@ -487,7 +486,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException { eq(true))) .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); - expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class))) + expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BucketInfoTest.java 
b/google-cloud-storage/src/test/java/com/google/cloud/storage/BucketInfoTest.java index 8def7c306..d72901c17 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BucketInfoTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BucketInfoTest.java @@ -19,10 +19,12 @@ import static com.google.cloud.storage.Acl.Project.ProjectRole.VIEWERS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import com.google.api.client.util.DateTime; import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Bucket.Lifecycle; import com.google.api.services.storage.model.Bucket.Lifecycle.Rule; import com.google.cloud.storage.Acl.Project; import com.google.cloud.storage.Acl.Role; @@ -39,6 +41,7 @@ import com.google.cloud.storage.BucketInfo.RawDeleteRule; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -172,6 +175,8 @@ public class BucketInfoTest { .setLogging(LOGGING) .build(); + private static final Lifecycle EMPTY_LIFECYCLE = lifecycle(Collections.emptyList()); + @Test public void testToBuilder() { compareBuckets(BUCKET_INFO, BUCKET_INFO.toBuilder().build()); @@ -376,4 +381,70 @@ public void testLogging() { assertEquals("test-bucket", logging.getLogBucket()); assertEquals("test-", logging.getLogObjectPrefix()); } + + @Test + public void testRuleMappingIsCorrect_noMutations() { + Bucket bucket = bi().build().toPb(); + assertNull(bucket.getLifecycle()); + } + + @Test + public void testRuleMappingIsCorrect_deleteLifecycleRules() { + Bucket bucket = bi().deleteLifecycleRules().build().toPb(); + assertEquals(EMPTY_LIFECYCLE, bucket.getLifecycle()); + } + + @Test + @SuppressWarnings({"deprecation"}) + public void 
testRuleMappingIsCorrect_setDeleteRules_null() { + Bucket bucket = bi().setDeleteRules(null).build().toPb(); + assertNull(bucket.getLifecycle()); + } + + @Test + @SuppressWarnings({"deprecation"}) + public void testRuleMappingIsCorrect_setDeleteRules_empty() { + Bucket bucket = bi().setDeleteRules(Collections.emptyList()).build().toPb(); + assertEquals(EMPTY_LIFECYCLE, bucket.getLifecycle()); + } + + @Test + public void testRuleMappingIsCorrect_setLifecycleRules_empty() { + Bucket bucket = bi().setLifecycleRules(Collections.emptyList()).build().toPb(); + assertEquals(EMPTY_LIFECYCLE, bucket.getLifecycle()); + } + + @Test + public void testRuleMappingIsCorrect_setLifeCycleRules_nonEmpty() { + LifecycleRule lifecycleRule = + new LifecycleRule( + LifecycleAction.newDeleteAction(), LifecycleCondition.newBuilder().setAge(10).build()); + Rule lifecycleDeleteAfter10 = lifecycleRule.toPb(); + Bucket bucket = bi().setLifecycleRules(ImmutableList.of(lifecycleRule)).build().toPb(); + assertEquals(lifecycle(lifecycleDeleteAfter10), bucket.getLifecycle()); + } + + @Test + @SuppressWarnings({"deprecation"}) + public void testRuleMappingIsCorrect_setDeleteRules_nonEmpty() { + DeleteRule deleteRule = DELETE_RULES.get(0); + Rule deleteRuleAge5 = deleteRule.toPb(); + Bucket bucket = bi().setDeleteRules(ImmutableList.of(deleteRule)).build().toPb(); + assertEquals(lifecycle(deleteRuleAge5), bucket.getLifecycle()); + } + + private static Lifecycle lifecycle(Rule... 
rules) { + return lifecycle(Arrays.asList(rules)); + } + + private static Lifecycle lifecycle(List rules) { + Lifecycle emptyLifecycle = new Lifecycle(); + emptyLifecycle.setRule(rules); + return emptyLifecycle; + } + + private static BucketInfo.Builder bi() { + String bucketId = "bucketId"; + return BucketInfo.newBuilder(bucketId); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java index 6c4d0da08..24de06287 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java @@ -82,6 +82,9 @@ import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRoles; +import com.google.cloud.storage.spi.StorageRpcFactory; +import com.google.cloud.storage.spi.v1.StorageRpc; +import com.google.cloud.storage.spi.v1.StorageRpc.Option; import com.google.cloud.storage.testing.RemoteStorageHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -90,6 +93,8 @@ import com.google.common.collect.Lists; import com.google.common.io.BaseEncoding; import com.google.common.io.ByteStreams; +import com.google.common.reflect.AbstractInvocationHandler; +import com.google.common.reflect.Reflection; import com.google.iam.v1.Binding; import com.google.iam.v1.IAMPolicyGrpc; import com.google.iam.v1.SetIamPolicyRequest; @@ -106,9 +111,11 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Method; import java.net.URL; import java.net.URLConnection; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.security.Key; @@ -124,6 +131,7 @@ import java.util.Set; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPInputStream; @@ -137,7 +145,14 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.threeten.bp.Clock; +import org.threeten.bp.Instant; +import org.threeten.bp.ZoneId; +import org.threeten.bp.ZoneOffset; +import org.threeten.bp.format.DateTimeFormatter; public class ITStorageTest { @@ -198,6 +213,8 @@ public class ITStorageTest { private static final ImmutableList LIFECYCLE_RULES = ImmutableList.of(LIFECYCLE_RULE_1, LIFECYCLE_RULE_2); + @Rule public final TestName testName = new TestName(); + @BeforeClass public static void beforeClass() throws IOException { remoteStorageHelper = RemoteStorageHelper.create(); @@ -3370,7 +3387,7 @@ public void testBlobReload() throws Exception { fail("StorageException was expected"); } catch (StorageException e) { assertEquals(412, e.getCode()); - assertEquals("Precondition Failed", e.getMessage()); + assertEquals("conditionNotMet", e.getReason()); } Blob updated = blob.reload(); @@ -3624,4 +3641,125 @@ public void testWriterWithKmsKeyName() throws IOException { assertThat(blob.getKmsKeyName()).isNotNull(); assertThat(storage.delete(BUCKET, blobName)).isTrue(); } + + @Test + public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_multipleChunks() + throws IOException { + int _2MiB = 256 * 1024; + int contentSize = 292_617; + + blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_2MiB, contentSize); + } + + @Test + public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_singleChunk() + throws IOException { + int _4MiB = 256 * 1024 * 2; + int contentSize = 292_617; + + 
blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_4MiB, contentSize); + } + + private void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent( + int chunkSize, int contentSize) throws IOException { + Instant now = Clock.systemUTC().instant(); + DateTimeFormatter formatter = + DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC)); + String nowString = formatter.format(now); + + String blobPath = String.format("%s/%s/blob", testName.getMethodName(), nowString); + BlobId blobId = BlobId.of(BUCKET, blobPath); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + + Random rand = new Random(1234567890); + String randString = randString(rand, contentSize); + final byte[] randStringBytes = randString.getBytes(StandardCharsets.UTF_8); + Storage storage = StorageOptions.getDefaultInstance().getService(); + WriteChannel ww = storage.writer(blobInfo); + ww.setChunkSize(chunkSize); + ww.write(ByteBuffer.wrap(randStringBytes)); + ww.close(); + + Blob blobGen1 = storage.get(blobId); + + final AtomicBoolean exceptionThrown = new AtomicBoolean(false); + + Storage testStorage = + StorageOptions.newBuilder() + .setServiceRpcFactory( + new StorageRpcFactory() { + /** + * Here we're creating a proxy of StorageRpc where we can delegate all calls to + * the normal implementation, except in the case of {@link + * StorageRpc#writeWithResponse(String, byte[], int, long, int, boolean)} where + * {@code lastChunk == true}. We allow the call to execute, but instead of + * returning the result we throw an IOException to simulate a prematurely close + * connection. This behavior is to ensure appropriate handling of a completed + * upload where the ACK wasn't received. In particular, if an upload is initiated + * against an object where an {@link Option#IF_GENERATION_MATCH} simply calling + * get on an object can result in a 404 because the object that is created while + * the BlobWriteChannel is executing will be a new generation. 
+ */ + @SuppressWarnings("UnstableApiUsage") + @Override + public StorageRpc create(final StorageOptions options) { + return Reflection.newProxy( + StorageRpc.class, + new AbstractInvocationHandler() { + final StorageRpc delegate = + (StorageRpc) StorageOptions.getDefaultInstance().getRpc(); + + @Override + protected Object handleInvocation( + Object proxy, Method method, Object[] args) throws Throwable { + if ("writeWithResponse".equals(method.getName())) { + Object result = method.invoke(delegate, args); + boolean lastChunk = (boolean) args[5]; + // if we're on the lastChunk simulate a connection failure which + // happens after the request was processed but before response could + // be received by the client. + if (lastChunk) { + exceptionThrown.set(true); + throw StorageException.translate( + new IOException("simulated Connection closed prematurely")); + } else { + return result; + } + } + return method.invoke(delegate, args); + } + }); + } + }) + .build() + .getService(); + + try (WriteChannel w = testStorage.writer(blobGen1, BlobWriteOption.generationMatch())) { + w.setChunkSize(chunkSize); + + ByteBuffer buffer = ByteBuffer.wrap(randStringBytes); + w.write(buffer); + } + + assertTrue("Expected an exception to be thrown for the last chunk", exceptionThrown.get()); + + Blob blobGen2 = storage.get(blobId); + assertEquals(contentSize, (long) blobGen2.getSize()); + assertNotEquals(blobInfo.getGeneration(), blobGen2.getGeneration()); + ByteArrayOutputStream actualData = new ByteArrayOutputStream(); + blobGen2.downloadTo(actualData); + assertArrayEquals(randStringBytes, actualData.toByteArray()); + } + + private static String randString(Random rand, int length) { + final StringBuilder sb = new StringBuilder(); + while (sb.length() < length) { + int i = rand.nextInt('z'); + char c = (char) i; + if (Character.isLetter(c) || Character.isDigit(c)) { + sb.append(c); + } + } + return sb.toString(); + } }