From 6a35e03593c948a96724d6751ef80a9834a03ec1 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Tue, 14 Jan 2020 10:53:13 +0100 Subject: [PATCH] fix: client should stop sending rpcs after database dropped DatabaseClients should not continue to try to send RPCs to a database that has been deleted. Instead, the session pool will keep track of whether a database not found error has been returned for a database, and if so, will invalidate itself. All subsequent calls for this database will return a DatabaseNotFoundException without calling a RPC. If a database is re-created, the user must create a new DatabaseClient with a new session pool in order to resume usage of the database. Fixes #16 --- .../spanner/DatabaseNotFoundException.java | 3 +- .../com/google/cloud/spanner/SessionPool.java | 55 ++++-- .../spanner/SpannerExceptionFactory.java | 8 +- .../com/google/cloud/spanner/SpannerImpl.java | 14 +- .../cloud/spanner/DatabaseClientImplTest.java | 156 +++++++++++------- .../cloud/spanner/MockSpannerServiceImpl.java | 53 ++++-- .../cloud/spanner/it/ITDatabaseTest.java | 64 +++++++ 7 files changed, 268 insertions(+), 85 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java index 50fd18a71b..f8423a6427 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java @@ -20,7 +20,8 @@ /** * Exception thrown by Cloud Spanner when an operation detects that the database that is being used - * no longer exists. This type of error has its own subclass as it is a condition that should cause the client library to stop trying to send RPCs to the backend until the user has taken action. + * no longer exists. This type of error has its own subclass as it is a condition that should cause + * the client library to stop trying to send RPCs to the backend until the user has taken action. */ public class DatabaseNotFoundException extends SpannerException { private static final long serialVersionUID = -6395746612598975751L; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 952da3d648..cdbf383d00 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -776,6 +776,15 @@ public void close() { if (lastException != null && isSessionNotFound(lastException)) { invalidateSession(this); } else { + if (lastException != null && isDatabaseNotFound(lastException)) { + // Mark this session pool as no longer valid and then release the session into the pool as + // there is nothing we can do with it anyways. + synchronized (lock) { + SessionPool.this.databaseNotFound = + MoreObjects.firstNonNull( + SessionPool.this.databaseNotFound, (DatabaseNotFoundException) lastException); + } + } lastException = null; if (state != SessionState.CLOSING) { state = SessionState.AVAILABLE; @@ -1057,8 +1066,8 @@ private static enum Position { @GuardedBy("lock") private SettableFuture closureFuture; -// @GuardedBy("lock") -// private DatabaseNotFoundException databaseNotFound; + @GuardedBy("lock") + private DatabaseNotFoundException databaseNotFound; @GuardedBy("lock") private final LinkedList readSessions = new LinkedList<>(); @@ -1229,6 +1238,13 @@ private PooledSession findSessionToKeepAlive( return null; } + /** @return true if this {@link SessionPool} is still valid. */ + boolean isValid() { + synchronized (lock) { + return closureFuture == null && databaseNotFound == null; + } + } + /** * Returns a session to be used for read requests to spanner. It will block if a session is not * currently available. In case the pool is exhausted and {@link @@ -1255,10 +1271,15 @@ PooledSession getReadSession() throws SpannerException { span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } -// if (databaseNotFound != null) { -// span.addAnnotation("Database has been deleted"); -// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound); -// } + if (databaseNotFound != null) { + span.addAnnotation("Database has been deleted"); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.NOT_FOUND, + String.format( + "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", + databaseNotFound.getMessage()), + databaseNotFound); + } sess = readSessions.poll(); if (sess == null) { sess = writePreparedSessions.poll(); @@ -1315,10 +1336,15 @@ PooledSession getReadWriteSession() { span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } -// if (databaseNotFound != null) { -// span.addAnnotation("Database has been deleted"); -// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound); -// } + if (databaseNotFound != null) { + span.addAnnotation("Database has been deleted"); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.NOT_FOUND, + String.format( + "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", + databaseNotFound.getMessage()), + databaseNotFound); + } sess = writePreparedSessions.poll(); if (sess == null) { if (numSessionsBeingPrepared <= readWriteWaiters.size()) { @@ -1461,7 +1487,9 @@ private void handleCreateSessionsFailure(SpannerException e, int count) { break; } } -// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); + this.databaseNotFound = + MoreObjects.firstNonNull( + this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); } } @@ -1484,7 +1512,10 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi if (isClosed()) { decrementPendingClosures(1); } -// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); + this.databaseNotFound = + MoreObjects.firstNonNull( + this.databaseNotFound, + isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); } else if (readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java index 701f5bb467..22b5f065e0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java @@ -26,6 +26,7 @@ import io.grpc.StatusRuntimeException; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.net.ssl.SSLHandshakeException; @@ -36,6 +37,11 @@ * ErrorCode#ABORTED} are always represented by {@link AbortedException}. */ public final class SpannerExceptionFactory { + static final String DATABASE_NOT_FOUND_MSG = + "Database not found: projects/.*/instances/.*/databases/.*\nresource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\"\nresource_name: \"projects/.*/instances/.*/databases/.*\"\ndescription: \"Database does not exist.\"\n"; + private static final Pattern DATABASE_NOT_FOUND_MSG_PATTERN = + Pattern.compile(".*" + DATABASE_NOT_FOUND_MSG + ".*"); + public static SpannerException newSpannerException(ErrorCode code, @Nullable String message) { return newSpannerException(code, message, null); } @@ -176,7 +182,7 @@ private static SpannerException newSpannerExceptionPreformatted( case NOT_FOUND: if (message != null && message.contains("Session not found")) { return new SessionNotFoundException(token, message, cause); - } else if (message != null && message.contains("Database not found")) { + } else if (message != null && DATABASE_NOT_FOUND_MSG_PATTERN.matcher(message).matches()) { return new DatabaseNotFoundException(token, message, cause); } // Fall through to the default. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 12cf89dd56..8f855f9bcc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -88,6 +88,9 @@ class SpannerImpl extends BaseService implements Spanner { @GuardedBy("this") private final Map dbClients = new HashMap<>(); + @GuardedBy("this") + private final List invalidatedDbClients = new ArrayList<>(); + @GuardedBy("this") private final Map sessionClients = new HashMap<>(); @@ -210,7 +213,13 @@ public InstanceAdminClient getInstanceAdminClient() { public DatabaseClient getDatabaseClient(DatabaseId db) { synchronized (this) { Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed"); - if (dbClients.containsKey(db)) { + if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) { + // Move the invalidated client to a separate list, so we can close it together with the + // other database clients when the Spanner instance is closed. + invalidatedDbClients.add(dbClients.get(db)); + dbClients.remove(db); + } + if (dbClients.containsKey(db) && dbClients.get(db).pool.isValid()) { return dbClients.get(db); } else { SessionPool pool = @@ -239,7 +248,8 @@ public void close() { Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed"); spannerIsClosed = true; closureFutures = new ArrayList<>(); - for (DatabaseClientImpl dbClient : dbClients.values()) { + invalidatedDbClients.addAll(dbClients.values()); + for (DatabaseClientImpl dbClient : invalidatedDbClients) { closureFutures.add(dbClient.closeAsync()); } dbClients.clear(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index cd8025a66a..92f0950962 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -16,7 +16,7 @@ package com.google.cloud.spanner; -import static org.hamcrest.CoreMatchers.containsString; +import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -51,8 +51,21 @@ @RunWith(JUnit4.class) public class DatabaseClientImplTest { - private static final String DATABASE_DOES_NOT_EXIST_MSG = - "Database not found: projects//instances//databases/ resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\" resource_name: \"projects//instances//databases/\" description: \"Database does not exist.\""; + private static final String DATABASE_NOT_FOUND_FORMAT = + SpannerExceptionFactory.DATABASE_NOT_FOUND_MSG.replaceAll("\\.\\*", "%s"); + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final String DATABASE_NOT_FOUND_MSG = + String.format( + "com.google.cloud.spanner.SpannerException: NOT_FOUND: io.grpc.StatusRuntimeException: NOT_FOUND: " + + DATABASE_NOT_FOUND_FORMAT, + TEST_PROJECT, + TEST_INSTANCE, + TEST_DATABASE, + TEST_PROJECT, + TEST_INSTANCE, + TEST_DATABASE); private static MockSpannerServiceImpl mockSpanner; private static Server server; private static LocalChannelProvider channelProvider; @@ -118,7 +131,7 @@ public static void stopServer() throws InterruptedException { public void setUp() throws IOException { spanner = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .build() @@ -139,7 +152,7 @@ public void tearDown() throws Exception { @Test public void testExecutePartitionedDml() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); } @@ -148,7 +161,7 @@ public void testExecutePartitionedDml() { @Test public void testExecutePartitionedDmlAborted() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); mockSpanner.abortNextTransaction(); long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); @@ -161,7 +174,7 @@ public void testExecutePartitionedDmlAborted() { @Test(expected = IllegalArgumentException.class) public void testExecutePartitionedDmlWithQuery() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); client.executePartitionedUpdate(SELECT1); } @@ -169,7 +182,7 @@ public void testExecutePartitionedDmlWithQuery() { @Test(expected = SpannerException.class) public void testExecutePartitionedDmlWithException() { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); client.executePartitionedUpdate(INVALID_UPDATE_STATEMENT); } @@ -185,14 +198,14 @@ public void testPartitionedDmlDoesNotTimeout() throws Exception { .build(); SpannerOptions.Builder builder = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()); // Set normal DML timeout value. builder.getSpannerStubSettingsBuilder().executeSqlSettings().setRetrySettings(retrySettings); try (Spanner spanner = builder.build().getService()) { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); assertThat( spanner.getOptions().getPartitionedDmlTimeout(), is(equalTo(Duration.ofHours(2L)))); @@ -227,14 +240,14 @@ public void testPartitionedDmlWithTimeout() throws Exception { mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0)); SpannerOptions.Builder builder = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()); // Set PDML timeout value. builder.setPartitionedDmlTimeout(Duration.ofMillis(100L)); try (Spanner spanner = builder.build().getService()) { DatabaseClient client = - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); assertThat( spanner.getOptions().getPartitionedDmlTimeout(), is(equalTo(Duration.ofMillis(100L)))); // PDML should timeout with these settings. @@ -267,10 +280,10 @@ public Long run(TransactionContext transaction) throws Exception { public void testDatabaseDoesNotExistOnPrepareSession() throws Exception { mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -286,6 +299,7 @@ public void testDatabaseDoesNotExistOnPrepareSession() throws Exception { } assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0))); + int currentNumRequest = mockSpanner.getRequests().size(); try { dbClient .readWriteTransaction() @@ -296,21 +310,20 @@ public Void run(TransactionContext transaction) throws Exception { return null; } }); - fail("missing expected NOT_FOUND exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); - assertThat(e.getMessage(), containsString("Database not found")); + fail("missing expected DatabaseNotFoundException"); + } catch (DatabaseNotFoundException e) { } + assertThat(mockSpanner.getRequests()).hasSize(currentNumRequest); } @Test public void testDatabaseDoesNotExistOnInitialization() throws Exception { mockSpanner.setBatchCreateSessionsExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until session creation has finished. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -326,11 +339,11 @@ public void testDatabaseDoesNotExistOnInitialization() throws Exception { public void testDatabaseDoesNotExistOnCreate() throws Exception { mockSpanner.setBatchCreateSessionsExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); // Ensure there are no sessions in the pool by default. try (Spanner spanner = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) @@ -338,16 +351,20 @@ public void testDatabaseDoesNotExistOnCreate() throws Exception { .getService()) { DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // The create session failure should propagate to the client and not retry. try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { fail("missing expected exception"); } catch (DatabaseNotFoundException e) { + // The server should only receive one BatchCreateSessions request. + assertThat(mockSpanner.getRequests()).hasSize(1); } try { dbClient.readWriteTransaction(); fail("missing expected exception"); } catch (DatabaseNotFoundException e) { + // No additional requests should have been sent by the client. + assertThat(mockSpanner.getRequests()).hasSize(1); } } } @@ -356,10 +373,10 @@ public void testDatabaseDoesNotExistOnCreate() throws Exception { public void testDatabaseDoesNotExistOnReplenish() throws Exception { mockSpanner.setBatchCreateSessionsExecutionTime( SimulatedExecutionTime.ofStickyException( - Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until session creation has finished. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -392,7 +409,7 @@ public void testPermissionDeniedOnPrepareSession() throws Exception { .asRuntimeException())); DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 @@ -424,66 +441,91 @@ public Void run(TransactionContext transaction) throws Exception { } } + /** + * Test showing that when a database is deleted while it is in use by a database client and then + * re-created with the same name, will continue to return {@link DatabaseNotFoundException}s until + * a new {@link DatabaseClient} is created. + */ @Test - public void testDatabaseDoesNotExistOnQueryAndIsThenRecreated() throws Exception { + public void testDatabaseIsDeletedAndThenRecreated() throws Exception { try (Spanner spanner = SpannerOptions.newBuilder() - .setProjectId("[PROJECT]") + .setProjectId(TEST_PROJECT) .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .build() .getService()) { DatabaseClientImpl dbClient = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created and prepared. Stopwatch watch = Stopwatch.createStarted(); while (watch.elapsed(TimeUnit.SECONDS) < 5 && (dbClient.pool.getNumberOfSessionsBeingCreated() > 0 - || dbClient.pool.getNumberOfSessionsBeingPrepared() > 0)) { + || dbClient.pool.getNumberOfSessionsBeingPrepared() > 0)) { Thread.sleep(1L); } // Simulate that the database has been deleted. mockSpanner.setStickyGlobalExceptions(true); - mockSpanner.addException(Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException()); + mockSpanner.addException( + Status.NOT_FOUND.withDescription(DATABASE_NOT_FOUND_MSG).asRuntimeException()); // All subsequent calls should fail with a DatabaseNotFoundException. try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { - while(rs.next()) { - } + while (rs.next()) {} fail("missing expected exception"); } catch (DatabaseNotFoundException e) { } try { - dbClient.readWriteTransaction().run(new TransactionCallable(){ - @Override - public Void run(TransactionContext transaction) throws Exception { - return null; - } - }); + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); fail("missing expected exception"); } catch (DatabaseNotFoundException e) { } - // Now simulate that the database has been re-created. The database client should still throw DatabaseNotFoundExceptions, as it is not the same database. + // Now simulate that the database has been re-created. The database client should still throw + // DatabaseNotFoundExceptions, as it is not the same database. The server should not receive + // any new requests. mockSpanner.reset(); // All subsequent calls should fail with a DatabaseNotFoundException. try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { - while(rs.next()) { - } -// fail("missing expected exception"); -// } catch (DatabaseNotFoundException e) { + while (rs.next()) {} + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + try { + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + assertThat(mockSpanner.getRequests()).isEmpty(); + // Now get a new database client. Normally multiple calls to Spanner#getDatabaseClient will + // return the same instance, but not when the instance has been invalidated by a + // DatabaseNotFoundException. + DatabaseClientImpl newClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + assertThat(newClient).isNotSameInstanceAs(dbClient); + // Executing a query should now work without problems. + try (ResultSet rs = newClient.singleUse().executeQuery(SELECT1)) { + while (rs.next()) {} } -// try { - dbClient.readWriteTransaction().run(new TransactionCallable(){ - @Override - public Void run(TransactionContext transaction) throws Exception { - return null; - } - }); -// fail("missing expected exception"); -// } catch (DatabaseNotFoundException e) { -// } + assertThat(mockSpanner.getRequests()).isNotEmpty(); } } @@ -491,7 +533,7 @@ public Void run(TransactionContext transaction) throws Exception { public void testAllowNestedTransactions() throws InterruptedException { final DatabaseClientImpl client = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); // Wait until all sessions have been created. final int minSessions = spanner.getOptions().getSessionPoolOptions().getMinSessions(); Stopwatch watch = Stopwatch.createStarted(); @@ -521,10 +563,10 @@ public Long run(TransactionContext transaction) throws Exception { public void testNestedTransactionsUsingTwoDatabases() throws InterruptedException { final DatabaseClientImpl client1 = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE1]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, "my-database-1")); final DatabaseClientImpl client2 = (DatabaseClientImpl) - spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE2]")); + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, "my-database-2")); // Wait until all sessions have been created so we can actually check the number of sessions // checked out of the pools. final int minSessions = spanner.getOptions().getSessionPoolOptions().getMinSessions(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 258bcbdcc2..41be7c0d44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -417,7 +417,9 @@ private SimulatedExecutionTime( } private void simulateExecutionTime( - Queue globalExceptions, boolean stickyGlobalExceptions, ReadWriteLock freezeLock) { + Queue globalExceptions, + boolean stickyGlobalExceptions, + ReadWriteLock freezeLock) { try { freezeLock.readLock().lock(); checkException(globalExceptions, stickyGlobalExceptions); @@ -447,6 +449,7 @@ private static void checkException(Queue exceptions, boolean keepExce private final Random random = new Random(); private double abortProbability = 0.0010D; + private final Queue requests = new ConcurrentLinkedQueue<>(); private final ReadWriteLock freezeLock = new ReentrantReadWriteLock(); private final Queue exceptions = new ConcurrentLinkedQueue<>(); private boolean stickyGlobalExceptions = false; @@ -606,6 +609,7 @@ public void setMaxTotalSessions(int max) { public void batchCreateSessions( BatchCreateSessionsRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getDatabase()); String name = null; try { @@ -614,7 +618,8 @@ public void batchCreateSessions( .withDescription("Session count must be >= 0") .asRuntimeException(); } - batchCreateSessionsExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + batchCreateSessionsExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); if (sessions.size() >= maxTotalSessions) { throw Status.RESOURCE_EXHAUSTED .withDescription("Maximum number of sessions reached") @@ -665,10 +670,12 @@ public void batchCreateSessions( @Override public void createSession( CreateSessionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getDatabase()); String name = generateSessionName(request.getDatabase()); try { - createSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + createSessionExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); Timestamp now = getCurrentGoogleTimestamp(); Session session = Session.newBuilder() @@ -699,6 +706,7 @@ public void createSession( @Override public void getSession(GetSessionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getName()); try { getSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); @@ -728,8 +736,10 @@ private void setSessionNotFound(String name, StreamObserver responseObser @Override public void listSessions( ListSessionsRequest request, StreamObserver responseObserver) { + requests.add(request); try { - listSessionsExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + listSessionsExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); List res = new ArrayList<>(); for (Session session : sessions.values()) { if (session.getName().startsWith(request.getDatabase())) { @@ -756,9 +766,11 @@ public int compare(Session o1, Session o2) { @Override public void deleteSession(DeleteSessionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getName()); try { - deleteSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + deleteSessionExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); Session session = sessions.get(request.getName()); if (session != null) { try { @@ -783,6 +795,7 @@ void doDeleteSession(Session session) { @Override public void executeSql(ExecuteSqlRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -870,6 +883,7 @@ private void returnResultSet( @Override public void executeBatchDml( ExecuteBatchDmlRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -878,7 +892,8 @@ public void executeBatchDml( } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeBatchDmlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + executeBatchDmlExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (isPartitionedDmlTransaction(transactionId)) { @@ -954,6 +969,7 @@ public void executeBatchDml( @Override public void executeStreamingSql( ExecuteSqlRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -962,7 +978,8 @@ public void executeStreamingSql( } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeStreamingSqlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + executeStreamingSqlExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (!request.getPartitionToken().isEmpty()) { @@ -1133,6 +1150,7 @@ private void throwTransactionAborted(ByteString transactionId) { @Override public void read(final ReadRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1171,6 +1189,7 @@ public Iterator iterator() { @Override public void streamingRead( final ReadRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1179,7 +1198,8 @@ public void streamingRead( } sessionLastUsed.put(session.getName(), Instant.now()); try { - streamingReadExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + streamingReadExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (!request.getPartitionToken().isEmpty()) { @@ -1338,6 +1358,7 @@ private Transaction getTemporaryTransactionOrNull(TransactionSelector tx) { @Override public void beginTransaction( BeginTransactionRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1346,7 +1367,8 @@ public void beginTransaction( } sessionLastUsed.put(session.getName(), Instant.now()); try { - beginTransactionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + beginTransactionExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = beginTransaction(session, request.getOptions()); responseObserver.onNext(transaction); responseObserver.onCompleted(); @@ -1442,6 +1464,7 @@ private void ensureMostRecentTransaction(Session session, ByteString transaction @Override public void commit(CommitRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getSession()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1495,6 +1518,7 @@ private void commitTransaction(ByteString transactionId) { @Override public void rollback(RollbackRequest request, StreamObserver responseObserver) { + requests.add(request); Preconditions.checkNotNull(request.getTransactionId()); Session session = sessions.get(request.getSession()); if (session == null) { @@ -1533,8 +1557,10 @@ void markAbortedTransaction(ByteString transactionId) { @Override public void partitionQuery( PartitionQueryRequest request, StreamObserver responseObserver) { + requests.add(request); try { - partitionQueryExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + partitionQueryExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); partition(request.getSession(), request.getTransaction(), responseObserver); } catch (StatusRuntimeException t) { responseObserver.onError(t); @@ -1546,8 +1572,10 @@ public void partitionQuery( @Override public void partitionRead( PartitionReadRequest request, StreamObserver responseObserver) { + requests.add(request); try { - partitionReadExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); + partitionReadExecutionTime.simulateExecutionTime( + exceptions, stickyGlobalExceptions, freezeLock); partition(request.getSession(), request.getTransaction(), responseObserver); } catch (StatusRuntimeException t) { responseObserver.onError(t); @@ -1585,7 +1613,7 @@ private void partition( @Override public List getRequests() { - return Collections.emptyList(); + return new ArrayList<>(this.requests); } @Override @@ -1610,6 +1638,7 @@ public ServerServiceDefinition getServiceDefinition() { /** Removes all sessions and transactions. Mocked results are not removed. */ @Override public void reset() { + requests.clear(); sessions.clear(); sessionLastUsed.clear(); transactions.clear(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java index 2fb5cda4f1..404f78971c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java @@ -17,10 +17,21 @@ package com.google.cloud.spanner.it; import static com.google.cloud.spanner.SpannerMatchers.isSpannerException; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseNotFoundException; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTest; import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Statement; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.util.Collections; +import java.util.concurrent.ExecutionException; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -43,4 +54,57 @@ public void badDdl() { env.getTestHelper().createTestDatabase("CREATE TABLE T ( Illegal Way To Define A Table )"); } + + @Test + public void databaseDeletedTest() throws InterruptedException, ExecutionException { + // Create a test db, do a query, then delete it and verify that it returns + // DatabaseNotFoundExceptions. + Database db = env.getTestHelper().createTestDatabase(); + DatabaseClient client = env.getTestHelper().getClient().getDatabaseClient(db.getId()); + try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1"))) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(1L); + assertThat(rs.next()).isFalse(); + } + + // Delete the database. + db.drop(); + // We need to wait a little before Spanner actually starts sending DatabaseNotFound errors. + Thread.sleep(5000L); + // Queries to this database should now return DatabaseNotFoundExceptions. + try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1"))) { + rs.next(); + fail("Missing expected DatabaseNotFoundException"); + } catch (DatabaseNotFoundException e) { + // This is what we expect. + } + + // Now re-create a database with the same name. + OperationFuture op = + env.getTestHelper() + .getClient() + .getDatabaseAdminClient() + .createDatabase( + db.getId().getInstanceId().getInstance(), + db.getId().getDatabase(), + Collections.emptyList()); + Database newDb = op.get(); + + // Queries using the same DatabaseClient should still return DatabaseNotFoundExceptions. + try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1"))) { + rs.next(); + fail("Missing expected DatabaseNotFoundException"); + } catch (DatabaseNotFoundException e) { + // This is what we expect. + } + + // Now get a new DatabaseClient for the database. This should now result in a valid + // DatabaseClient. + DatabaseClient newClient = env.getTestHelper().getClient().getDatabaseClient(newDb.getId()); + try (ResultSet rs = newClient.singleUse().executeQuery(Statement.of("SELECT 1"))) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(1L); + assertThat(rs.next()).isFalse(); + } + } }