From 470b8cd8a24c1c2b4be1b956d1691dbae8cf87fd Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 2 Nov 2021 14:34:33 -0400 Subject: [PATCH] feat: update all automatic retry behavior to be idempotency aware (#1132) 1. Add new configuration option `StorageOptions.Builder#setStorageRetryStrategy` 2. Add new interface `StorageRetryStrategy` 3. Update all calls to be made idempotency aware 4. Change default behavior of automatic retries to only retry those operations which are deemed to be idempotent 5. Add @deprecated method `StorageRetryStrategy.getLegacyStorageRetryStrategy` providing access to a strategy which matches the previous sometimes unsafe behavior 6. make BlobWriteChannelTest cases idempotent 7. make StorageImplMockitoTest#testCreateBlobRetry idempotent --- .../google/cloud/storage/StorageOptions.java | 4 +- .../cloud/storage/StorageRetryStrategy.java | 2 +- .../cloud/storage/BlobWriteChannelTest.java | 73 ++++++++++++------- .../PackagePrivateMethodWorkarounds.java | 5 -- .../cloud/storage/StorageImplMockitoTest.java | 10 ++- .../conformance/retry/RetryTestFixture.java | 2 - 6 files changed, 58 insertions(+), 38 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java index 64fe2cf94..a15a90381 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java @@ -87,7 +87,7 @@ public Builder setTransportOptions(TransportOptions transportOptions) { * @return the builder * @see StorageRetryStrategy#getDefaultStorageRetryStrategy() */ - Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy) { + public Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy) { this.storageRetryStrategy = requireNonNull(storageRetryStrategy, "storageRetryStrategy must be non null"); return this; @@ -125,7 +125,7 @@ public TransportOptions getDefaultTransportOptions() { } public StorageRetryStrategy getStorageRetryStrategy() { - return StorageRetryStrategy.getLegacyStorageRetryStrategy(); + return StorageRetryStrategy.getDefaultStorageRetryStrategy(); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageRetryStrategy.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageRetryStrategy.java index 52ae07137..7175c5882 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageRetryStrategy.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageRetryStrategy.java @@ -29,7 +29,7 @@ * @see #getDefaultStorageRetryStrategy() * @see #getUniformStorageRetryStrategy() */ -interface StorageRetryStrategy extends Serializable { +public interface StorageRetryStrategy extends Serializable { /** * Factory method to provide a {@link ResultRetryAlgorithm} which will be used to evaluate whether diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index c65c99448..000a925c0 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -65,8 +65,12 @@ public class BlobWriteChannelTest { private static final String BLOB_NAME = "n"; private static final String UPLOAD_ID = "uploadid"; private static final BlobInfo BLOB_INFO = BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME).build(); + private static final BlobInfo BLOB_INFO_WITH_GENERATION = + BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME, 1L).build(); private static final StorageObject UPDATED_BLOB = new StorageObject(); private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); + private static final Map RPC_OPTIONS_GENERATION = + ImmutableMap.of(StorageRpc.Option.IF_GENERATION_MATCH, 1L); private static final int MIN_CHUNK_SIZE = 256 * 1024; private static final int DEFAULT_CHUNK_SIZE = 60 * MIN_CHUNK_SIZE; // 15MiB private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE; @@ -112,11 +116,12 @@ public void testCreate() { @Test public void testCreateRetryableError() { - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)) + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) .andThrow(socketClosedException); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); assertTrue(writer.isOpen()); assertNull(writer.getStorageObject()); } @@ -146,7 +151,8 @@ public void testWriteWithoutFlush() throws Exception { public void testWriteWithFlushRetryChunk() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -167,7 +173,7 @@ public void testWriteWithFlushRetryChunk() throws Exception { eq(false))) .andReturn(null); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); assertTrue(writer.isOpen()); @@ -179,7 +185,8 @@ public void testWriteWithFlushRetryChunk() throws Exception { public void testWriteWithRetryFullChunk() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), (byte[]) anyObject(), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(false))) @@ -204,7 +211,7 @@ public void testWriteWithRetryFullChunk() throws Exception { eq(true))) .andReturn(BLOB_INFO.toPb()); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); writer.close(); @@ -217,7 +224,8 @@ public void testWriteWithRetryFullChunk() throws Exception { public void testWriteWithRemoteProgressMade() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -239,7 +247,7 @@ public void testWriteWithRemoteProgressMade() throws Exception { eq(false))) .andReturn(null); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); assertTrue(writer.isOpen()); @@ -251,7 +259,8 @@ public void testWriteWithRemoteProgressMade() throws Exception { public void testWriteWithDriftRetryCase4() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -272,7 +281,7 @@ public void testWriteWithDriftRetryCase4() throws Exception { eq(false))) .andReturn(null); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); assertArrayEquals(buffer.array(), capturedBuffer.getValue()); @@ -288,7 +297,8 @@ public void testWriteWithDriftRetryCase4() throws Exception { public void testWriteWithUnreachableRemoteOffset() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -300,7 +310,7 @@ public void testWriteWithUnreachableRemoteOffset() throws Exception { .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(MIN_CHUNK_SIZE + 10L); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); try { writer.write(buffer); @@ -317,7 +327,8 @@ public void testWriteWithUnreachableRemoteOffset() throws Exception { public void testWriteWithRetryAndObjectMetadata() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -345,7 +356,7 @@ public void testWriteWithRetryAndObjectMetadata() throws Exception { expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); writer.close(); @@ -358,7 +369,8 @@ public void testWriteWithRetryAndObjectMetadata() throws Exception { public void testWriteWithUploadCompletedByAnotherClient() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -380,7 +392,7 @@ public void testWriteWithUploadCompletedByAnotherClient() throws Exception { expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); try { writer.write(buffer); @@ -399,7 +411,8 @@ public void testWriteWithUploadCompletedByAnotherClient() throws Exception { public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -420,7 +433,7 @@ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception { .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); try { writer.write(buffer); @@ -437,7 +450,8 @@ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception { public void testGetCurrentUploadOffset() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -468,7 +482,7 @@ public void testGetCurrentUploadOffset() throws Exception { eq(true))) .andReturn(BLOB_INFO.toPb()); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); writer.close(); @@ -481,7 +495,8 @@ public void testGetCurrentUploadOffset() throws Exception { public void testWriteWithLastFlushRetryChunkButCompleted() throws Exception { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION)) + .andReturn(UPLOAD_ID); expect( storageRpcMock.writeWithResponse( eq(UPLOAD_ID), @@ -495,7 +510,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws Exception { expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE))) .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); - writer = newWriter(); + writer = newWriter(true); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); writer.close(); assertFalse(writer.isRetrying()); @@ -825,17 +840,23 @@ public void testSaveAndRestoreWithSignedURL() throws Exception { } private BlobWriteChannel newWriter() { - Map optionsMap = EMPTY_RPC_OPTIONS; + return newWriter(false); + } + + private BlobWriteChannel newWriter(boolean withGeneration) { + Map optionsMap = + withGeneration ? RPC_OPTIONS_GENERATION : EMPTY_RPC_OPTIONS; ResultRetryAlgorithm createResultAlgorithm = retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap); ResultRetryAlgorithm writeResultAlgorithm = retryAlgorithmManager.getForResumableUploadSessionWrite(optionsMap); + final BlobInfo blobInfo = withGeneration ? BLOB_INFO_WITH_GENERATION : BLOB_INFO; return BlobWriteChannel.newBuilder() .setStorageOptions(options) - .setBlobInfo(BLOB_INFO) + .setBlobInfo(blobInfo) .setUploadIdSupplier( ResumableMedia.startUploadForBlobInfo( - options, BLOB_INFO, optionsMap, createResultAlgorithm)) + options, blobInfo, optionsMap, createResultAlgorithm)) .setAlgorithmForWrite(writeResultAlgorithm) .build(); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java index 89e13e19e..0706df1a7 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java @@ -38,9 +38,4 @@ public static Blob blobCopyWithStorage(Blob b, Storage s) { BlobInfo.BuilderImpl builder = (BlobInfo.BuilderImpl) BlobInfo.fromPb(b.toPb()).toBuilder(); return new Blob(s, builder); } - - public static StorageOptions.Builder useDefaultStorageRetryStrategy( - StorageOptions.Builder builder) { - return builder.setStorageRetryStrategy(StorageRetryStrategy.getDefaultStorageRetryStrategy()); - } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java index 4efdda4d4..52c312b47 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java @@ -36,6 +36,7 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.Tuple; import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.spi.StorageRpcFactory; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.collect.ImmutableList; @@ -121,6 +122,8 @@ public class StorageImplMockitoTest { // Empty StorageRpc options private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); + private static final Map BLOB_INFO1_RPC_OPTIONS_WITH_GENERATION = + ImmutableMap.of(StorageRpc.Option.IF_GENERATION_MATCH, 24L); // Bucket target options private static final Storage.BucketTargetOption BUCKET_TARGET_METAGENERATION = @@ -726,7 +729,10 @@ public void testCreateBlobRetry() throws IOException { .doReturn(BLOB_INFO1.toPb()) .doThrow(UNEXPECTED_CALL_EXCEPTION) .when(storageRpcMock) - .create(Mockito.eq(storageObject), capturedStream.capture(), Mockito.eq(EMPTY_RPC_OPTIONS)); + .create( + Mockito.eq(storageObject), + capturedStream.capture(), + Mockito.eq(BLOB_INFO1_RPC_OPTIONS_WITH_GENERATION)); storage = options @@ -736,7 +742,7 @@ public void testCreateBlobRetry() throws IOException { .getService(); initializeServiceDependentObjects(); - Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT); + Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT, BlobTargetOption.generationMatch()); assertEquals(expectedBlob1, blob); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RetryTestFixture.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RetryTestFixture.java index 8be2db85b..2f0c2fbb9 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RetryTestFixture.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RetryTestFixture.java @@ -23,7 +23,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.conformance.storage.v1.InstructionList; import com.google.cloud.conformance.storage.v1.Method; -import com.google.cloud.storage.PackagePrivateMethodWorkarounds; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.conformance.retry.TestBench.RetryTestResource; @@ -141,7 +140,6 @@ private Storage newStorage(boolean forTest) { .setHost(testBench.getBaseUri()) .setCredentials(NoCredentials.getInstance()) .setProjectId(testRetryConformance.getProjectId()); - builder = PackagePrivateMethodWorkarounds.useDefaultStorageRetryStrategy(builder); RetrySettings.Builder retrySettingsBuilder = StorageOptions.getDefaultRetrySettings().toBuilder(); if (forTest) {