Skip to content

Commit

Permalink
fix: stop sending rpcs on deleted db
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Jan 10, 2020
1 parent 93c0aed commit 20965ad
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 24 deletions.
@@ -0,0 +1,33 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import javax.annotation.Nullable;

/**
* Exception thrown by Cloud Spanner when an operation detects that the database that is being used
* no longer exists. This type of error has its own subclass as it is a condition that should cause the client library to stop trying to send RPCs to the backend until the user has taken action.
*/
public class DatabaseNotFoundException extends SpannerException {
private static final long serialVersionUID = -6395746612598975751L;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
DatabaseNotFoundException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
super(token, ErrorCode.NOT_FOUND, false, message, cause);
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -1056,6 +1057,9 @@ private static enum Position {
@GuardedBy("lock")
private SettableFuture<Void> closureFuture;

// @GuardedBy("lock")
// private DatabaseNotFoundException databaseNotFound;

@GuardedBy("lock")
private final LinkedList<PooledSession> readSessions = new LinkedList<>();

Expand Down Expand Up @@ -1193,7 +1197,7 @@ private boolean isSessionNotFound(SpannerException e) {
}

private boolean isDatabaseNotFound(SpannerException e) {
return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found");
return e instanceof DatabaseNotFoundException;
}

private boolean isPermissionDenied(SpannerException e) {
Expand Down Expand Up @@ -1251,6 +1255,10 @@ PooledSession getReadSession() throws SpannerException {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
// if (databaseNotFound != null) {
// span.addAnnotation("Database has been deleted");
// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound);
// }
sess = readSessions.poll();
if (sess == null) {
sess = writePreparedSessions.poll();
Expand Down Expand Up @@ -1304,8 +1312,13 @@ PooledSession getReadWriteSession() {
PooledSession sess = null;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
// if (databaseNotFound != null) {
// span.addAnnotation("Database has been deleted");
// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound);
// }
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
Expand Down Expand Up @@ -1448,6 +1461,7 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
break;
}
}
// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
}
}

Expand All @@ -1470,6 +1484,7 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
if (isClosed()) {
decrementPendingClosures(1);
}
// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
} else if (readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down
Expand Up @@ -176,6 +176,8 @@ private static SpannerException newSpannerExceptionPreformatted(
case NOT_FOUND:
if (message != null && message.contains("Session not found")) {
return new SessionNotFoundException(token, message, cause);
} else if (message != null && message.contains("Database not found")) {
return new DatabaseNotFoundException(token, message, cause);
}
// Fall through to the default.
default:
Expand Down
Expand Up @@ -342,16 +342,12 @@ public void testDatabaseDoesNotExistOnCreate() throws Exception {
// The create session failure should propagate to the client and not retry.
try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG));
} catch (DatabaseNotFoundException e) {
}
try {
dbClient.readWriteTransaction();
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG));
} catch (DatabaseNotFoundException e) {
}
}
}
Expand Down Expand Up @@ -428,6 +424,69 @@ public Void run(TransactionContext transaction) throws Exception {
}
}

@Test
public void testDatabaseDoesNotExistOnQueryAndIsThenRecreated() throws Exception {
try (Spanner spanner =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.build()
.getService()) {
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until all sessions have been created and prepared.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& (dbClient.pool.getNumberOfSessionsBeingCreated() > 0
|| dbClient.pool.getNumberOfSessionsBeingPrepared() > 0)) {
Thread.sleep(1L);
}
// Simulate that the database has been deleted.
mockSpanner.setStickyGlobalExceptions(true);
mockSpanner.addException(Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException());

// All subsequent calls should fail with a DatabaseNotFoundException.
try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) {
while(rs.next()) {
}
fail("missing expected exception");
} catch (DatabaseNotFoundException e) {
}
try {
dbClient.readWriteTransaction().run(new TransactionCallable<Void>(){
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
fail("missing expected exception");
} catch (DatabaseNotFoundException e) {
}

// Now simulate that the database has been re-created. The database client should still throw DatabaseNotFoundExceptions, as it is not the same database.
mockSpanner.reset();
// All subsequent calls should fail with a DatabaseNotFoundException.
try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) {
while(rs.next()) {
}
// fail("missing expected exception");
// } catch (DatabaseNotFoundException e) {
}
// try {
dbClient.readWriteTransaction().run(new TransactionCallable<Void>(){
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
// fail("missing expected exception");
// } catch (DatabaseNotFoundException e) {
// }
}
}

@Test
public void testAllowNestedTransactions() throws InterruptedException {
final DatabaseClientImpl client =
Expand Down
Expand Up @@ -417,10 +417,10 @@ private SimulatedExecutionTime(
}

private void simulateExecutionTime(
Queue<Exception> globalExceptions, ReadWriteLock freezeLock) {
Queue<Exception> globalExceptions, boolean stickyGlobalExceptions, ReadWriteLock freezeLock) {
try {
freezeLock.readLock().lock();
checkException(globalExceptions, false);
checkException(globalExceptions, stickyGlobalExceptions);
checkException(this.exceptions, stickyException);
if (minimumExecutionTime > 0 || randomExecutionTime > 0) {
Uninterruptibles.sleepUninterruptibly(
Expand Down Expand Up @@ -449,6 +449,7 @@ private static void checkException(Queue<Exception> exceptions, boolean keepExce

private final ReadWriteLock freezeLock = new ReentrantReadWriteLock();
private final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
private boolean stickyGlobalExceptions = false;
private final ConcurrentMap<Statement, StatementResult> statementResults =
new ConcurrentHashMap<>();
private final ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -613,7 +614,7 @@ public void batchCreateSessions(
.withDescription("Session count must be >= 0")
.asRuntimeException();
}
batchCreateSessionsExecutionTime.simulateExecutionTime(exceptions, freezeLock);
batchCreateSessionsExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
if (sessions.size() >= maxTotalSessions) {
throw Status.RESOURCE_EXHAUSTED
.withDescription("Maximum number of sessions reached")
Expand Down Expand Up @@ -667,7 +668,7 @@ public void createSession(
Preconditions.checkNotNull(request.getDatabase());
String name = generateSessionName(request.getDatabase());
try {
createSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock);
createSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
Timestamp now = getCurrentGoogleTimestamp();
Session session =
Session.newBuilder()
Expand Down Expand Up @@ -700,7 +701,7 @@ public void createSession(
public void getSession(GetSessionRequest request, StreamObserver<Session> responseObserver) {
Preconditions.checkNotNull(request.getName());
try {
getSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock);
getSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
Session session = sessions.get(request.getName());
if (session == null) {
setSessionNotFound(request.getName(), responseObserver);
Expand Down Expand Up @@ -728,7 +729,7 @@ private <T> void setSessionNotFound(String name, StreamObserver<T> responseObser
public void listSessions(
ListSessionsRequest request, StreamObserver<ListSessionsResponse> responseObserver) {
try {
listSessionsExecutionTime.simulateExecutionTime(exceptions, freezeLock);
listSessionsExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
List<Session> res = new ArrayList<>();
for (Session session : sessions.values()) {
if (session.getName().startsWith(request.getDatabase())) {
Expand Down Expand Up @@ -757,7 +758,7 @@ public int compare(Session o1, Session o2) {
public void deleteSession(DeleteSessionRequest request, StreamObserver<Empty> responseObserver) {
Preconditions.checkNotNull(request.getName());
try {
deleteSessionExecutionTime.simulateExecutionTime(exceptions, freezeLock);
deleteSessionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
Session session = sessions.get(request.getName());
if (session != null) {
try {
Expand Down Expand Up @@ -790,7 +791,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
executeSqlExecutionTime.simulateExecutionTime(exceptions, freezeLock);
executeSqlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
ByteString transactionId = getTransactionId(session, request.getTransaction());
simulateAbort(session, transactionId);
Statement statement =
Expand Down Expand Up @@ -877,7 +878,7 @@ public void executeBatchDml(
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
executeBatchDmlExecutionTime.simulateExecutionTime(exceptions, freezeLock);
executeBatchDmlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Get or start transaction
ByteString transactionId = getTransactionId(session, request.getTransaction());
if (isPartitionedDmlTransaction(transactionId)) {
Expand Down Expand Up @@ -961,7 +962,7 @@ public void executeStreamingSql(
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
executeStreamingSqlExecutionTime.simulateExecutionTime(exceptions, freezeLock);
executeStreamingSqlExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Get or start transaction
ByteString transactionId = getTransactionId(session, request.getTransaction());
if (!request.getPartitionToken().isEmpty()) {
Expand Down Expand Up @@ -1140,7 +1141,7 @@ public void read(final ReadRequest request, StreamObserver<ResultSet> responseOb
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
readExecutionTime.simulateExecutionTime(exceptions, freezeLock);
readExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Get or start transaction
ByteString transactionId = getTransactionId(session, request.getTransaction());
simulateAbort(session, transactionId);
Expand Down Expand Up @@ -1178,7 +1179,7 @@ public void streamingRead(
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
streamingReadExecutionTime.simulateExecutionTime(exceptions, freezeLock);
streamingReadExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Get or start transaction
ByteString transactionId = getTransactionId(session, request.getTransaction());
if (!request.getPartitionToken().isEmpty()) {
Expand Down Expand Up @@ -1345,7 +1346,7 @@ public void beginTransaction(
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
beginTransactionExecutionTime.simulateExecutionTime(exceptions, freezeLock);
beginTransactionExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
Transaction transaction = beginTransaction(session, request.getOptions());
responseObserver.onNext(transaction);
responseObserver.onCompleted();
Expand Down Expand Up @@ -1449,7 +1450,7 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
commitExecutionTime.simulateExecutionTime(exceptions, freezeLock);
commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Find or start a transaction
Transaction transaction;
if (request.hasSingleUseTransaction()) {
Expand Down Expand Up @@ -1502,7 +1503,7 @@ public void rollback(RollbackRequest request, StreamObserver<Empty> responseObse
}
sessionLastUsed.put(session.getName(), Instant.now());
try {
rollbackExecutionTime.simulateExecutionTime(exceptions, freezeLock);
rollbackExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
Transaction transaction = transactions.get(request.getTransactionId());
if (transaction != null) {
rollbackTransaction(transaction.getId());
Expand Down Expand Up @@ -1533,7 +1534,7 @@ void markAbortedTransaction(ByteString transactionId) {
public void partitionQuery(
PartitionQueryRequest request, StreamObserver<PartitionResponse> responseObserver) {
try {
partitionQueryExecutionTime.simulateExecutionTime(exceptions, freezeLock);
partitionQueryExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
partition(request.getSession(), request.getTransaction(), responseObserver);
} catch (StatusRuntimeException t) {
responseObserver.onError(t);
Expand All @@ -1546,7 +1547,7 @@ public void partitionQuery(
public void partitionRead(
PartitionReadRequest request, StreamObserver<PartitionResponse> responseObserver) {
try {
partitionReadExecutionTime.simulateExecutionTime(exceptions, freezeLock);
partitionReadExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
partition(request.getSession(), request.getTransaction(), responseObserver);
} catch (StatusRuntimeException t) {
responseObserver.onError(t);
Expand Down Expand Up @@ -1597,6 +1598,10 @@ public void addException(Exception exception) {
exceptions.add(exception);
}

public void setStickyGlobalExceptions(boolean sticky) {
this.stickyGlobalExceptions = sticky;
}

@Override
public ServerServiceDefinition getServiceDefinition() {
return bindService();
Expand All @@ -1614,6 +1619,7 @@ public void reset() {
partitionTokens.clear();
transactionLastUsed.clear();
exceptions.clear();
stickyGlobalExceptions = false;
}

public void removeAllExecutionTimes() {
Expand Down

0 comments on commit 20965ad

Please sign in to comment.