Skip to content

Commit

Permalink
feat: update all automatic retry behavior to be idempotency aware (#1132
Browse files Browse the repository at this point in the history
)

1. Add new configuration option `StorageOptions.Builder#setStorageRetryStrategy`
2. Add new interface `StorageRetryStrategy`
3. Update all calls to be made idempotency aware
4. Change default behavior of automatic retries to only retry those operations which are deemed to be idempotent
5. Add @deprecated method `StorageRetryStrategy.getLegacyStorageRetryStrategy` providing access to a strategy which matches the previous sometimes unsafe behavior
6. make BlobWriteChannelTest cases idempotent
7. make StorageImplMockitoTest#testCreateBlobRetry idempotent
  • Loading branch information
BenWhitehead committed Nov 2, 2021
1 parent 2d64f90 commit 470b8cd
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 38 deletions.
Expand Up @@ -87,7 +87,7 @@ public Builder setTransportOptions(TransportOptions transportOptions) {
* @return the builder
* @see StorageRetryStrategy#getDefaultStorageRetryStrategy()
*/
Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy) {
public Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy) {
this.storageRetryStrategy =
requireNonNull(storageRetryStrategy, "storageRetryStrategy must be non null");
return this;
Expand Down Expand Up @@ -125,7 +125,7 @@ public TransportOptions getDefaultTransportOptions() {
}

public StorageRetryStrategy getStorageRetryStrategy() {
return StorageRetryStrategy.getLegacyStorageRetryStrategy();
return StorageRetryStrategy.getDefaultStorageRetryStrategy();
}
}

Expand Down
Expand Up @@ -29,7 +29,7 @@
* @see #getDefaultStorageRetryStrategy()
* @see #getUniformStorageRetryStrategy()
*/
interface StorageRetryStrategy extends Serializable {
public interface StorageRetryStrategy extends Serializable {

/**
* Factory method to provide a {@link ResultRetryAlgorithm} which will be used to evaluate whether
Expand Down
Expand Up @@ -65,8 +65,12 @@ public class BlobWriteChannelTest {
private static final String BLOB_NAME = "n";
private static final String UPLOAD_ID = "uploadid";
private static final BlobInfo BLOB_INFO = BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME).build();
private static final BlobInfo BLOB_INFO_WITH_GENERATION =
BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME, 1L).build();
private static final StorageObject UPDATED_BLOB = new StorageObject();
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
private static final Map<StorageRpc.Option, ?> RPC_OPTIONS_GENERATION =
ImmutableMap.of(StorageRpc.Option.IF_GENERATION_MATCH, 1L);
private static final int MIN_CHUNK_SIZE = 256 * 1024;
private static final int DEFAULT_CHUNK_SIZE = 60 * MIN_CHUNK_SIZE; // 15MiB
private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE;
Expand Down Expand Up @@ -112,11 +116,12 @@ public void testCreate() {

@Test
public void testCreateRetryableError() {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS))
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andThrow(socketClosedException);
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
assertTrue(writer.isOpen());
assertNull(writer.getStorageObject());
}
Expand Down Expand Up @@ -146,7 +151,8 @@ public void testWriteWithoutFlush() throws Exception {
public void testWriteWithFlushRetryChunk() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -167,7 +173,7 @@ public void testWriteWithFlushRetryChunk() throws Exception {
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
Expand All @@ -179,7 +185,8 @@ public void testWriteWithFlushRetryChunk() throws Exception {
public void testWriteWithRetryFullChunk() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID), (byte[]) anyObject(), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(false)))
Expand All @@ -204,7 +211,7 @@ public void testWriteWithRetryFullChunk() throws Exception {
eq(true)))
.andReturn(BLOB_INFO.toPb());
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
Expand All @@ -217,7 +224,8 @@ public void testWriteWithRetryFullChunk() throws Exception {
public void testWriteWithRemoteProgressMade() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -239,7 +247,7 @@ public void testWriteWithRemoteProgressMade() throws Exception {
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
Expand All @@ -251,7 +259,8 @@ public void testWriteWithRemoteProgressMade() throws Exception {
public void testWriteWithDriftRetryCase4() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -272,7 +281,7 @@ public void testWriteWithDriftRetryCase4() throws Exception {
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
Expand All @@ -288,7 +297,8 @@ public void testWriteWithDriftRetryCase4() throws Exception {
public void testWriteWithUnreachableRemoteOffset() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -300,7 +310,7 @@ public void testWriteWithUnreachableRemoteOffset() throws Exception {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(MIN_CHUNK_SIZE + 10L);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
Expand All @@ -317,7 +327,8 @@ public void testWriteWithUnreachableRemoteOffset() throws Exception {
public void testWriteWithRetryAndObjectMetadata() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand Down Expand Up @@ -345,7 +356,7 @@ public void testWriteWithRetryAndObjectMetadata() throws Exception {
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
Expand All @@ -358,7 +369,8 @@ public void testWriteWithRetryAndObjectMetadata() throws Exception {
public void testWriteWithUploadCompletedByAnotherClient() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -380,7 +392,7 @@ public void testWriteWithUploadCompletedByAnotherClient() throws Exception {
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
Expand All @@ -399,7 +411,8 @@ public void testWriteWithUploadCompletedByAnotherClient() throws Exception {
public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -420,7 +433,7 @@ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
Expand All @@ -437,7 +450,8 @@ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception {
public void testGetCurrentUploadOffset() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand Down Expand Up @@ -468,7 +482,7 @@ public void testGetCurrentUploadOffset() throws Exception {
eq(true)))
.andReturn(BLOB_INFO.toPb());
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
Expand All @@ -481,7 +495,8 @@ public void testGetCurrentUploadOffset() throws Exception {
public void testWriteWithLastFlushRetryChunkButCompleted() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -495,7 +510,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws Exception {
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
assertFalse(writer.isRetrying());
Expand Down Expand Up @@ -825,17 +840,23 @@ public void testSaveAndRestoreWithSignedURL() throws Exception {
}

private BlobWriteChannel newWriter() {
Map<StorageRpc.Option, ?> optionsMap = EMPTY_RPC_OPTIONS;
return newWriter(false);
}

private BlobWriteChannel newWriter(boolean withGeneration) {
Map<StorageRpc.Option, ?> optionsMap =
withGeneration ? RPC_OPTIONS_GENERATION : EMPTY_RPC_OPTIONS;
ResultRetryAlgorithm<?> createResultAlgorithm =
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap);
ResultRetryAlgorithm<?> writeResultAlgorithm =
retryAlgorithmManager.getForResumableUploadSessionWrite(optionsMap);
final BlobInfo blobInfo = withGeneration ? BLOB_INFO_WITH_GENERATION : BLOB_INFO;
return BlobWriteChannel.newBuilder()
.setStorageOptions(options)
.setBlobInfo(BLOB_INFO)
.setBlobInfo(blobInfo)
.setUploadIdSupplier(
ResumableMedia.startUploadForBlobInfo(
options, BLOB_INFO, optionsMap, createResultAlgorithm))
options, blobInfo, optionsMap, createResultAlgorithm))
.setAlgorithmForWrite(writeResultAlgorithm)
.build();
}
Expand Down
Expand Up @@ -38,9 +38,4 @@ public static Blob blobCopyWithStorage(Blob b, Storage s) {
BlobInfo.BuilderImpl builder = (BlobInfo.BuilderImpl) BlobInfo.fromPb(b.toPb()).toBuilder();
return new Blob(s, builder);
}

public static StorageOptions.Builder useDefaultStorageRetryStrategy(
StorageOptions.Builder builder) {
return builder.setStorageRetryStrategy(StorageRetryStrategy.getDefaultStorageRetryStrategy());
}
}
Expand Up @@ -36,6 +36,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,6 +122,8 @@ public class StorageImplMockitoTest {

// Empty StorageRpc options
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
private static final Map<StorageRpc.Option, ?> BLOB_INFO1_RPC_OPTIONS_WITH_GENERATION =
ImmutableMap.of(StorageRpc.Option.IF_GENERATION_MATCH, 24L);

// Bucket target options
private static final Storage.BucketTargetOption BUCKET_TARGET_METAGENERATION =
Expand Down Expand Up @@ -726,7 +729,10 @@ public void testCreateBlobRetry() throws IOException {
.doReturn(BLOB_INFO1.toPb())
.doThrow(UNEXPECTED_CALL_EXCEPTION)
.when(storageRpcMock)
.create(Mockito.eq(storageObject), capturedStream.capture(), Mockito.eq(EMPTY_RPC_OPTIONS));
.create(
Mockito.eq(storageObject),
capturedStream.capture(),
Mockito.eq(BLOB_INFO1_RPC_OPTIONS_WITH_GENERATION));

storage =
options
Expand All @@ -736,7 +742,7 @@ public void testCreateBlobRetry() throws IOException {
.getService();
initializeServiceDependentObjects();

Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT);
Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT, BlobTargetOption.generationMatch());

assertEquals(expectedBlob1, blob);

Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.conformance.storage.v1.InstructionList;
import com.google.cloud.conformance.storage.v1.Method;
import com.google.cloud.storage.PackagePrivateMethodWorkarounds;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.conformance.retry.TestBench.RetryTestResource;
Expand Down Expand Up @@ -141,7 +140,6 @@ private Storage newStorage(boolean forTest) {
.setHost(testBench.getBaseUri())
.setCredentials(NoCredentials.getInstance())
.setProjectId(testRetryConformance.getProjectId());
builder = PackagePrivateMethodWorkarounds.useDefaultStorageRetryStrategy(builder);
RetrySettings.Builder retrySettingsBuilder =
StorageOptions.getDefaultRetrySettings().toBuilder();
if (forTest) {
Expand Down

0 comments on commit 470b8cd

Please sign in to comment.