Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop sending RPCs to deleted database #34

Merged
merged 6 commits into from Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,34 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import javax.annotation.Nullable;

/**
* Exception thrown by Cloud Spanner when an operation detects that the database that is being used
* no longer exists. This type of error has its own subclass as it is a condition that should cause
* the client library to stop trying to send RPCs to the backend until the user has taken action.
*/
public class DatabaseNotFoundException extends SpannerException {
private static final long serialVersionUID = -6395746612598975751L;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
DatabaseNotFoundException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
super(token, ErrorCode.NOT_FOUND, false, message, cause);
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -775,6 +776,15 @@ public void close() {
if (lastException != null && isSessionNotFound(lastException)) {
invalidateSession(this);
} else {
if (lastException != null && isDatabaseNotFound(lastException)) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
// Mark this session pool as no longer valid and then release the session into the pool as
// there is nothing we can do with it anyways.
synchronized (lock) {
SessionPool.this.databaseNotFound =
MoreObjects.firstNonNull(
SessionPool.this.databaseNotFound, (DatabaseNotFoundException) lastException);
}
}
lastException = null;
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
Expand Down Expand Up @@ -1056,6 +1066,9 @@ private static enum Position {
@GuardedBy("lock")
private SettableFuture<Void> closureFuture;

@GuardedBy("lock")
private DatabaseNotFoundException databaseNotFound;

@GuardedBy("lock")
private final LinkedList<PooledSession> readSessions = new LinkedList<>();

Expand Down Expand Up @@ -1193,7 +1206,7 @@ private boolean isSessionNotFound(SpannerException e) {
}

private boolean isDatabaseNotFound(SpannerException e) {
return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found");
return e instanceof DatabaseNotFoundException;
}

private boolean isPermissionDenied(SpannerException e) {
Expand Down Expand Up @@ -1225,6 +1238,13 @@ private PooledSession findSessionToKeepAlive(
return null;
}

/** @return true if this {@link SessionPool} is still valid. */
boolean isValid() {
synchronized (lock) {
return closureFuture == null && databaseNotFound == null;
}
}

/**
* Returns a session to be used for read requests to spanner. It will block if a session is not
* currently available. In case the pool is exhausted and {@link
Expand All @@ -1251,6 +1271,15 @@ PooledSession getReadSession() throws SpannerException {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
if (databaseNotFound != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
databaseNotFound.getMessage()),
databaseNotFound);
}
sess = readSessions.poll();
if (sess == null) {
sess = writePreparedSessions.poll();
Expand Down Expand Up @@ -1304,8 +1333,18 @@ PooledSession getReadWriteSession() {
PooledSession sess = null;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
if (databaseNotFound != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
databaseNotFound.getMessage()),
databaseNotFound);
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
Expand Down Expand Up @@ -1448,6 +1487,9 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
break;
}
}
this.databaseNotFound =
MoreObjects.firstNonNull(
this.databaseNotFound, isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
}
}

Expand All @@ -1470,6 +1512,10 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
if (isClosed()) {
decrementPendingClosures(1);
}
this.databaseNotFound =
MoreObjects.firstNonNull(
this.databaseNotFound,
isDatabaseNotFound(e) ? (DatabaseNotFoundException) e : null);
} else if (readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;

Expand All @@ -36,6 +37,14 @@
* ErrorCode#ABORTED} are always represented by {@link AbortedException}.
*/
public final class SpannerExceptionFactory {
static final String DATABASE_NOT_FOUND_MSG =
"Database not found: projects/.*/instances/.*/databases/.*\n"
+ "resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\"\n"
+ "resource_name: \"projects/.*/instances/.*/databases/.*\"\n"
+ "description: \"Database does not exist.\"\n";
private static final Pattern DATABASE_NOT_FOUND_MSG_PATTERN =
Pattern.compile(".*" + DATABASE_NOT_FOUND_MSG + ".*");

public static SpannerException newSpannerException(ErrorCode code, @Nullable String message) {
return newSpannerException(code, message, null);
}
Expand Down Expand Up @@ -176,6 +185,8 @@ private static SpannerException newSpannerExceptionPreformatted(
case NOT_FOUND:
if (message != null && message.contains("Session not found")) {
return new SessionNotFoundException(token, message, cause);
} else if (message != null && DATABASE_NOT_FOUND_MSG_PATTERN.matcher(message).matches()) {
hengfengli marked this conversation as resolved.
Show resolved Hide resolved
return new DatabaseNotFoundException(token, message, cause);
}
// Fall through to the default.
default:
Expand Down
Expand Up @@ -88,6 +88,9 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
@GuardedBy("this")
private final Map<DatabaseId, DatabaseClientImpl> dbClients = new HashMap<>();

@GuardedBy("this")
private final List<DatabaseClientImpl> invalidatedDbClients = new ArrayList<>();

@GuardedBy("this")
private final Map<DatabaseId, SessionClient> sessionClients = new HashMap<>();

Expand Down Expand Up @@ -210,6 +213,12 @@ public InstanceAdminClient getInstanceAdminClient() {
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) {
// Move the invalidated client to a separate list, so we can close it together with the
// other database clients when the Spanner instance is closed.
invalidatedDbClients.add(dbClients.get(db));
dbClients.remove(db);
}
if (dbClients.containsKey(db)) {
return dbClients.get(db);
} else {
Expand Down Expand Up @@ -239,7 +248,8 @@ public void close() {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
spannerIsClosed = true;
closureFutures = new ArrayList<>();
for (DatabaseClientImpl dbClient : dbClients.values()) {
invalidatedDbClients.addAll(dbClients.values());
for (DatabaseClientImpl dbClient : invalidatedDbClients) {
closureFutures.add(dbClient.closeAsync());
}
dbClients.clear();
Expand Down