Skip to content

Commit

Permalink
fix: stop sending RPCs to deleted database (#34)
Browse files Browse the repository at this point in the history
* fix: stop sending rpcs on deleted db

* fix: client should stop sending rpcs after database dropped

DatabaseClients should not continue to try to send RPCs to a database that has
been deleted. Instead, the session pool will keep track of whether a database
not found error has been returned for a database, and if so, will invalidate
itself. All subsequent calls for this database will return a DatabaseNotFoundException
without calling a RPC.

If a database is re-created, the user must create a new DatabaseClient with a new
session pool in order to resume usage of the database.

Fixes #16

* fix: remove double check on isValid

* fix: add wait to deleted db integration test

* fix: process review comments

* fix: update copyright year
  • Loading branch information
olavloite committed Jan 22, 2020
1 parent 384ddb4 commit 11e4a90
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 55 deletions.
@@ -0,0 +1,34 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import javax.annotation.Nullable;

/**
* Exception thrown by Cloud Spanner when an operation detects that the database that is being used
* no longer exists. This type of error has its own subclass as it is a condition that should cause
* the client library to stop trying to send RPCs to the backend until the user has taken action.
*/
public class DatabaseNotFoundException extends SpannerException {
private static final long serialVersionUID = -6395746612598975751L;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
DatabaseNotFoundException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
super(token, ErrorCode.NOT_FOUND, false, message, cause);
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -775,6 +776,15 @@ public void close() {
if (lastException != null && isSessionNotFound(lastException)) {
invalidateSession(this);
} else {
if (lastException != null && isDatabaseNotFound(lastException)) {
// Mark this session pool as no longer valid and then release the session into the pool as
// there is nothing we can do with it anyways.
synchronized (lock) {
SessionPool.this.databaseNotFound =
MoreObjects.firstNonNull(
SessionPool.this.databaseNotFound, (DatabaseNotFoundException) lastException);
}
}
lastException = null;
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
Expand Down Expand Up @@ -1056,6 +1066,9 @@ private static enum Position {
@GuardedBy("lock")
private SettableFuture<Void> closureFuture;

@GuardedBy("lock")
private DatabaseNotFoundException databaseNotFound;

@GuardedBy("lock")
private final LinkedList<PooledSession> readSessions = new LinkedList<>();

Expand Down Expand Up @@ -1193,7 +1206,7 @@ private boolean isSessionNotFound(SpannerException e) {
}

private boolean isDatabaseNotFound(SpannerException e) {
return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found");
return e instanceof DatabaseNotFoundException;
}

private boolean isPermissionDenied(SpannerException e) {
Expand Down Expand Up @@ -1225,6 +1238,13 @@ private PooledSession findSessionToKeepAlive(
return null;
}

/** @return true if this {@link SessionPool} is still valid. */
boolean isValid() {
synchronized (lock) {
return closureFuture == null && databaseNotFound == null;
}
}

/**
* Returns a session to be used for read requests to spanner. It will block if a session is not
* currently available. In case the pool is exhausted and {@link
Expand All @@ -1251,6 +1271,15 @@ PooledSession getReadSession() throws SpannerException {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
if (databaseNotFound != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
databaseNotFound.getMessage()),
databaseNotFound);
}
sess = readSessions.poll();
if (sess == null) {
sess = writePreparedSessions.poll();
Expand Down Expand Up @@ -1304,8 +1333,18 @@ PooledSession getReadWriteSession() {
PooledSession sess = null;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
if (databaseNotFound != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
databaseNotFound.getMessage()),
databaseNotFound);
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
Expand Down Expand Up @@ -1448,6 +1487,9 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
break;
}
}
this.databaseNotFound =
MoreObjects.firstNonNull(
this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
}
}

Expand All @@ -1470,6 +1512,10 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
if (isClosed()) {
decrementPendingClosures(1);
}
this.databaseNotFound =
MoreObjects.firstNonNull(
this.databaseNotFound,
isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
} else if (readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;

Expand All @@ -36,6 +37,14 @@
* ErrorCode#ABORTED} are always represented by {@link AbortedException}.
*/
public final class SpannerExceptionFactory {
static final String DATABASE_NOT_FOUND_MSG =
"Database not found: projects/.*/instances/.*/databases/.*\n"
+ "resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\"\n"
+ "resource_name: \"projects/.*/instances/.*/databases/.*\"\n"
+ "description: \"Database does not exist.\"\n";
private static final Pattern DATABASE_NOT_FOUND_MSG_PATTERN =
Pattern.compile(".*" + DATABASE_NOT_FOUND_MSG + ".*");

public static SpannerException newSpannerException(ErrorCode code, @Nullable String message) {
return newSpannerException(code, message, null);
}
Expand Down Expand Up @@ -176,6 +185,8 @@ private static SpannerException newSpannerExceptionPreformatted(
case NOT_FOUND:
if (message != null && message.contains("Session not found")) {
return new SessionNotFoundException(token, message, cause);
} else if (message != null && DATABASE_NOT_FOUND_MSG_PATTERN.matcher(message).matches()) {
return new DatabaseNotFoundException(token, message, cause);
}
// Fall through to the default.
default:
Expand Down
Expand Up @@ -88,6 +88,9 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
@GuardedBy("this")
private final Map<DatabaseId, DatabaseClientImpl> dbClients = new HashMap<>();

@GuardedBy("this")
private final List<DatabaseClientImpl> invalidatedDbClients = new ArrayList<>();

@GuardedBy("this")
private final Map<DatabaseId, SessionClient> sessionClients = new HashMap<>();

Expand Down Expand Up @@ -210,6 +213,12 @@ public InstanceAdminClient getInstanceAdminClient() {
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) {
// Move the invalidated client to a separate list, so we can close it together with the
// other database clients when the Spanner instance is closed.
invalidatedDbClients.add(dbClients.get(db));
dbClients.remove(db);
}
if (dbClients.containsKey(db)) {
return dbClients.get(db);
} else {
Expand Down Expand Up @@ -239,7 +248,8 @@ public void close() {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
spannerIsClosed = true;
closureFutures = new ArrayList<>();
for (DatabaseClientImpl dbClient : dbClients.values()) {
invalidatedDbClients.addAll(dbClients.values());
for (DatabaseClientImpl dbClient : invalidatedDbClients) {
closureFutures.add(dbClient.closeAsync());
}
dbClients.clear();
Expand Down

0 comments on commit 11e4a90

Please sign in to comment.