diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 32dc3b7157..84a1e96049 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -21,6 +21,7 @@ import com.google.api.gax.longrunning.OperationSnapshot; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.NoCredentials; @@ -291,7 +292,8 @@ private Builder() { .setRetrySettings(longRunningRetrySettings); databaseAdminStubSettingsBuilder .updateBackupSettings() - .setRetrySettings(longRunningRetrySettings); + .setRetrySettings(longRunningRetrySettings) + .setRetryableCodes(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE); } Builder(SpannerOptions options) { @@ -581,6 +583,7 @@ public Builder setEmulatorHost(String emulatorHost) { return this; } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { // Set the host of emulator has been set. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index de1a09158c..2719724b83 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -27,30 +27,38 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.OperationCallable; import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.api.pathtemplate.PathTemplate; +import com.google.cloud.RetryHelper; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; +import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub; import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub; import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; import com.google.cloud.spanner.v1.stub.GrpcSpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStub; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.iam.v1.GetIamPolicyRequest; @@ -63,6 +71,9 @@ import com.google.longrunning.Operation; import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; import com.google.spanner.admin.database.v1.Backup; import com.google.spanner.admin.database.v1.CreateBackupMetadata; import com.google.spanner.admin.database.v1.CreateBackupRequest; @@ -123,9 +134,12 @@ import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -134,6 +148,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -193,6 +208,7 @@ private synchronized void shutdown() { private boolean rpcIsClosed; private final SpannerStub spannerStub; private final InstanceAdminStub instanceAdminStub; + private final DatabaseAdminStubSettings databaseAdminStubSettings; private final DatabaseAdminStub databaseAdminStub; private final String projectId; private final String projectName; @@ -321,20 +337,173 @@ public GapicSpannerRpc(final SpannerOptions options) { .setStreamWatchdogProvider(watchdogProvider) .build()); - this.databaseAdminStub = - GrpcDatabaseAdminStub.create( - options - .getDatabaseAdminStubSettings() - .toBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .setStreamWatchdogProvider(watchdogProvider) - .build()); + this.databaseAdminStubSettings = + options + .getDatabaseAdminStubSettings() + .toBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) + .build(); + this.databaseAdminStub = GrpcDatabaseAdminStub.create(this.databaseAdminStubSettings); } catch (Exception e) { throw newSpannerException(e); } } + private static final class OperationFutureRetryAlgorithm + implements ResultRetryAlgorithm> { + private static final ImmutableList RETRYABLE_CODES = + ImmutableList.of(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE); + + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, + OperationFuture prevResponse, + TimedAttemptSettings prevSettings) { + // Use default retry settings. + return null; + } + + @Override + public boolean shouldRetry( + Throwable prevThrowable, OperationFuture prevResponse) + throws CancellationException { + if (prevThrowable instanceof ApiException) { + ApiException e = (ApiException) prevThrowable; + return RETRYABLE_CODES.contains(e.getStatusCode().getCode()); + } + if (prevResponse != null) { + try { + prevResponse.getInitialFuture().get(); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof ApiException) { + ApiException e = (ApiException) cause; + return RETRYABLE_CODES.contains(e.getStatusCode().getCode()); + } + } catch (InterruptedException e) { + return false; + } + } + return false; + } + } + + private final class OperationFutureCallable + implements Callable> { + final OperationCallable operationCallable; + final RequestT initialRequest; + final String instanceName; + final OperationsLister lister; + final Function getStartTimeFunction; + Timestamp initialCallTime; + boolean isRetry = false; + + OperationFutureCallable( + OperationCallable operationCallable, + RequestT initialRequest, + String instanceName, + OperationsLister lister, + Function getStartTimeFunction) { + this.operationCallable = operationCallable; + this.initialRequest = initialRequest; + this.instanceName = instanceName; + this.lister = lister; + this.getStartTimeFunction = getStartTimeFunction; + } + + @Override + public OperationFuture call() throws Exception { + acquireAdministrativeRequestsRateLimiter(); + + String operationName = null; + if (isRetry) { + // Query the backend to see if the operation was actually created, and that the + // problem was caused by a network problem or other transient problem client side. + Operation operation = mostRecentOperation(lister, getStartTimeFunction, initialCallTime); + if (operation != null) { + // Operation found, resume tracking that operation. + operationName = operation.getName(); + } + } else { + initialCallTime = + Timestamp.newBuilder() + .setSeconds( + TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)) + .build(); + } + isRetry = true; + + if (operationName == null) { + GrpcCallContext context = newCallContext(null, instanceName); + return operationCallable.futureCall(initialRequest, context); + } else { + return operationCallable.resumeFutureCall(operationName); + } + } + } + + private interface OperationsLister { + Paginated listOperations(String nextPageToken); + } + + private Operation mostRecentOperation( + OperationsLister lister, + Function getStartTimeFunction, + Timestamp initialCallTime) + throws InvalidProtocolBufferException { + Operation res = null; + Timestamp currMaxStartTime = null; + String nextPageToken = null; + Paginated operations; + do { + operations = lister.listOperations(nextPageToken); + for (Operation op : operations.getResults()) { + Timestamp startTime = getStartTimeFunction.apply(op); + if (res == null + || (TimestampComparator.INSTANCE.compare(startTime, currMaxStartTime) > 0 + && TimestampComparator.INSTANCE.compare(startTime, initialCallTime) >= 0)) { + currMaxStartTime = startTime; + res = op; + } + // If the operation does not report any start time, then the operation that is not yet done + // is the one that is the most recent. + if (startTime == null && currMaxStartTime == null && !op.getDone()) { + res = op; + break; + } + } + } while (operations.getNextPageToken() != null); + return res; + } + + private static final class TimestampComparator implements Comparator { + private static final TimestampComparator INSTANCE = new TimestampComparator(); + + @Override + public int compare(Timestamp t1, Timestamp t2) { + if (t1 == null && t2 == null) { + return 0; + } + if (t1 != null && t2 == null) { + return 1; + } + if (t1 == null && t2 != null) { + return -1; + } + if (t1.getSeconds() > t2.getSeconds() + || (t1.getSeconds() == t2.getSeconds() && t1.getNanos() > t2.getNanos())) { + return 1; + } + if (t1.getSeconds() < t2.getSeconds() + || (t1.getSeconds() == t2.getSeconds() && t1.getNanos() < t2.getNanos())) { + return -1; + } + return 0; + } + } + private void acquireAdministrativeRequestsRateLimiter() { if (throttleAdministrativeRequests) { RateLimiter limiter = ADMINISTRATIVE_REQUESTS_RATE_LIMITERS.get(this.projectName); @@ -508,17 +677,66 @@ public Paginated listDatabases( @Override public OperationFuture createDatabase( - String instanceName, String createDatabaseStatement, Iterable additionalStatements) + final String instanceName, + String createDatabaseStatement, + Iterable additionalStatements) throws SpannerException { - acquireAdministrativeRequestsRateLimiter(); + final String databaseId = + createDatabaseStatement.substring( + "CREATE DATABASE `".length(), createDatabaseStatement.length() - 1); CreateDatabaseRequest request = CreateDatabaseRequest.newBuilder() .setParent(instanceName) .setCreateStatement(createDatabaseStatement) .addAllExtraStatements(additionalStatements) .build(); - GrpcCallContext context = newCallContext(null, instanceName); - return databaseAdminStub.createDatabaseOperationCallable().futureCall(request, context); + + OperationFutureCallable callable = + new OperationFutureCallable( + databaseAdminStub.createDatabaseOperationCallable(), + request, + instanceName, + new OperationsLister() { + @Override + public Paginated listOperations(String nextPageToken) { + return listDatabaseOperations( + instanceName, + 0, + String.format( + "(name:%s/operations/) AND (metadata.@type:type.googleapis.com/%s)", + String.format("%s/databases/%s", instanceName, databaseId), + CreateDatabaseMetadata.getDescriptor().getFullName()), + nextPageToken); + } + }, + new Function() { + @Override + public Timestamp apply(Operation input) { + if (input.getDone() && input.hasResponse()) { + try { + Timestamp createTime = + input.getResponse().unpack(Database.class).getCreateTime(); + if (Timestamp.getDefaultInstance().equals(createTime)) { + // Create time was not returned by the server (proto objects never return + // null, instead they return the default instance). Return null from this + // method to indicate that there is no known create time. + return null; + } + } catch (InvalidProtocolBufferException e) { + return null; + } + } + return null; + } + }); + return RetryHelper.runWithRetries( + callable, + databaseAdminStubSettings + .createDatabaseOperationSettings() + .getInitialCallSettings() + .getRetrySettings(), + new OperationFutureRetryAlgorithm<>(), + NanoClock.getDefaultClock()); } @Override @@ -584,30 +802,106 @@ public List getDatabaseDdl(String databaseName) throws SpannerException @Override public OperationFuture createBackup( - String instanceName, String backupId, Backup backup) throws SpannerException { - acquireAdministrativeRequestsRateLimiter(); + final String instanceName, final String backupId, final Backup backup) + throws SpannerException { CreateBackupRequest request = CreateBackupRequest.newBuilder() .setParent(instanceName) .setBackupId(backupId) .setBackup(backup) .build(); - GrpcCallContext context = newCallContext(null, instanceName); - return databaseAdminStub.createBackupOperationCallable().futureCall(request, context); + OperationFutureCallable callable = + new OperationFutureCallable( + databaseAdminStub.createBackupOperationCallable(), + request, + instanceName, + new OperationsLister() { + @Override + public Paginated listOperations(String nextPageToken) { + return listBackupOperations( + instanceName, + 0, + String.format( + "(metadata.name:%s) AND (metadata.@type:type.googleapis.com/%s)", + String.format("%s/backups/%s", instanceName, backupId), + CreateBackupMetadata.getDescriptor().getFullName()), + nextPageToken); + } + }, + new Function() { + @Override + public Timestamp apply(Operation input) { + try { + return input + .getMetadata() + .unpack(CreateBackupMetadata.class) + .getProgress() + .getStartTime(); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + }); + return RetryHelper.runWithRetries( + callable, + databaseAdminStubSettings + .createBackupOperationSettings() + .getInitialCallSettings() + .getRetrySettings(), + new OperationFutureRetryAlgorithm<>(), + NanoClock.getDefaultClock()); } @Override public final OperationFuture restoreDatabase( - String databaseInstanceName, String databaseId, String backupName) { - acquireAdministrativeRequestsRateLimiter(); + final String databaseInstanceName, final String databaseId, String backupName) { RestoreDatabaseRequest request = RestoreDatabaseRequest.newBuilder() .setParent(databaseInstanceName) .setDatabaseId(databaseId) .setBackup(backupName) .build(); - GrpcCallContext context = newCallContext(null, databaseInstanceName); - return databaseAdminStub.restoreDatabaseOperationCallable().futureCall(request, context); + + OperationFutureCallable callable = + new OperationFutureCallable( + databaseAdminStub.restoreDatabaseOperationCallable(), + request, + databaseInstanceName, + new OperationsLister() { + @Override + public Paginated listOperations(String nextPageToken) { + return listDatabaseOperations( + databaseInstanceName, + 0, + String.format( + "(metadata.name:%s) AND (metadata.@type:type.googleapis.com/%s)", + String.format("%s/databases/%s", databaseInstanceName, databaseId), + RestoreDatabaseMetadata.getDescriptor().getFullName()), + nextPageToken); + } + }, + new Function() { + @Override + public Timestamp apply(Operation input) { + try { + return input + .getMetadata() + .unpack(RestoreDatabaseMetadata.class) + .getProgress() + .getStartTime(); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + }); + return RetryHelper.runWithRetries( + callable, + databaseAdminStubSettings + .restoreDatabaseOperationSettings() + .getInitialCallSettings() + .getRetrySettings(), + new OperationFutureRetryAlgorithm<>(), + NanoClock.getDefaultClock()); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java index 38bcd8437d..4d3abc10fd 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java @@ -19,29 +19,37 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertThat; -import com.google.api.gax.grpc.testing.LocalChannelProvider; -import com.google.api.gax.grpc.testing.MockGrpcService; -import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.core.ApiFunction; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.longrunning.OperationSnapshot; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.paging.Page; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.Identity; import com.google.cloud.NoCredentials; import com.google.cloud.Policy; import com.google.cloud.Role; import com.google.cloud.Timestamp; import com.google.cloud.spanner.DatabaseInfo.State; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.longrunning.Operation; import com.google.protobuf.InvalidProtocolBufferException; import com.google.spanner.admin.database.v1.CreateBackupMetadata; +import com.google.spanner.admin.database.v1.CreateBackupRequest; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import com.google.spanner.admin.database.v1.CreateDatabaseRequest; import com.google.spanner.admin.database.v1.OptimizeRestoredDatabaseMetadata; import com.google.spanner.admin.database.v1.RestoreDatabaseMetadata; +import com.google.spanner.admin.database.v1.RestoreDatabaseRequest; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -104,8 +112,9 @@ public void describeTo(Description description) { private static MockOperationsServiceImpl mockOperations; private static MockDatabaseAdminServiceImpl mockDatabaseAdmin; - private static MockServiceHelper serviceHelper; - private LocalChannelProvider channelProvider; + private static Server server; + private static InetSocketAddress address; + private Spanner spanner; private DatabaseAdminClient client; @Rule public ExpectedException exception = ExpectedException.none(); @@ -114,25 +123,48 @@ public void describeTo(Description description) { private OperationFuture restoreDatabaseOperation; @BeforeClass - public static void startStaticServer() { + public static void startStaticServer() throws Exception { mockOperations = new MockOperationsServiceImpl(); mockDatabaseAdmin = new MockDatabaseAdminServiceImpl(mockOperations); - serviceHelper = - new MockServiceHelper( - "in-process-1", Arrays.asList(mockOperations, mockDatabaseAdmin)); - serviceHelper.start(); + // This test uses a NettyServer to properly test network and timeout issues. + address = new InetSocketAddress("localhost", 0); + server = + NettyServerBuilder.forAddress(address) + .addService(mockOperations) + .addService(mockDatabaseAdmin) + .build() + .start(); } @AfterClass - public static void stopServer() { - serviceHelper.stop(); + public static void stopServer() throws Exception { + server.shutdown(); + server.awaitTermination(); } + @SuppressWarnings("rawtypes") @Before public void setUp() throws IOException { - serviceHelper.reset(); - channelProvider = serviceHelper.createChannelProvider(); + mockDatabaseAdmin.reset(); + mockOperations.reset(); SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + RetrySettings longRunningInitialRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(60L)) + .setMaxRpcTimeout(Duration.ofMillis(600L)) + .setInitialRetryDelay(Duration.ofMillis(20L)) + .setMaxRetryDelay(Duration.ofMillis(45L)) + .setRetryDelayMultiplier(1.5) + .setRpcTimeoutMultiplier(1.5) + .setTotalTimeout(Duration.ofMinutes(48L)) + .build(); + builder + .getDatabaseAdminStubSettingsBuilder() + .createBackupOperationSettings() + .setInitialCallSettings( + UnaryCallSettings.newUnaryCallSettingsBuilder() + .setRetrySettings(longRunningInitialRetrySettings) + .build()); builder .getDatabaseAdminStubSettingsBuilder() .createBackupOperationSettings() @@ -148,6 +180,15 @@ public void setUp() throws IOException { .setRetryDelayMultiplier(1.3) .setRpcTimeoutMultiplier(1.3) .build())); + + builder + .getDatabaseAdminStubSettingsBuilder() + .createDatabaseOperationSettings() + .setInitialCallSettings( + UnaryCallSettings + .newUnaryCallSettingsBuilder() + .setRetrySettings(longRunningInitialRetrySettings) + .build()); builder .getDatabaseAdminStubSettingsBuilder() .createDatabaseOperationSettings() @@ -163,6 +204,14 @@ public void setUp() throws IOException { .setRetryDelayMultiplier(1.3) .setRpcTimeoutMultiplier(1.3) .build())); + builder + .getDatabaseAdminStubSettingsBuilder() + .restoreDatabaseOperationSettings() + .setInitialCallSettings( + UnaryCallSettings + .newUnaryCallSettingsBuilder() + .setRetrySettings(longRunningInitialRetrySettings) + .build()); builder .getDatabaseAdminStubSettingsBuilder() .restoreDatabaseOperationSettings() @@ -180,7 +229,14 @@ public void setUp() throws IOException { .build())); spanner = builder - .setChannelProvider(channelProvider) + .setHost("http://localhost:" + server.getPort()) + .setChannelConfigurator( + new ApiFunction() { + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder input) { + return input.usePlaintext(); + } + }) .setCredentials(NoCredentials.getInstance()) .setProjectId(PROJECT_ID) .build() @@ -193,7 +249,9 @@ public void setUp() throws IOException { @After public void tearDown() throws Exception { - serviceHelper.reset(); + mockDatabaseAdmin.reset(); + mockDatabaseAdmin.removeAllExecutionTimes(); + mockOperations.reset(); spanner.close(); } @@ -762,4 +820,114 @@ private void restoreTestBackup() { throw SpannerExceptionFactory.newSpannerException(e); } } + + @Test + public void retryCreateBackupSlowResponse() throws Exception { + // Throw a DEADLINE_EXCEEDED after the operation has been created. This should cause the retry + // to pick up the existing operation. + mockDatabaseAdmin.setCreateBackupResponseExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String backupId = "other-backup-id"; + OperationFuture op = + client.createBackup(INSTANCE_ID, backupId, DB_ID, after7Days()); + Backup backup = op.get(); + assertThat(backup.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/backups/%s", PROJECT_ID, INSTANCE_ID, backupId)); + assertThat(client.getBackup(INSTANCE_ID, backupId)).isEqualTo(backup); + // There should be at exactly 2 requests. One from this test case and one from the setup of the + // test which also creates a test backup. + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateBackupRequest.class)).isEqualTo(2); + } + + @Test + public void retryCreateBackupSlowStartup() throws Exception { + mockDatabaseAdmin.setCreateBackupStartupExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String backupId = "other-backup-id"; + OperationFuture op = + client.createBackup(INSTANCE_ID, backupId, DB_ID, after7Days()); + Backup backup = op.get(); + assertThat(backup.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/backups/%s", PROJECT_ID, INSTANCE_ID, backupId)); + assertThat(client.getBackup(INSTANCE_ID, backupId)).isEqualTo(backup); + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateBackupRequest.class)).isAtLeast(3); + } + + @Test + public void retryCreateDatabaseSlowResponse() throws Exception { + // Throw a DEADLINE_EXCEEDED after the operation has been created. This should cause the retry + // to pick up the existing operation. + mockDatabaseAdmin.setCreateDatabaseResponseExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.createDatabase(INSTANCE_ID, databaseId, Collections.emptyList()); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + assertThat(client.getDatabase(INSTANCE_ID, databaseId)).isEqualTo(database); + // There should be at exactly 2 requests. One from this test case and one from the setup of the + // test which also creates a test database. + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateDatabaseRequest.class)).isEqualTo(2); + } + + @Test + public void retryCreateDatabaseSlowStartup() throws Exception { + mockDatabaseAdmin.setCreateDatabaseStartupExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.createDatabase(INSTANCE_ID, databaseId, Collections.emptyList()); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + assertThat(client.getDatabase(INSTANCE_ID, databaseId)).isEqualTo(database); + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateDatabaseRequest.class)).isAtLeast(3); + } + + @Test + public void retryRestoreDatabaseSlowResponse() throws Exception { + // Throw a DEADLINE_EXCEEDED after the operation has been created. This should cause the retry + // to pick up the existing operation. + mockDatabaseAdmin.setRestoreDatabaseResponseExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.restoreDatabase(INSTANCE_ID, BCK_ID, INSTANCE_ID, databaseId); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + Database retrieved = client.getDatabase(INSTANCE_ID, databaseId); + assertThat(retrieved.getCreateTime()).isEqualTo(database.getCreateTime()); + // There should be exactly 2 requests. One from this test case and one from the setup of the + // test which also restores a test database. + assertThat(mockDatabaseAdmin.countRequestsOfType(RestoreDatabaseRequest.class)).isEqualTo(2); + } + + @Test + public void retryRestoreDatabaseSlowStartup() throws Exception { + mockDatabaseAdmin.setRestoreDatabaseStartupExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.restoreDatabase(INSTANCE_ID, BCK_ID, INSTANCE_ID, databaseId); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + Database retrieved = client.getDatabase(INSTANCE_ID, databaseId); + assertThat(retrieved.getCreateTime()).isEqualTo(database.getCreateTime()); + assertThat(mockDatabaseAdmin.countRequestsOfType(RestoreDatabaseRequest.class)).isAtLeast(3); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java index d24d136448..f635a0757a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java @@ -16,8 +16,6 @@ package com.google.cloud.spanner; -import static org.junit.Assert.fail; - import com.google.api.core.ApiFunction; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.longrunning.OperationFuture; @@ -32,7 +30,6 @@ import com.google.protobuf.AbstractMessage; import com.google.protobuf.Any; import com.google.protobuf.Empty; -import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import com.google.spanner.admin.database.v1.DatabaseName; import com.google.spanner.admin.database.v1.ListDatabasesRequest; import com.google.spanner.admin.database.v1.ListDatabasesResponse; @@ -381,59 +378,6 @@ public void getDatabaseTest() { Assert.assertEquals(2, actualRequests.size()); } - @Test - public void createDatabaseTest() throws Exception { - Exception exception = setupException(); - DatabaseName name = DatabaseName.of(PROJECT, INSTANCE, "DATABASE"); - com.google.spanner.admin.database.v1.Database expectedResponse = - com.google.spanner.admin.database.v1.Database.newBuilder().setName(name.toString()).build(); - com.google.longrunning.Operation resultOperation = - com.google.longrunning.Operation.newBuilder() - .setName("createDatabaseTest") - .setDone(true) - .setResponse(Any.pack(expectedResponse)) - .build(); - if (exceptionAtCall == 0) { - mockDatabaseAdmin.addException(exception); - } - mockDatabaseAdmin.addResponse(resultOperation); - if (exceptionAtCall == 1) { - mockDatabaseAdmin.addException(exception); - } - mockDatabaseAdmin.addResponse(resultOperation); - - boolean methodIsIdempotent = - !spanner - .getOptions() - .getDatabaseAdminStubSettings() - .createDatabaseOperationSettings() - .getInitialCallSettings() - .getRetryableCodes() - .isEmpty(); - for (int i = 0; i < 2; i++) { - OperationFuture actualResponse = - client.createDatabase(INSTANCE, "DATABASE", Arrays.asList()); - try { - Database returnedInstance = actualResponse.get(); - if (!methodIsIdempotent && i == exceptionAtCall) { - fail("missing expected exception"); - } - Assert.assertEquals(name.toString(), returnedInstance.getId().getName()); - } catch (ExecutionException e) { - if (!exceptionType.isRetryable() || methodIsIdempotent || i != exceptionAtCall) { - Throwables.throwIfUnchecked(e.getCause()); - throw e; - } - } - } - List actualRequests = mockDatabaseAdmin.getRequests(); - if (methodIsIdempotent) { - Assert.assertEquals(2, actualRequests.size()); - } else { - Assert.assertEquals(1, actualRequests.size()); - } - } - @Test public void updateDatabaseDdlTest() throws Exception { Exception exception = setupException(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java index c198bc4f56..67a180e4f9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java @@ -17,7 +17,10 @@ package com.google.cloud.spanner; import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.common.base.Predicate; import com.google.common.base.Strings; +import com.google.common.collect.Collections2; import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.Policy; import com.google.iam.v1.SetIamPolicyRequest; @@ -66,7 +69,6 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; @@ -77,6 +79,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class MockDatabaseAdminServiceImpl extends DatabaseAdminImplBase implements MockGrpcService { private static final class MockDatabase { @@ -253,9 +259,9 @@ public Backup call() throws Exception { Operation operation = operations.get(operationName); for (int progress = 1; progress <= 100; progress++) { operation = operations.get(operationName); - long sleep = createBackupExecutionTime / 100; + long sleep = createBackupOperationExecutionTime / 100; if (progress == 100) { - sleep += createBackupExecutionTime % 100; + sleep += createBackupOperationExecutionTime % 100; } Thread.sleep(sleep); if (operation != null) { @@ -318,9 +324,9 @@ public Database call() throws Exception { Database proto = db.toProto(); Operation operation = operations.get(operationName); for (int progress = 1; progress <= 100; progress++) { - long sleep = restoreDatabaseExecutionTime / 100; + long sleep = restoreDatabaseOperationExecutionTime / 100; if (progress == 100) { - sleep += restoreDatabaseExecutionTime % 100; + sleep += restoreDatabaseOperationExecutionTime % 100; } Thread.sleep(sleep); if (operation != null) { @@ -391,7 +397,7 @@ public Database call() throws Exception { Thread.sleep(10L); restoreOperation = operations.get(restoreOperationName); } - Thread.sleep(optimizeDatabaseExecutionTime); + Thread.sleep(optimizeDatabaseOperationExecutionTime); db.state = State.READY; Database proto = db.toProto(); if (operation != null) { @@ -423,18 +429,30 @@ private com.google.rpc.Status fromException(Exception e) { return com.google.rpc.Status.newBuilder().setCode(code).setMessage(e.getMessage()).build(); } + private final Queue requests = new ConcurrentLinkedQueue<>(); private ConcurrentMap policies = new ConcurrentHashMap<>(); private static final String EXPIRE_TIME_MASK = "expire_time"; private static final Random RND = new Random(); private final Queue exceptions = new ConcurrentLinkedQueue<>(); + private final ReadWriteLock freezeLock = new ReentrantReadWriteLock(); private final ConcurrentMap databases = new ConcurrentHashMap<>(); private final ConcurrentMap backups = new ConcurrentHashMap<>(); private final ConcurrentMap> filterMatches = new ConcurrentHashMap<>(); private final MockOperationsServiceImpl operations; - private long createBackupExecutionTime; - private long restoreDatabaseExecutionTime; - private long optimizeDatabaseExecutionTime; + private long createBackupOperationExecutionTime; + private long restoreDatabaseOperationExecutionTime; + private long optimizeDatabaseOperationExecutionTime; + + private SimulatedExecutionTime createBackupStartupExecutionTime = SimulatedExecutionTime.none(); + private SimulatedExecutionTime createBackupResponseExecutionTime = SimulatedExecutionTime.none(); + private SimulatedExecutionTime createDatabaseStartupExecutionTime = SimulatedExecutionTime.none(); + private SimulatedExecutionTime createDatabaseResponseExecutionTime = + SimulatedExecutionTime.none(); + private SimulatedExecutionTime restoreDatabaseStartupExecutionTime = + SimulatedExecutionTime.none(); + private SimulatedExecutionTime restoreDatabaseResponseExecutionTime = + SimulatedExecutionTime.none(); public MockDatabaseAdminServiceImpl(MockOperationsServiceImpl operations) { this.operations = operations; @@ -443,36 +461,44 @@ public MockDatabaseAdminServiceImpl(MockOperationsServiceImpl operations) { @Override public void createDatabase( CreateDatabaseRequest request, StreamObserver responseObserver) { - String id = request.getCreateStatement().replace("CREATE DATABASE ", ""); - if (id.startsWith("`") && id.endsWith("`")) { - id = id.substring(1, id.length() - 1); - } - String name = String.format("%s/databases/%s", request.getParent(), id); - MockDatabase db = new MockDatabase(name, request.getExtraStatementsList(), null); - if (databases.putIfAbsent(name, db) == null) { - CreateDatabaseMetadata metadata = - CreateDatabaseMetadata.newBuilder().setDatabase(name).build(); - Database database = Database.newBuilder().setName(name).setState(db.state).build(); - Operation operation = - Operation.newBuilder() - .setMetadata(Any.pack(metadata)) - .setResponse(Any.pack(database)) - .setDone(false) - .setName(operations.generateOperationName(name)) - .build(); - operations.addOperation(operation, new CreateDatabaseCallable(operation.getName(), name)); - responseObserver.onNext(operation); - responseObserver.onCompleted(); - } else { - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription(String.format("Database with name %s already exists", name)) - .asRuntimeException()); + requests.add(request); + try { + createDatabaseStartupExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + String id = request.getCreateStatement().replace("CREATE DATABASE ", ""); + if (id.startsWith("`") && id.endsWith("`")) { + id = id.substring(1, id.length() - 1); + } + String name = String.format("%s/databases/%s", request.getParent(), id); + MockDatabase db = new MockDatabase(name, request.getExtraStatementsList(), null); + if (databases.putIfAbsent(name, db) == null) { + CreateDatabaseMetadata metadata = + CreateDatabaseMetadata.newBuilder().setDatabase(name).build(); + Database database = Database.newBuilder().setName(name).setState(db.state).build(); + Operation operation = + Operation.newBuilder() + .setMetadata(Any.pack(metadata)) + .setResponse(Any.pack(database)) + .setDone(false) + .setName(operations.generateOperationName(name)) + .build(); + operations.addOperation(operation, new CreateDatabaseCallable(operation.getName(), name)); + createDatabaseResponseExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + responseObserver.onNext(operation); + responseObserver.onCompleted(); + } else { + responseObserver.onError( + Status.ALREADY_EXISTS + .withDescription(String.format("Database with name %s already exists", name)) + .asRuntimeException()); + } + } catch (Throwable t) { + responseObserver.onError(t); } } @Override public void dropDatabase(DropDatabaseRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getDatabase()); if (databases.remove(request.getDatabase(), db)) { responseObserver.onNext(Empty.getDefaultInstance()); @@ -484,6 +510,7 @@ public void dropDatabase(DropDatabaseRequest request, StreamObserver resp @Override public void getDatabase(GetDatabaseRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getName()); if (db != null) { responseObserver.onNext( @@ -501,6 +528,7 @@ public void getDatabase(GetDatabaseRequest request, StreamObserver res @Override public void getDatabaseDdl( GetDatabaseDdlRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getDatabase()); if (db != null) { responseObserver.onNext(GetDatabaseDdlResponse.newBuilder().addAllStatements(db.ddl).build()); @@ -513,6 +541,7 @@ public void getDatabaseDdl( @Override public void listDatabases( ListDatabasesRequest request, StreamObserver responseObserver) { + requests.add(request); List dbs = new ArrayList<>(databases.size()); for (Entry entry : databases.entrySet()) { dbs.add( @@ -530,6 +559,7 @@ public void listDatabases( public void listDatabaseOperations( ListDatabaseOperationsRequest request, StreamObserver responseObserver) { + requests.add(request); ListDatabaseOperationsResponse.Builder builder = ListDatabaseOperationsResponse.newBuilder(); try { for (Operation op : operations.iterable()) { @@ -554,6 +584,36 @@ private boolean matchesFilter(Object obj, String filter) throws Exception { String name = (String) obj.getClass().getMethod("getName").invoke(obj); return matches.contains(name); } + if (obj instanceof Operation) { + Operation operation = (Operation) obj; + Pattern pattern = + Pattern.compile( + "(?:\\(metadata.(?:name|database):(.*)\\)|\\(name:(.*)/operations/\\)) AND \\(metadata.@type:type.googleapis.com/(.*)\\)"); + Matcher matcher = pattern.matcher(filter); + if (matcher.matches()) { + String objectName = matcher.group(1); + if (objectName == null) { + objectName = matcher.group(2); + } + String type = matcher.group(3); + Any anyMetadata = operation.getMetadata(); + if (anyMetadata.getTypeUrl().endsWith(type)) { + if (type.equals(CreateBackupMetadata.getDescriptor().getFullName())) { + CreateBackupMetadata metadata = + operation.getMetadata().unpack(CreateBackupMetadata.class); + return metadata.getName().equals(objectName); + } else if (type.equals(CreateDatabaseMetadata.getDescriptor().getFullName())) { + CreateDatabaseMetadata metadata = + operation.getMetadata().unpack(CreateDatabaseMetadata.class); + return metadata.getDatabase().equals(objectName); + } else if (type.equals(RestoreDatabaseMetadata.getDescriptor().getFullName())) { + RestoreDatabaseMetadata metadata = + operation.getMetadata().unpack(RestoreDatabaseMetadata.class); + return metadata.getName().equals(objectName); + } + } + } + } return false; } return true; @@ -562,6 +622,7 @@ private boolean matchesFilter(Object obj, String filter) throws Exception { @Override public void updateDatabaseDdl( UpdateDatabaseDdlRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getDatabase()); if (db != null) { db.ddl.addAll(request.getStatementsList()); @@ -588,50 +649,59 @@ public void updateDatabaseDdl( @Override public void createBackup( CreateBackupRequest request, StreamObserver responseObserver) { - String name = String.format("%s/backups/%s", request.getParent(), request.getBackupId()); - MockDatabase db = databases.get(request.getBackup().getDatabase()); - if (db == null) { - responseObserver.onError( - Status.NOT_FOUND - .withDescription( - String.format( - "Database with name %s not found", request.getBackup().getDatabase())) - .asRuntimeException()); - return; - } - MockBackup bck = new MockBackup(name, request.getBackup(), db); - if (backups.putIfAbsent(name, bck) == null) { - CreateBackupMetadata metadata = - CreateBackupMetadata.newBuilder() - .setName(name) - .setDatabase(bck.database) - .setProgress( - OperationProgress.newBuilder() - .setStartTime( - Timestamp.newBuilder() - .setSeconds(System.currentTimeMillis() / 1000L) - .build()) - .setProgressPercent(0)) - .build(); - Operation operation = - Operation.newBuilder() - .setMetadata(Any.pack(metadata)) - .setResponse(Any.pack(bck.toProto())) - .setName(operations.generateOperationName(name)) - .build(); - operations.addOperation(operation, new CreateBackupCallable(operation.getName(), name)); - responseObserver.onNext(operation); - responseObserver.onCompleted(); - } else { - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription(String.format("Backup with name %s already exists", name)) - .asRuntimeException()); + requests.add(request); + try { + createBackupStartupExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + String name = String.format("%s/backups/%s", request.getParent(), request.getBackupId()); + MockDatabase db = databases.get(request.getBackup().getDatabase()); + if (db == null) { + responseObserver.onError( + Status.NOT_FOUND + .withDescription( + String.format( + "Database with name %s not found", request.getBackup().getDatabase())) + .asRuntimeException()); + return; + } + MockBackup bck = new MockBackup(name, request.getBackup(), db); + if (backups.putIfAbsent(name, bck) == null) { + CreateBackupMetadata metadata = + CreateBackupMetadata.newBuilder() + .setName(name) + .setDatabase(bck.database) + .setProgress( + OperationProgress.newBuilder() + .setStartTime( + Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() / 1000L) + .build()) + .setProgressPercent(0)) + .build(); + Operation operation = + Operation.newBuilder() + .setMetadata(Any.pack(metadata)) + .setResponse(Any.pack(bck.toProto())) + .setName(operations.generateOperationName(name)) + .build(); + operations.addOperation(operation, new CreateBackupCallable(operation.getName(), name)); + + createBackupResponseExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + responseObserver.onNext(operation); + responseObserver.onCompleted(); + } else { + responseObserver.onError( + Status.ALREADY_EXISTS + .withDescription(String.format("Backup with name %s already exists", name)) + .asRuntimeException()); + } + } catch (Throwable t) { + responseObserver.onError(t); } } @Override public void deleteBackup(DeleteBackupRequest request, StreamObserver responseObserver) { + requests.add(request); MockBackup bck = backups.get(request.getName()); if (backups.remove(request.getName(), bck)) { responseObserver.onNext(Empty.getDefaultInstance()); @@ -643,6 +713,7 @@ public void deleteBackup(DeleteBackupRequest request, StreamObserver resp @Override public void getBackup(GetBackupRequest request, StreamObserver responseObserver) { + requests.add(request); MockBackup bck = backups.get(request.getName()); if (bck != null) { responseObserver.onNext( @@ -663,6 +734,7 @@ public void getBackup(GetBackupRequest request, StreamObserver responseO @Override public void listBackups( ListBackupsRequest request, StreamObserver responseObserver) { + requests.add(request); List bcks = new ArrayList<>(backups.size()); try { for (Entry entry : backups.entrySet()) { @@ -689,6 +761,7 @@ public void listBackups( public void listBackupOperations( ListBackupOperationsRequest request, StreamObserver responseObserver) { + requests.add(request); ListBackupOperationsResponse.Builder builder = ListBackupOperationsResponse.newBuilder(); try { for (Operation op : operations.iterable()) { @@ -708,6 +781,7 @@ public void listBackupOperations( @Override public void updateBackup(UpdateBackupRequest request, StreamObserver responseObserver) { + requests.add(request); MockBackup bck = backups.get(request.getBackup().getName()); if (bck != null) { if (request.getUpdateMask().getPathsList().contains(EXPIRE_TIME_MASK)) { @@ -731,70 +805,80 @@ public void updateBackup(UpdateBackupRequest request, StreamObserver res @Override public void restoreDatabase( RestoreDatabaseRequest request, StreamObserver responseObserver) { - MockBackup bck = backups.get(request.getBackup()); - if (bck != null) { - String name = String.format("%s/databases/%s", request.getParent(), request.getDatabaseId()); - MockDatabase db = - new MockDatabase( - name, - bck.ddl, - RestoreInfo.newBuilder() + requests.add(request); + try { + restoreDatabaseStartupExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + MockBackup bck = backups.get(request.getBackup()); + if (bck != null) { + String name = + String.format("%s/databases/%s", request.getParent(), request.getDatabaseId()); + MockDatabase db = + new MockDatabase( + name, + bck.ddl, + RestoreInfo.newBuilder() + .setBackupInfo(bck.toBackupInfo()) + .setSourceType(RestoreSourceType.BACKUP) + .build()); + if (databases.putIfAbsent(name, db) == null) { + bck.referencingDatabases.add(db.name); + Operation optimizeOperation = + Operation.newBuilder() + .setDone(false) + .setName(operations.generateOperationName(name)) + .setMetadata( + Any.pack( + OptimizeRestoredDatabaseMetadata.newBuilder() + .setName(name) + .setProgress( + OperationProgress.newBuilder() + .setStartTime(currentTime()) + .setProgressPercent(0) + .build()) + .build())) + .setResponse(Any.pack(db.toProto())) + .build(); + RestoreDatabaseMetadata metadata = + RestoreDatabaseMetadata.newBuilder() .setBackupInfo(bck.toBackupInfo()) + .setName(name) + .setProgress( + OperationProgress.newBuilder() + .setStartTime(currentTime()) + .setProgressPercent(0) + .build()) + .setOptimizeDatabaseOperationName(optimizeOperation.getName()) .setSourceType(RestoreSourceType.BACKUP) - .build()); - if (databases.putIfAbsent(name, db) == null) { - bck.referencingDatabases.add(db.name); - Operation optimizeOperation = - Operation.newBuilder() - .setDone(false) - .setName(operations.generateOperationName(name)) - .setMetadata( - Any.pack( - OptimizeRestoredDatabaseMetadata.newBuilder() - .setName(name) - .setProgress( - OperationProgress.newBuilder() - .setStartTime(currentTime()) - .setProgressPercent(0) - .build()) - .build())) - .setResponse(Any.pack(db.toProto())) - .build(); - RestoreDatabaseMetadata metadata = - RestoreDatabaseMetadata.newBuilder() - .setBackupInfo(bck.toBackupInfo()) - .setName(name) - .setProgress( - OperationProgress.newBuilder() - .setStartTime(currentTime()) - .setProgressPercent(0) - .build()) - .setOptimizeDatabaseOperationName(optimizeOperation.getName()) - .setSourceType(RestoreSourceType.BACKUP) - .build(); - Operation operation = - Operation.newBuilder() - .setMetadata(Any.pack(metadata)) - .setResponse(Any.pack(db.toProto())) - .setDone(false) - .setName(operations.generateOperationName(name)) - .build(); - operations.addOperation(operation, new RestoreDatabaseCallable(operation.getName(), name)); - operations.addOperation( - optimizeOperation, - new OptimizeDatabaseCallable(optimizeOperation.getName(), operation.getName(), name)); - responseObserver.onNext(operation); - responseObserver.onCompleted(); + .build(); + Operation operation = + Operation.newBuilder() + .setMetadata(Any.pack(metadata)) + .setResponse(Any.pack(db.toProto())) + .setDone(false) + .setName(operations.generateOperationName(name)) + .build(); + operations.addOperation( + operation, new RestoreDatabaseCallable(operation.getName(), name)); + operations.addOperation( + optimizeOperation, + new OptimizeDatabaseCallable(optimizeOperation.getName(), operation.getName(), name)); + restoreDatabaseResponseExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + responseObserver.onNext(operation); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException()); + } } else { - responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException()); + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); } - } else { - responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); + } catch (Throwable t) { + responseObserver.onError(t); } } @Override public void getIamPolicy(GetIamPolicyRequest request, StreamObserver responseObserver) { + requests.add(request); Policy policy = policies.get(request.getResource()); if (policy != null) { responseObserver.onNext(policy); @@ -806,6 +890,7 @@ public void getIamPolicy(GetIamPolicyRequest request, StreamObserver res @Override public void setIamPolicy(SetIamPolicyRequest request, StreamObserver responseObserver) { + requests.add(request); policies.put(request.getResource(), request.getPolicy()); responseObserver.onNext(request.getPolicy()); responseObserver.onCompleted(); @@ -815,6 +900,7 @@ public void setIamPolicy(SetIamPolicyRequest request, StreamObserver res public void testIamPermissions( TestIamPermissionsRequest request, StreamObserver responseObserver) { + requests.add(request); // Just return the same permissions as in the request, as we don't have any credentials. responseObserver.onNext( TestIamPermissionsResponse.newBuilder() @@ -825,7 +911,19 @@ public void testIamPermissions( @Override public List getRequests() { - return Collections.emptyList(); + return new ArrayList<>(requests); + } + + public int countRequestsOfType(final Class type) { + return Collections2.filter( + getRequests(), + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input.getClass().equals(type); + } + }) + .size(); } @Override @@ -858,6 +956,7 @@ public ServerServiceDefinition getServiceDefinition() { @Override public void reset() { + requests.clear(); exceptions.clear(); policies.clear(); databases.clear(); @@ -865,7 +964,42 @@ public void reset() { filterMatches.clear(); } + public void removeAllExecutionTimes() { + createBackupStartupExecutionTime = SimulatedExecutionTime.none(); + createBackupResponseExecutionTime = SimulatedExecutionTime.none(); + createBackupOperationExecutionTime = 0L; + createDatabaseStartupExecutionTime = SimulatedExecutionTime.none(); + createDatabaseResponseExecutionTime = SimulatedExecutionTime.none(); + restoreDatabaseStartupExecutionTime = SimulatedExecutionTime.none(); + restoreDatabaseResponseExecutionTime = SimulatedExecutionTime.none(); + restoreDatabaseOperationExecutionTime = 0L; + } + private Timestamp currentTime() { return Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000L).build(); } + + public void setCreateBackupStartupExecutionTime(SimulatedExecutionTime exec) { + this.createBackupStartupExecutionTime = exec; + } + + public void setCreateBackupResponseExecutionTime(SimulatedExecutionTime exec) { + this.createBackupResponseExecutionTime = exec; + } + + public void setCreateDatabaseStartupExecutionTime(SimulatedExecutionTime exec) { + this.createDatabaseStartupExecutionTime = exec; + } + + public void setCreateDatabaseResponseExecutionTime(SimulatedExecutionTime exec) { + this.createDatabaseResponseExecutionTime = exec; + } + + public void setRestoreDatabaseStartupExecutionTime(SimulatedExecutionTime exec) { + this.restoreDatabaseStartupExecutionTime = exec; + } + + public void setRestoreDatabaseResponseExecutionTime(SimulatedExecutionTime exec) { + this.restoreDatabaseResponseExecutionTime = exec; + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index e3d002948e..069242d5a7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -418,7 +418,7 @@ private SimulatedExecutionTime( this.stickyException = stickyException; } - private void simulateExecutionTime( + void simulateExecutionTime( Queue globalExceptions, boolean stickyGlobalExceptions, ReadWriteLock freezeLock) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java index 574d9c9b15..2755123ea0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java @@ -19,22 +19,46 @@ import static com.google.cloud.spanner.SpannerMatchers.isSpannerException; import static com.google.common.truth.Truth.assertThat; +import com.google.api.gax.grpc.GrpcInterceptorProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Backup; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.testing.RemoteSpannerHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import com.google.spanner.admin.database.v1.CreateBackupMetadata; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import com.google.spanner.admin.database.v1.RestoreDatabaseMetadata; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientCall.Listener; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -173,4 +197,214 @@ public void listPagination() throws Exception { } assertThat(dbIdsGot).containsAtLeastElementsIn(dbIds); } + + private static final class InjectErrorInterceptorProvider implements GrpcInterceptorProvider { + final AtomicBoolean injectError = new AtomicBoolean(true); + final AtomicInteger getOperationCount = new AtomicInteger(); + final AtomicInteger methodCount = new AtomicInteger(); + final String methodName; + + private InjectErrorInterceptorProvider(String methodName) { + this.methodName = methodName; + } + + @Override + public List getInterceptors() { + ClientInterceptor interceptor = + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + if (method.getFullMethodName().contains("GetOperation")) { + getOperationCount.incrementAndGet(); + } + if (!method.getFullMethodName().contains(methodName)) { + return next.newCall(method, callOptions); + } + + methodCount.incrementAndGet(); + final AtomicBoolean errorInjected = new AtomicBoolean(); + final ClientCall clientCall = next.newCall(method, callOptions); + + return new SimpleForwardingClientCall(clientCall) { + @Override + public void start(Listener responseListener, Metadata headers) { + super.start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + if (injectError.getAndSet(false)) { + errorInjected.set(true); + clientCall.cancel("Cancelling call for injected error", null); + } else { + super.onMessage(message); + } + } + + @Override + public void onClose(Status status, Metadata metadata) { + if (errorInjected.get()) { + status = Status.UNAVAILABLE.augmentDescription("INJECTED BY TEST"); + } + super.onClose(status, metadata); + } + }, + headers); + } + }; + } + }; + return Collections.singletonList(interceptor); + } + } + + @Test + public void testRetryNonIdempotentRpcsReturningLongRunningOperations() throws Exception { + // RPCs that return a long-running operation such as CreateDatabase, CreateBackup and + // RestoreDatabase are non-idempotent and can normally not be automatically retried in case of a + // transient failure. The client library will however automatically query the backend to check + // whether the corresponding operation was started or not, and if it was, it will pick up the + // existing operation. If no operation is found, a new RPC call will be executed to start the + // operation. + + List databases = new ArrayList<>(); + List backups = new ArrayList<>(); + String initialDatabaseId; + Timestamp initialDbCreateTime; + + try { + // CreateDatabase + InjectErrorInterceptorProvider createDbInterceptor = + new InjectErrorInterceptorProvider("CreateDatabase"); + SpannerOptions options = + testHelper.getOptions().toBuilder().setInterceptorProvider(createDbInterceptor).build(); + try (Spanner spanner = options.getService()) { + initialDatabaseId = testHelper.getUniqueDatabaseId(); + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + OperationFuture op = + client.createDatabase( + testHelper.getInstanceId().getInstance(), + initialDatabaseId, + Collections.emptyList()); + databases.add(op.get()); + // Keep track of the original create time of this database, as we will drop this database + // later and create another one with the exact same name. That means that the ListOperations + // call will return at least two CreateDatabase operations. The retry logic should always + // pick the last one. + initialDbCreateTime = op.get().getCreateTime(); + // Assert that the CreateDatabase RPC was called only once, and that the operation tracking + // was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + } + + // CreateBackup + InjectErrorInterceptorProvider createBackupInterceptor = + new InjectErrorInterceptorProvider("CreateBackup"); + options = + testHelper + .getOptions() + .toBuilder() + .setInterceptorProvider(createBackupInterceptor) + .build(); + try (Spanner spanner = options.getService()) { + String databaseId = databases.get(0).getId().getDatabase(); + String backupId = String.format("test-bck-%08d", new Random().nextInt(100000000)); + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + OperationFuture op = + client.createBackup( + testHelper.getInstanceId().getInstance(), + backupId, + databaseId, + Timestamp.ofTimeSecondsAndNanos( + Timestamp.now().getSeconds() + TimeUnit.SECONDS.convert(7L, TimeUnit.DAYS), 0)); + backups.add(op.get()); + // Assert that the CreateBackup RPC was called only once, and that the operation tracking + // was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + } + + // RestoreBackup + int attempts = 0; + while (true) { + InjectErrorInterceptorProvider restoreBackupInterceptor = + new InjectErrorInterceptorProvider("RestoreBackup"); + options = + testHelper + .getOptions() + .toBuilder() + .setInterceptorProvider(restoreBackupInterceptor) + .build(); + try (Spanner spanner = options.getService()) { + String backupId = backups.get(0).getId().getBackup(); + String restoredDbId = testHelper.getUniqueDatabaseId(); + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + OperationFuture op = + client.restoreDatabase( + testHelper.getInstanceId().getInstance(), + backupId, + testHelper.getInstanceId().getInstance(), + restoredDbId); + databases.add(op.get()); + // Assert that the RestoreDatabase RPC was called only once, and that the operation + // tracking was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + break; + } catch (ExecutionException e) { + if (e.getCause() instanceof SpannerException + && ((SpannerException) e.getCause()).getErrorCode() == ErrorCode.FAILED_PRECONDITION + && e.getCause() + .getMessage() + .contains("Please retry the operation once the pending restores complete")) { + attempts++; + if (attempts == 10) { + // Still same error after 10 attempts. Ignore. + break; + } + // wait and then retry. + Thread.sleep(60_000L); + } else { + throw e; + } + } + } + + // Create another database with the exact same name as the first database. + createDbInterceptor = new InjectErrorInterceptorProvider("CreateDatabase"); + options = + testHelper.getOptions().toBuilder().setInterceptorProvider(createDbInterceptor).build(); + try (Spanner spanner = options.getService()) { + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + // First drop the initial database. + client.dropDatabase(testHelper.getInstanceId().getInstance(), initialDatabaseId); + // Now re-create a database with the exact same name. + OperationFuture op = + client.createDatabase( + testHelper.getInstanceId().getInstance(), + initialDatabaseId, + Collections.emptyList()); + // Check that the second database was created and has a greater creation time than the + // first. + Timestamp secondCreationTime = op.get().getCreateTime(); + // TODO: Change this to greaterThan when the create time of a database is reported back by + // the server. + assertThat(secondCreationTime).isAtLeast(initialDbCreateTime); + // Assert that the CreateDatabase RPC was called only once, and that the operation tracking + // was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + } + } finally { + DatabaseAdminClient client = testHelper.getClient().getDatabaseAdminClient(); + for (Database database : databases) { + client.dropDatabase( + database.getId().getInstanceId().getInstance(), database.getId().getDatabase()); + } + for (Backup backup : backups) { + client.deleteBackup(backup.getInstanceId().getInstance(), backup.getId().getBackup()); + } + } + } }