From 20965ad433d36a91d2f49ccd0e8f8b093ca0721b Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 10 Jan 2020 08:59:52 +0100 Subject: [PATCH] fix: stop sending rpcs on deleted db --- .../spanner/DatabaseNotFoundException.java | 33 +++++++++ .../com/google/cloud/spanner/SessionPool.java | 17 ++++- .../spanner/SpannerExceptionFactory.java | 2 + .../cloud/spanner/DatabaseClientImplTest.java | 71 +++++++++++++++++-- .../cloud/spanner/MockSpannerServiceImpl.java | 40 ++++++----- 5 files changed, 139 insertions(+), 24 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java new file mode 100644 index 00000000000..50fd18a71b5 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import javax.annotation.Nullable; + +/** + * Exception thrown by Cloud Spanner when an operation detects that the database that is being used + * no longer exists. This type of error has its own subclass as it is a condition that should cause the client library to stop trying to send RPCs to the backend until the user has taken action. + */ +public class DatabaseNotFoundException extends SpannerException { + private static final long serialVersionUID = -6395746612598975751L; + + /** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */ + DatabaseNotFoundException( + DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) { + super(token, ErrorCode.NOT_FOUND, false, message, cause); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cad6138c4e3..952da3d648c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -26,6 +26,7 @@ import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @@ -1056,6 +1057,9 @@ private static enum Position { @GuardedBy("lock") private SettableFuture closureFuture; +// @GuardedBy("lock") +// private DatabaseNotFoundException databaseNotFound; + @GuardedBy("lock") private final LinkedList readSessions = new LinkedList<>(); @@ -1193,7 +1197,7 @@ private boolean isSessionNotFound(SpannerException e) { } private boolean isDatabaseNotFound(SpannerException e) { - return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found"); + return e instanceof DatabaseNotFoundException; } private boolean isPermissionDenied(SpannerException e) { @@ -1251,6 +1255,10 @@ PooledSession getReadSession() throws SpannerException { span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } +// if (databaseNotFound != null) { +// span.addAnnotation("Database has been deleted"); +// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound); +// } sess = readSessions.poll(); if (sess == null) { sess = writePreparedSessions.poll(); @@ -1304,8 +1312,13 @@ PooledSession getReadWriteSession() { PooledSession sess = null; synchronized (lock) { if (closureFuture != null) { + span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } +// if (databaseNotFound != null) { +// span.addAnnotation("Database has been deleted"); +// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound); +// } sess = writePreparedSessions.poll(); if (sess == null) { if (numSessionsBeingPrepared <= readWriteWaiters.size()) { @@ -1448,6 +1461,7 @@ private void handleCreateSessionsFailure(SpannerException e, int count) { break; } } +// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); } } @@ -1470,6 +1484,7 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi if (isClosed()) { decrementPendingClosures(1); } +// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null); } else if (readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java index e7508d96e8e..701f5bb4672 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java @@ -176,6 +176,8 @@ private static SpannerException newSpannerExceptionPreformatted( case NOT_FOUND: if (message != null && message.contains("Session not found")) { return new SessionNotFoundException(token, message, cause); + } else if (message != null && message.contains("Database not found")) { + return new DatabaseNotFoundException(token, message, cause); } // Fall through to the default. default: diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index b147bd8d489..cd8025a66a3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -342,16 +342,12 @@ public void testDatabaseDoesNotExistOnCreate() throws Exception { // The create session failure should propagate to the client and not retry. try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); - assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG)); + } catch (DatabaseNotFoundException e) { } try { dbClient.readWriteTransaction(); fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); - assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG)); + } catch (DatabaseNotFoundException e) { } } } @@ -428,6 +424,69 @@ public Void run(TransactionContext transaction) throws Exception { } } + @Test + public void testDatabaseDoesNotExistOnQueryAndIsThenRecreated() throws Exception { + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .build() + .getService()) { + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // Wait until all sessions have been created and prepared. + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && (dbClient.pool.getNumberOfSessionsBeingCreated() > 0 + || dbClient.pool.getNumberOfSessionsBeingPrepared() > 0)) { + Thread.sleep(1L); + } + // Simulate that the database has been deleted. + mockSpanner.setStickyGlobalExceptions(true); + mockSpanner.addException(Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException()); + + // All subsequent calls should fail with a DatabaseNotFoundException. + try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { + while(rs.next()) { + } + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + try { + dbClient.readWriteTransaction().run(new TransactionCallable(){ + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + fail("missing expected exception"); + } catch (DatabaseNotFoundException e) { + } + + // Now simulate that the database has been re-created. The database client should still throw DatabaseNotFoundExceptions, as it is not the same database. + mockSpanner.reset(); + // All subsequent calls should fail with a DatabaseNotFoundException. + try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { + while(rs.next()) { + } +// fail("missing expected exception"); +// } catch (DatabaseNotFoundException e) { + } +// try { + dbClient.readWriteTransaction().run(new TransactionCallable(){ + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); +// fail("missing expected exception"); +// } catch (DatabaseNotFoundException e) { +// } + } + } + @Test public void testAllowNestedTransactions() throws InterruptedException { final DatabaseClientImpl client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 0a451e5baa8..258bcbdcc28 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -417,10 +417,10 @@ private SimulatedExecutionTime( } private void simulateExecutionTime( - Queue globalExceptions, ReadWriteLock freezeLock) { + Queue globalExceptions, boolean stickyGlobalExceptions, ReadWriteLock freezeLock) { try { freezeLock.readLock().lock(); - checkException(globalExceptions, false); + checkException(globalExceptions, stickyGlobalExceptions); checkException(this.exceptions, stickyException); if (minimumExecutionTime > 0 || randomExecutionTime > 0) { Uninterruptibles.sleepUninterruptibly( @@ -449,6 +449,7 @@ private static void checkException(Queue exceptions, boolean keepExce private final ReadWriteLock freezeLock = new ReentrantReadWriteLock(); private final Queue exceptions = new ConcurrentLinkedQueue<>(); + private boolean stickyGlobalExceptions = false; private final ConcurrentMap statementResults = new ConcurrentHashMap<>(); private final ConcurrentMap sessions = new ConcurrentHashMap<>(); @@ -613,7 +614,7 @@ public void batchCreateSessions( .withDescription("Session count must be >= 0") .asRuntimeException(); } - batchCreateSessionsExecutionTime.simulateExecutionTime(exceptions, freezeLock); + batchCreateSessionsExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); if (sessions.size() >= maxTotalSessions) { throw Status.RESOURCE_EXHAUSTED .withDescription("Maximum number of sessions reached") @@ -667,7 +668,7 @@ public void createSession( Preconditions.checkNotNull(request.getDatabase()); String name = generateSessionName(request.getDatabase()); try { - createSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + createSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Timestamp now = getCurrentGoogleTimestamp(); Session session = Session.newBuilder() @@ -700,7 +701,7 @@ public void createSession( public void getSession(GetSessionRequest request, StreamObserver responseObserver) { Preconditions.checkNotNull(request.getName()); try { - getSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + getSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Session session = sessions.get(request.getName()); if (session == null) { setSessionNotFound(request.getName(), responseObserver); @@ -728,7 +729,7 @@ private void setSessionNotFound(String name, StreamObserver responseObser public void listSessions( ListSessionsRequest request, StreamObserver responseObserver) { try { - listSessionsExecutionTime.simulateExecutionTime(exceptions, freezeLock); + listSessionsExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); List res = new ArrayList<>(); for (Session session : sessions.values()) { if (session.getName().startsWith(request.getDatabase())) { @@ -757,7 +758,7 @@ public int compare(Session o1, Session o2) { public void deleteSession(DeleteSessionRequest request, StreamObserver responseObserver) { Preconditions.checkNotNull(request.getName()); try { - deleteSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + deleteSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Session session = sessions.get(request.getName()); if (session != null) { try { @@ -790,7 +791,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeSqlExecutionTime.simulateExecutionTime(exceptions, freezeLock); + executeSqlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); ByteString transactionId = getTransactionId(session, request.getTransaction()); simulateAbort(session, transactionId); Statement statement = @@ -877,7 +878,7 @@ public void executeBatchDml( } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeBatchDmlExecutionTime.simulateExecutionTime(exceptions, freezeLock); + executeBatchDmlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (isPartitionedDmlTransaction(transactionId)) { @@ -961,7 +962,7 @@ public void executeStreamingSql( } sessionLastUsed.put(session.getName(), Instant.now()); try { - executeStreamingSqlExecutionTime.simulateExecutionTime(exceptions, freezeLock); + executeStreamingSqlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (!request.getPartitionToken().isEmpty()) { @@ -1140,7 +1141,7 @@ public void read(final ReadRequest request, StreamObserver responseOb } sessionLastUsed.put(session.getName(), Instant.now()); try { - readExecutionTime.simulateExecutionTime(exceptions, freezeLock); + readExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); simulateAbort(session, transactionId); @@ -1178,7 +1179,7 @@ public void streamingRead( } sessionLastUsed.put(session.getName(), Instant.now()); try { - streamingReadExecutionTime.simulateExecutionTime(exceptions, freezeLock); + streamingReadExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Get or start transaction ByteString transactionId = getTransactionId(session, request.getTransaction()); if (!request.getPartitionToken().isEmpty()) { @@ -1345,7 +1346,7 @@ public void beginTransaction( } sessionLastUsed.put(session.getName(), Instant.now()); try { - beginTransactionExecutionTime.simulateExecutionTime(exceptions, freezeLock); + beginTransactionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = beginTransaction(session, request.getOptions()); responseObserver.onNext(transaction); responseObserver.onCompleted(); @@ -1449,7 +1450,7 @@ public void commit(CommitRequest request, StreamObserver respons } sessionLastUsed.put(session.getName(), Instant.now()); try { - commitExecutionTime.simulateExecutionTime(exceptions, freezeLock); + commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Find or start a transaction Transaction transaction; if (request.hasSingleUseTransaction()) { @@ -1502,7 +1503,7 @@ public void rollback(RollbackRequest request, StreamObserver responseObse } sessionLastUsed.put(session.getName(), Instant.now()); try { - rollbackExecutionTime.simulateExecutionTime(exceptions, freezeLock); + rollbackExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); Transaction transaction = transactions.get(request.getTransactionId()); if (transaction != null) { rollbackTransaction(transaction.getId()); @@ -1533,7 +1534,7 @@ void markAbortedTransaction(ByteString transactionId) { public void partitionQuery( PartitionQueryRequest request, StreamObserver responseObserver) { try { - partitionQueryExecutionTime.simulateExecutionTime(exceptions, freezeLock); + partitionQueryExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); partition(request.getSession(), request.getTransaction(), responseObserver); } catch (StatusRuntimeException t) { responseObserver.onError(t); @@ -1546,7 +1547,7 @@ public void partitionQuery( public void partitionRead( PartitionReadRequest request, StreamObserver responseObserver) { try { - partitionReadExecutionTime.simulateExecutionTime(exceptions, freezeLock); + partitionReadExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); partition(request.getSession(), request.getTransaction(), responseObserver); } catch (StatusRuntimeException t) { responseObserver.onError(t); @@ -1597,6 +1598,10 @@ public void addException(Exception exception) { exceptions.add(exception); } + public void setStickyGlobalExceptions(boolean sticky) { + this.stickyGlobalExceptions = sticky; + } + @Override public ServerServiceDefinition getServiceDefinition() { return bindService(); @@ -1614,6 +1619,7 @@ public void reset() { partitionTokens.clear(); transactionLastUsed.clear(); exceptions.clear(); + stickyGlobalExceptions = false; } public void removeAllExecutionTimes() {