Skip to content

Commit

Permalink
fix: client should stop sending rpcs after database dropped
Browse files Browse the repository at this point in the history
DatabaseClients should not continue to try to send RPCs to a database that has
been deleted. Instead, the session pool will keep track of whether a database
not found error has been returned for a database, and if so, will invalidate
itself. All subsequent calls for this database will return a DatabaseNotFoundException
without calling a RPC.

If a database is re-created, the user must create a new DatabaseClient with a new
session pool in order to resume usage of the database.

Fixes #16
  • Loading branch information
olavloite committed Jan 14, 2020
1 parent 5480628 commit ae2ad8c
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 85 deletions.
Expand Up @@ -20,7 +20,8 @@

/**
* Exception thrown by Cloud Spanner when an operation detects that the database that is being used
* no longer exists. This type of error has its own subclass as it is a condition that should cause the client library to stop trying to send RPCs to the backend until the user has taken action.
* no longer exists. This type of error has its own subclass as it is a condition that should cause
* the client library to stop trying to send RPCs to the backend until the user has taken action.
*/
public class DatabaseNotFoundException extends SpannerException {
private static final long serialVersionUID = -6395746612598975751L;
Expand Down
Expand Up @@ -776,6 +776,15 @@ public void close() {
if (lastException != null && isSessionNotFound(lastException)) {
invalidateSession(this);
} else {
if (lastException != null && isDatabaseNotFound(lastException)) {
// Mark this session pool as no longer valid and then release the session into the pool as
// there is nothing we can do with it anyways.
synchronized (lock) {
SessionPool.this.databaseNotFound =
MoreObjects.firstNonNull(
SessionPool.this.databaseNotFound, (DatabaseNotFoundException) lastException);
}
}
lastException = null;
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
Expand Down Expand Up @@ -1057,8 +1066,8 @@ private static enum Position {
@GuardedBy("lock")
private SettableFuture<Void> closureFuture;

// @GuardedBy("lock")
// private DatabaseNotFoundException databaseNotFound;
@GuardedBy("lock")
private DatabaseNotFoundException databaseNotFound;

@GuardedBy("lock")
private final LinkedList<PooledSession> readSessions = new LinkedList<>();
Expand Down Expand Up @@ -1229,6 +1238,13 @@ private PooledSession findSessionToKeepAlive(
return null;
}

/** @return true if this {@link SessionPool} is still valid. */
boolean isValid() {
synchronized (lock) {
return closureFuture == null && databaseNotFound == null;
}
}

/**
* Returns a session to be used for read requests to spanner. It will block if a session is not
* currently available. In case the pool is exhausted and {@link
Expand All @@ -1255,10 +1271,15 @@ PooledSession getReadSession() throws SpannerException {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
// if (databaseNotFound != null) {
// span.addAnnotation("Database has been deleted");
// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound);
// }
if (databaseNotFound != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
databaseNotFound.getMessage()),
databaseNotFound);
}
sess = readSessions.poll();
if (sess == null) {
sess = writePreparedSessions.poll();
Expand Down Expand Up @@ -1315,10 +1336,15 @@ PooledSession getReadWriteSession() {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
// if (databaseNotFound != null) {
// span.addAnnotation("Database has been deleted");
// throw SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "The session pool has been invalidated because a previous RPC returned 'Database not found'.", databaseNotFound);
// }
if (databaseNotFound != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
databaseNotFound.getMessage()),
databaseNotFound);
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
Expand Down Expand Up @@ -1461,7 +1487,9 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
break;
}
}
// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
this.databaseNotFound =
MoreObjects.firstNonNull(
this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
}
}

Expand All @@ -1484,7 +1512,10 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
if (isClosed()) {
decrementPendingClosures(1);
}
// this.databaseNotFound = MoreObjects.firstNonNull(this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
this.databaseNotFound =
MoreObjects.firstNonNull(
this.databaseNotFound,
isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
} else if (readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;

Expand All @@ -36,6 +37,11 @@
* ErrorCode#ABORTED} are always represented by {@link AbortedException}.
*/
public final class SpannerExceptionFactory {
static final String DATABASE_NOT_FOUND_MSG =
"Database not found: projects/.*/instances/.*/databases/.*\nresource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\"\nresource_name: \"projects/.*/instances/.*/databases/.*\"\ndescription: \"Database does not exist.\"\n";
private static final Pattern DATABASE_NOT_FOUND_MSG_PATTERN =
Pattern.compile(".*" + DATABASE_NOT_FOUND_MSG + ".*");

public static SpannerException newSpannerException(ErrorCode code, @Nullable String message) {
return newSpannerException(code, message, null);
}
Expand Down Expand Up @@ -176,7 +182,7 @@ private static SpannerException newSpannerExceptionPreformatted(
case NOT_FOUND:
if (message != null && message.contains("Session not found")) {
return new SessionNotFoundException(token, message, cause);
} else if (message != null && message.contains("Database not found")) {
} else if (message != null && DATABASE_NOT_FOUND_MSG_PATTERN.matcher(message).matches()) {
return new DatabaseNotFoundException(token, message, cause);
}
// Fall through to the default.
Expand Down
Expand Up @@ -88,6 +88,9 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
@GuardedBy("this")
private final Map<DatabaseId, DatabaseClientImpl> dbClients = new HashMap<>();

@GuardedBy("this")
private final List<DatabaseClientImpl> invalidatedDbClients = new ArrayList<>();

@GuardedBy("this")
private final Map<DatabaseId, SessionClient> sessionClients = new HashMap<>();

Expand Down Expand Up @@ -210,7 +213,13 @@ public InstanceAdminClient getInstanceAdminClient() {
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
if (dbClients.containsKey(db)) {
if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) {
// Move the invalidated client to a separate list, so we can close it together with the
// other database clients when the Spanner instance is closed.
invalidatedDbClients.add(dbClients.get(db));
dbClients.remove(db);
}
if (dbClients.containsKey(db) && dbClients.get(db).pool.isValid()) {
return dbClients.get(db);
} else {
SessionPool pool =
Expand Down Expand Up @@ -239,7 +248,8 @@ public void close() {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
spannerIsClosed = true;
closureFutures = new ArrayList<>();
for (DatabaseClientImpl dbClient : dbClients.values()) {
invalidatedDbClients.addAll(dbClients.values());
for (DatabaseClientImpl dbClient : invalidatedDbClients) {
closureFutures.add(dbClient.closeAsync());
}
dbClients.clear();
Expand Down

0 comments on commit ae2ad8c

Please sign in to comment.