From 11e4a90e73af8a5baf9aa593daa6192520363398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 22 Jan 2020 07:40:07 +0100 Subject: [PATCH] fix: stop sending RPCs to deleted database (#34) * fix: stop sending rpcs on deleted db * fix: client should stop sending rpcs after database dropped DatabaseClients should not continue to try to send RPCs to a database that has been deleted. Instead, the session pool will keep track of whether a database not found error has been returned for a database, and if so, will invalidate itself. All subsequent calls for this database will return a DatabaseNotFoundException without calling a RPC. If a database is re-created, the user must create a new DatabaseClient with a new session pool in order to resume usage of the database. Fixes #16 * fix: remove double check on isValid * fix: add wait to deleted db integration test * fix: process review comments * fix: update copyright year --- .../spanner/DatabaseNotFoundException.java | 34 ++++ .../com/google/cloud/spanner/SessionPool.java | 48 ++++- .../spanner/SpannerExceptionFactory.java | 11 ++ .../com/google/cloud/spanner/SpannerImpl.java | 12 +- .../cloud/spanner/DatabaseClientImplTest.java | 174 ++++++++++++++---- .../cloud/spanner/MockSpannerServiceImpl.java | 71 +++++-- .../cloud/spanner/it/ITDatabaseTest.java | 76 ++++++++ 7 files changed, 371 insertions(+), 55 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java new file mode 100644 index 0000000000..baad18b31b --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import javax.annotation.Nullable; + +/** + * Exception thrown by Cloud Spanner when an operation detects that the database that is being used + * no longer exists. This type of error has its own subclass as it is a condition that should cause + * the client library to stop trying to send RPCs to the backend until the user has taken action. + */ +public class DatabaseNotFoundException extends SpannerException { + private static final long serialVersionUID = -6395746612598975751L; + + /** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */ + DatabaseNotFoundException( + DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) { + super(token, ErrorCode.NOT_FOUND, false, message, cause); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cad6138c4e..cdbf383d00 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -26,6 +26,7 @@ import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @@ -775,6 +776,15 @@ public void close() { if (lastException != null && isSessionNotFound(lastException)) { invalidateSession(this); } else { + if (lastException != null && isDatabaseNotFound(lastException)) { + // Mark this session pool as no longer valid and then release the session into the pool as + // there is nothing we can do with it anyways. + synchronized (lock) { + SessionPool.this.databaseNotFound = + MoreObjects.firstNonNull( + SessionPool.this.databaseNotFound, (DatabaseNotFoundException) lastException); + } + } lastException = null; if (state != SessionState.CLOSING) { state = SessionState.AVAILABLE; @@ -1056,6 +1066,9 @@ private static enum Position { @GuardedBy("lock") private SettableFuture closureFuture; + @GuardedBy("lock") + private DatabaseNotFoundException databaseNotFound; + @GuardedBy("lock") private final LinkedList readSessions = new LinkedList<>(); @@ -1193,7 +1206,7 @@ private boolean isSessionNotFound(SpannerException e) { } private boolean isDatabaseNotFound(SpannerException e) { - return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found"); + return e instanceof DatabaseNotFoundException; } private boolean isPermissionDenied(SpannerException e) { @@ -1225,6 +1238,13 @@ private PooledSession findSessionToKeepAlive( return null; } + /** @return true if this {@link SessionPool} is still valid. */ + boolean isValid() { + synchronized (lock) { + return closureFuture == null && databaseNotFound == null; + } + } + /** * Returns a session to be used for read requests to spanner. It will block if a session is not * currently available. In case the pool is exhausted and {@link @@ -1251,6 +1271,15 @@ PooledSession getReadSession() throws SpannerException { span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } + if (databaseNotFound != null) { + span.addAnnotation("Database has been deleted"); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.NOT_FOUND, + String.format( + "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", + databaseNotFound.getMessage()), + databaseNotFound); + } sess = readSessions.poll(); if (sess == null) { sess = writePreparedSessions.poll(); @@ -1304,8 +1333,18 @@ PooledSession getReadWriteSession() { PooledSession sess = null; synchronized (lock) { if (closureFuture != null) { + span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } + if (databaseNotFound != null) { + span.addAnnotation("Database has been deleted"); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.NOT_FOUND, + String.format( + "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", + databaseNotFound.getMessage()), + databaseNotFound); + } sess = writePreparedSessions.poll(); if (sess == null) { if (numSessionsBeingPrepared <= readWriteWaiters.size()) { @@ -1448,6 +1487,9 @@ private void handleCreateSessionsFailure(SpannerException e, int count) { break; } } + this.databaseNotFound = + MoreObjects.firstNonNull( + this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); } } @@ -1470,6 +1512,10 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi if (isClosed()) { decrementPendingClosures(1); } + this.databaseNotFound = + MoreObjects.firstNonNull( + this.databaseNotFound, + isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); } else if (readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java index e7508d96e8..2270338150 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java @@ -26,6 +26,7 @@ import io.grpc.StatusRuntimeException; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.net.ssl.SSLHandshakeException; @@ -36,6 +37,14 @@ * ErrorCode#ABORTED} are always represented by {@link AbortedException}. */ public final class SpannerExceptionFactory { + static final String DATABASE_NOT_FOUND_MSG = + "Database not found: projects/.*/instances/.*/databases/.*\n" + + "resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\"\n" + + "resource_name: \"projects/.*/instances/.*/databases/.*\"\n" + + "description: \"Database does not exist.\"\n"; + private static final Pattern DATABASE_NOT_FOUND_MSG_PATTERN = + Pattern.compile(".*" + DATABASE_NOT_FOUND_MSG + ".*"); + public static SpannerException newSpannerException(ErrorCode code, @Nullable String message) { return newSpannerException(code, message, null); } @@ -176,6 +185,8 @@ private static SpannerException newSpannerExceptionPreformatted( case NOT_FOUND: if (message != null && message.contains("Session not found")) { return new SessionNotFoundException(token, message, cause); + } else if (message != null && DATABASE_NOT_FOUND_MSG_PATTERN.matcher(message).matches()) { + return new DatabaseNotFoundException(token, message, cause); } // Fall through to the default. default: diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 12cf89dd56..879cd39e65 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -88,6 +88,9 @@ class SpannerImpl extends BaseService implements Spanner { @GuardedBy("this") private final Map dbClients = new HashMap<>(); + @GuardedBy("this") + private final List invalidatedDbClients = new ArrayList<>(); + @GuardedBy("this") private final Map sessionClients = new HashMap<>(); @@ -210,6 +213,12 @@ public InstanceAdminClient getInstanceAdminClient() { public DatabaseClient getDatabaseClient(DatabaseId db) { synchronized (this) { Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed"); + if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) { + // Move the invalidated client to a separate list, so we can close it together with the + // other database clients when the Spanner instance is closed. + invalidatedDbClients.add(dbClients.get(db)); + dbClients.remove(db); + } if (dbClients.containsKey(db)) { return dbClients.get(db); } else { @@ -239,7 +248,8 @@ public void close() { Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed"); spannerIsClosed = true; closureFutures = new ArrayList<>(); - for (DatabaseClientImpl dbClient : dbClients.values()) { + invalidatedDbClients.addAll(dbClients.values()); + for (DatabaseClientImpl dbClient : invalidatedDbClients) { closureFutures.add(dbClient.closeAsync()); } dbClients.clear(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index b147bd8d48..c80f458540 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -16,7 +16,7 @@ package com.google.cloud.spanner; -import static org.hamcrest.CoreMatchers.containsString; +import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -51,8 +51,24 @@ @RunWith(JUnit4.class) public class DatabaseClientImplTest { - private static final String DATABASE_DOES_NOT_EXIST_MSG = - "Database not found: projects//instances//databases/ resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\" resource_name: \"projects//instances//databases/\" description: \"Database does not exist.\""; + private static final String DATABASE_NOT_FOUND_FORMAT = + "Database not found: projects/%s/instances/%s/databases/%s\n" + + "resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\"\n" + + "resource_name: \"projects/%s/instances/%s/databases/%s\"\n" + + "description: \"Database does not exist.\"\n"; + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final String DATABASE_NOT_FOUND_MSG = + String.format( + "com.google.cloud.spanner.SpannerException: NOT_FOUND: io.grpc.StatusRuntimeException: NOT_FOUND: " + + DATABASE_NOT_FOUND_FORMAT, + TEST_PROJECT, + TEST_INSTANCE, + TEST_DATABASE, + TEST_PROJECT, + TEST_INSTANCE, + TEST_DATABASE); private static MockSpannerServiceImpl mockSpanner; private static Server server; private static LocalChannelProvider channelProvider; @@ -118,7 +134,7 @@ public static void stopServer() throws InterruptedException { public void setUp() throws IOException { spanner = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .build() @@ -139,7 +155,7 @@ public void tearDown() throws Exception { @Test public void testExecutePartitionedDml() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); } @@ -148,7 +164,7 @@ public void testExecutePartitionedDml() { @Test public void testExecutePartitionedDmlAborted() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); mockSpanner.abortNextTransaction(); long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); @@ -161,7 +177,7 @@ public void testExecutePartitionedDmlAborted() { @Test(expected = IllegalArgumentException.class) public void testExecutePartitionedDmlWithQuery() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); client.executePartitionedUpdate(SELECT1); } @@ -169,7 +185,7 @@ public void testExecutePartitionedDmlWithQuery() { @Test(expected = SpannerException.class) public void testExecutePartitionedDmlWithException() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); client.executePartitionedUpdate(INVALID_UPDATE_STATEMENT); } @@ -185,14 +201,14 @@ public void testPartitionedDmlDoesNotTimeout() throws Exception { .build(); SpannerOptions.Builder builder = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()); // Set normal DML timeout value. builder.getSpannerStubSettingsBuilder().executeSqlSettings().setRetrySettings(retrySettings); try (Spanner spanner = builder.build().getService()) { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); assertThat( spanner.getOptions().getPartitionedDmlTimeout(), is(equalTo(Duration.ofHours(2L)))); @@ -227,14 +243,14 @@ public void testPartitionedDmlWithTimeout() throws Exception { mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0)); SpannerOptions.Builder builder = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()); // Set PDML timeout value. builder.setPartitionedDmlTimeout(Duration.ofMillis(100L)); try (Spanner spanner = builder.build().getService()) { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); assertThat( spanner.getOptions().getPartitionedDmlTimeout(), is(equalTo(Duration.ofMillis(100L)))); // PDML should timeout with these settings. @@ -267,10 +283,10 @@ public Long run(TransactionContext transaction) throws Exception { public void testDatabaseDoesNotExistOnPrepareSession() throws Exception { mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -286,6 +302,7 @@ public void testDatabaseDoesNotExistOnPrepareSession() throws Exception { } assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0))); + int currentNumRequest = mockSpanner.getRequests().size(); try { dbClient .readWriteTransaction() @@ -296,21 +313,20 @@ public Void run(TransactionContext transaction) throws Exception { return null; } }); - fail("missing expected NOT_FOUND exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); - assertThat(e.getMessage(), containsString("Database not found")); + fail("missing expected DatabaseNotFoundException"); + } catch (DatabaseNotFoundException e) { } + assertThat(mockSpanner.getRequests()).hasSize(currentNumRequest); } @Test public void testDatabaseDoesNotExistOnInitialization() throws Exception { mockSpanner.setBatchCreateSessionsExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until session creation has finished. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -326,11 +342,11 @@ public void testDatabaseDoesNotExistOnInitialization() throws Exception { public void testDatabaseDoesNotExistOnCreate() throws Exception { mockSpanner.setBatchCreateSessionsExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); // Ensure there are no sessions in the pool by default. try (Spanner spanner = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) @@ -338,20 +354,20 @@ public void testDatabaseDoesNotExistOnCreate() throws Exception { .getService()) { DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // The create session failure should propagate to the client and not retry. try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); - assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG)); + } catch (DatabaseNotFoundException e) { + // The server should only receive one BatchCreateSessions request. + assertThat(mockSpanner.getRequests()).hasSize(1); } try { dbClient.readWriteTransaction(); fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); - assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG)); + } catch (DatabaseNotFoundException e) { + // No additional requests should have been sent by the client. + assertThat(mockSpanner.getRequests()).hasSize(1); } } } @@ -360,10 +376,10 @@ public void testDatabaseDoesNotExistOnCreate() throws Exception { public void testDatabaseDoesNotExistOnReplenish() throws Exception { mockSpanner.setBatchCreateSessionsExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until session creation has finished. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -396,7 +412,7 @@ public void testPermissionDeniedOnPrepareSession() throws Exception { .asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -428,11 +444,99 @@ public Void run(TransactionContext transaction) throws Exception { } } + /** + * Test showing that when a database is deleted while it is in use by a database client and then + * re-created with the same name, will continue to return {@link DatabaseNotFoundException}s until + * a new {@link DatabaseClient} is created. + */ + @Test + public void testDatabaseIsDeletedAndThenRecreated() throws Exception { + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .build() + .getService()) { + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Wait until all sessions have been created and prepared. + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && (dbClient.pool.getNumberOfSessionsBeingCreated() > 0 + || dbClient.pool.getNumberOfSessionsBeingPrepared() > 0)) { + Thread.sleep(1L); + } + // Simulate that the database has been deleted. + mockSpanner.setStickyGlobalExceptions(true); + mockSpanner.addException( + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException()); + + // All subsequent calls should fail with a DatabaseNotFoundException. + try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { + while (rs.next()) {} + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + try { + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + + // Now simulate that the database has been re-created. The database client should still throw + // DatabaseNotFoundExceptions, as it is not the same database. The server should not receive + // any new requests. + mockSpanner.reset(); + // All subsequent calls should fail with a DatabaseNotFoundException. + try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { + while (rs.next()) {} + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + try { + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + assertThat(mockSpanner.getRequests()).isEmpty(); + // Now get a new database client. Normally multiple calls to Spanner#getDatabaseClient will + // return the same instance, but not when the instance has been invalidated by a + // DatabaseNotFoundException. + DatabaseClientImpl newClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + assertThat(newClient).isNotSameInstanceAs(dbClient); + // Executing a query should now work without problems. + try (ResultSet rs = newClient.singleUse().executeQuery(SELECT1)) { + while (rs.next()) {} + } + assertThat(mockSpanner.getRequests()).isNotEmpty(); + } + } + @Test public void testAllowNestedTransactions() throws InterruptedException { final DatabaseClientImpl client = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created. final int minSessions = spanner.getOptions().getSessionPoolOptions().getMinSessions(); Stopwatch watch = Stopwatch.createStarted(); @@ -462,10 +566,10 @@ public Long run(TransactionContext transaction) throws Exception { public void testNestedTransactionsUsingTwoDatabases() throws InterruptedException { final DatabaseClientImpl client1 = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE1]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, "my-database-1")); final DatabaseClientImpl client2 = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE2]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, "my-database-2")); // Wait until all sessions have been created so we can actually check the number of sessions // checked out of the pools. final int minSessions = spanner.getOptions().getSessionPoolOptions().getMinSessions(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 0a451e5baa..41be7c0d44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -417,10 +417,12 @@ private SimulatedExecutionTime( } private void simulateExecutionTime( - Queue globalExceptions, ReadWriteLock freezeLock) { + Queue globalExceptions, + boolean stickyGlobalExceptions, + ReadWriteLock freezeLock) { try { freezeLock.readLock().lock(); - checkException(globalExceptions, false); + checkException(globalExceptions, stickyGlobalExceptions); checkException(this.exceptions, stickyException); if (minimumExecutionTime > 0 || randomExecutionTime > 0) { Uninterruptibles.sleepUninterruptibly( @@ -447,8 +449,10 @@ private static void checkException(Queue exceptions, boolean keepExce private final Random random = new Random(); private double abortProbability = 0.0010D; + private final Queue requests = new ConcurrentLinkedQueue<>(); private final ReadWriteLock freezeLock = new ReentrantReadWriteLock(); private final Queue exceptions = new ConcurrentLinkedQueue<>(); + private boolean stickyGlobalExceptions = false; private final ConcurrentMap statementResults = new ConcurrentHashMap<>(); private final ConcurrentMap sessions = new ConcurrentHashMap<>(); @@ -605,6 +609,7 @@ public void setMaxTotalSessions(int max) { public void batchCreateSessions( BatchCreateSessionsRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getDatabase()); String name = null; try { @@ -613,7 +618,8 @@ public void batchCreateSessions( .withDescription("Session count must be >= 0") .asRuntimeException(); } - batchCreateSessionsExecutionTime.simulateExecutionTime(exceptions, freezeLock); + batchCreateSessionsExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); if (sessions.size() >= maxTotalSessions) { throw Status.RESOURCE_EXHAUSTED .withDescription("Maximum number of sessions reached") @@ -664,10 +670,12 @@ public void batchCreateSessions( @Override public void createSession( CreateSessionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getDatabase()); String name = generateSessionName(request.getDatabase()); try { - createSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + createSessionExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); Timestamp now = getCurrentGoogleTimestamp(); Session session = Session.newBuilder() @@ -698,9 +706,10 @@ public void createSession( @Override public void getSession(GetSessionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getName()); try { - getSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + getSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Session session = sessions.get(request.getName()); if (session == null) { setSessionNotFound(request.getName(), responseObserver); @@ -727,8 +736,10 @@ private void setSessionNotFound(String name, StreamObserver responseObser @Override public void listSessions( ListSessionsRequest request, StreamObserver responseObserver) { + requests.add(request); try { - listSessionsExecutionTime.simulateExecutionTime(exceptions, freezeLock); + listSessionsExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); List res = new ArrayList<>(); for (Session session : sessions.values()) { if (session.getName().startsWith(request.getDatabase())) { @@ -755,9 +766,11 @@ public int compare(Session o1, Session o2) { @Override public void deleteSession(DeleteSessionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getName()); try { - deleteSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + deleteSessionExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); Session session = sessions.get(request.getName()); if (session != null) { try { @@ -782,6 +795,7 @@ void doDeleteSession(Session session) { @Override public void executeSql(ExecuteSqlRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -790,7 +804,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeSqlExecutionTime.simulateExecutionTime(exceptions, freezeLock); + executeSqlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); ByteString transactionId = getTransactionId(session, request.getTransaction()); simulateAbort(session, transactionId); Statement statement = @@ -869,6 +883,7 @@ private void returnResultSet( @Override public void executeBatchDml( ExecuteBatchDmlRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -877,7 +892,8 @@ public void executeBatchDml( } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeBatchDmlExecutionTime.simulateExecutionTime(exceptions, freezeLock); + executeBatchDmlExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (isPartitionedDmlTransaction(transactionId)) { @@ -953,6 +969,7 @@ public void executeBatchDml( @Override public void executeStreamingSql( ExecuteSqlRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -961,7 +978,8 @@ public void executeStreamingSql( } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeStreamingSqlExecutionTime.simulateExecutionTime(exceptions, freezeLock); + executeStreamingSqlExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (!request.getPartitionToken().isEmpty()) { @@ -1132,6 +1150,7 @@ private void throwTransactionAborted(ByteString transactionId) { @Override public void read(final ReadRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1140,7 +1159,7 @@ public void read(final ReadRequest request, StreamObserver responseOb } sessionLastUsed.put(session.getName(), Instant.now()); try { - readExecutionTime.simulateExecutionTime(exceptions, freezeLock); + readExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); simulateAbort(session, transactionId); @@ -1170,6 +1189,7 @@ public Iterator iterator() { @Override public void streamingRead( final ReadRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1178,7 +1198,8 @@ public void streamingRead( } sessionLastUsed.put(session.getName(), Instant.now()); try { - streamingReadExecutionTime.simulateExecutionTime(exceptions, freezeLock); + streamingReadExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (!request.getPartitionToken().isEmpty()) { @@ -1337,6 +1358,7 @@ private Transaction getTemporaryTransactionOrNull(TransactionSelector tx) { @Override public void beginTransaction( BeginTransactionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1345,7 +1367,8 @@ public void beginTransaction( } sessionLastUsed.put(session.getName(), Instant.now()); try { - beginTransactionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + beginTransactionExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = beginTransaction(session, request.getOptions()); responseObserver.onNext(transaction); responseObserver.onCompleted(); @@ -1441,6 +1464,7 @@ private void ensureMostRecentTransaction(Session session, ByteString transaction @Override public void commit(CommitRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1449,7 +1473,7 @@ public void commit(CommitRequest request, StreamObserver respons } sessionLastUsed.put(session.getName(), Instant.now()); try { - commitExecutionTime.simulateExecutionTime(exceptions, freezeLock); + commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Find or start a transaction Transaction transaction; if (request.hasSingleUseTransaction()) { @@ -1494,6 +1518,7 @@ private void commitTransaction(ByteString transactionId) { @Override public void rollback(RollbackRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getTransactionId()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1502,7 +1527,7 @@ public void rollback(RollbackRequest request, StreamObserver responseObse } sessionLastUsed.put(session.getName(), Instant.now()); try { - rollbackExecutionTime.simulateExecutionTime(exceptions, freezeLock); + rollbackExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = transactions.get(request.getTransactionId()); if (transaction != null) { rollbackTransaction(transaction.getId()); @@ -1532,8 +1557,10 @@ void markAbortedTransaction(ByteString transactionId) { @Override public void partitionQuery( PartitionQueryRequest request, StreamObserver responseObserver) { + requests.add(request); try { - partitionQueryExecutionTime.simulateExecutionTime(exceptions, freezeLock); + partitionQueryExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); partition(request.getSession(), request.getTransaction(), responseObserver); } catch (StatusRuntimeException t) { responseObserver.onError(t); @@ -1545,8 +1572,10 @@ public void partitionQuery( @Override public void partitionRead( PartitionReadRequest request, StreamObserver responseObserver) { + requests.add(request); try { - partitionReadExecutionTime.simulateExecutionTime(exceptions, freezeLock); + partitionReadExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); partition(request.getSession(), request.getTransaction(), responseObserver); } catch (StatusRuntimeException t) { responseObserver.onError(t); @@ -1584,7 +1613,7 @@ private void partition( @Override public List getRequests() { - return Collections.emptyList(); + return new ArrayList<>(this.requests); } @Override @@ -1597,6 +1626,10 @@ public void addException(Exception exception) { exceptions.add(exception); } + public void setStickyGlobalExceptions(boolean sticky) { + this.stickyGlobalExceptions = sticky; + } + @Override public ServerServiceDefinition getServiceDefinition() { return bindService(); @@ -1605,6 +1638,7 @@ public ServerServiceDefinition getServiceDefinition() { /** Removes all sessions and transactions. Mocked results are not removed. */ @Override public void reset() { + requests.clear(); sessions.clear(); sessionLastUsed.clear(); transactions.clear(); @@ -1614,6 +1648,7 @@ public void reset() { partitionTokens.clear(); transactionLastUsed.clear(); exceptions.clear(); + stickyGlobalExceptions = false; } public void removeAllExecutionTimes() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java index 2fb5cda4f1..ef50365edc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java @@ -17,10 +17,21 @@ package com.google.cloud.spanner.it; import static com.google.cloud.spanner.SpannerMatchers.isSpannerException; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import com.google.api.client.util.ExponentialBackOff; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseNotFoundException; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTest; import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Statement; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.util.Collections; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -43,4 +54,69 @@ public void badDdl() { env.getTestHelper().createTestDatabase("CREATE TABLE T ( Illegal Way To Define A Table )"); } + + @Test + public void databaseDeletedTest() throws Exception { + // Create a test db, do a query, then delete it and verify that it returns + // DatabaseNotFoundExceptions. + Database db = env.getTestHelper().createTestDatabase(); + DatabaseClient client = env.getTestHelper().getClient().getDatabaseClient(db.getId()); + try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1"))) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(1L); + assertThat(rs.next()).isFalse(); + } + + // Delete the database. + db.drop(); + // We need to wait a little before Spanner actually starts sending DatabaseNotFound errors. + ExponentialBackOff backoff = + new ExponentialBackOff.Builder() + .setInitialIntervalMillis(1000) + .setMaxElapsedTimeMillis(35000) + .setMaxIntervalMillis(5000) + .build(); + DatabaseNotFoundException notFoundException = null; + long millis = 0L; + while ((millis = backoff.nextBackOffMillis()) != ExponentialBackOff.STOP) { + Thread.sleep(millis); + // Queries to this database should eventually return DatabaseNotFoundExceptions. + try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1"))) { + rs.next(); + } catch (DatabaseNotFoundException e) { + // This is what we expect. + notFoundException = e; + break; + } + } + assertThat(notFoundException).isNotNull(); + + // Now re-create a database with the same name. + OperationFuture op = + env.getTestHelper() + .getClient() + .getDatabaseAdminClient() + .createDatabase( + db.getId().getInstanceId().getInstance(), + db.getId().getDatabase(), + Collections.emptyList()); + Database newDb = op.get(); + + // Queries using the same DatabaseClient should still return DatabaseNotFoundExceptions. + try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1"))) { + rs.next(); + fail("Missing expected DatabaseNotFoundException"); + } catch (DatabaseNotFoundException e) { + // This is what we expect. + } + + // Now get a new DatabaseClient for the database. This should now result in a valid + // DatabaseClient. + DatabaseClient newClient = env.getTestHelper().getClient().getDatabaseClient(newDb.getId()); + try (ResultSet rs = newClient.singleUse().executeQuery(Statement.of("SELECT 1"))) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(1L); + assertThat(rs.next()).isFalse(); + } + } }