Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop preparing session on most errors #181

Merged
merged 1 commit into from Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -49,6 +49,7 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -102,6 +103,17 @@ final class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
Expand Down Expand Up @@ -1114,6 +1126,9 @@ private static enum Position {
@GuardedBy("lock")
private ResourceNotFoundException resourceNotFoundException;

@GuardedBy("lock")
private boolean stopAutomaticPrepare;

@GuardedBy("lock")
private final LinkedList<PooledSession> readSessions = new LinkedList<>();

Expand Down Expand Up @@ -1348,8 +1363,9 @@ private boolean isDatabaseOrInstanceNotFound(SpannerException e) {
return e instanceof DatabaseNotFoundException || e instanceof InstanceNotFoundException;
}

private boolean isPermissionDenied(SpannerException e) {
return e.getErrorCode() == ErrorCode.PERMISSION_DENIED;
private boolean shouldStopPrepareSessions(SpannerException e) {
return isDatabaseOrInstanceNotFound(e)
|| SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES.contains(e.getErrorCode());
}

private void invalidateSession(PooledSession session) {
Expand Down Expand Up @@ -1477,7 +1493,7 @@ PooledSession getReadWriteSession() {
// session.
while (true) {
Waiter waiter = null;
boolean inProcessPrepare = false;
boolean inProcessPrepare = stopAutomaticPrepare;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
Expand All @@ -1494,7 +1510,7 @@ PooledSession getReadWriteSession() {
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= prepareThreadPoolSize) {
if (!inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
PooledSession readSession = readSessions.poll();
if (readSession != null) {
Expand Down Expand Up @@ -1550,12 +1566,16 @@ PooledSession getReadWriteSession() {
if (inProcessPrepare) {
try {
sess.prepareReadWriteTransaction();
// Session prepare succeeded, restart automatic prepare if it had been stopped.
synchronized (lock) {
stopAutomaticPrepare = false;
}
} catch (Throwable t) {
sess = null;
SpannerException e = newSpannerException(t);
if (!isClosed()) {
handlePrepareSessionFailure(e, sess, false);
}
sess = null;
if (!isSessionNotFound(e)) {
throw e;
}
Expand Down Expand Up @@ -1696,25 +1716,30 @@ private void handlePrepareSessionFailure(
synchronized (lock) {
if (isSessionNotFound(e)) {
invalidateSession(session);
} else if (isDatabaseOrInstanceNotFound(e) || isPermissionDenied(e)) {
// Database has been deleted or the user has no permission to write to this database. We
// should stop trying to prepare any transactions. Also propagate the error to all waiters,
// as any further waiting is pointless.
} else if (shouldStopPrepareSessions(e)) {
// Database has been deleted or the user has no permission to write to this database, or
// there is some other semi-permanent error. We should stop trying to prepare any
// transactions. Also propagate the error to all waiters if the database or instance has
// been deleted, as any further waiting is pointless.
stopAutomaticPrepare = true;
while (readWriteWaiters.size() > 0) {
readWriteWaiters.poll().put(e);
}
while (readWaiters.size() > 0) {
readWaiters.poll().put(e);
}
// Remove the session from the pool.
allSessions.remove(session);
if (isClosed()) {
decrementPendingClosures(1);
if (isDatabaseOrInstanceNotFound(e)) {
// Remove the session from the pool.
if (isClosed()) {
decrementPendingClosures(1);
}
allSessions.remove(session);
this.resourceNotFoundException =
MoreObjects.firstNonNull(
this.resourceNotFoundException, (ResourceNotFoundException) e);
Comment on lines +1732 to +1739
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous logic, we executed this logic also for PERMISSION_DENIED errors. It doesn't anymore. Is this a problem?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is still executed for PERMISSION_DENIED errors, but that is now covered by the general error handling that also handles other types of errors. The specific errors DatabaseNotFound and InstanceNotFound are handled specifically, as these should not allow the normal operation of the session pool to be restarted once the error does no longer occur. The reasoning behind that is that a DatabaseNotFound exception can only be fixed by creating a new database with the same name as the old one, but that is still a new database.
The other errors, such as PERMISSION_DENIED and now also for examle FAILED_PRECONDITION, should allow the session pool to restart the automatic preparing of sessions once the error has been fixed. This could be granting the user the necessary permissions or changing the state of database to fulfill all requirements.

} else {
releaseSession(session, Position.FIRST);
}
this.resourceNotFoundException =
MoreObjects.firstNonNull(
this.resourceNotFoundException,
isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null);
} else if (informFirstWaiter && readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down Expand Up @@ -1809,6 +1834,9 @@ private boolean shouldUnblockReader() {

private boolean shouldPrepareSession() {
synchronized (lock) {
if (stopAutomaticPrepare) {
return false;
}
int preparedSessions = writePreparedSessions.size() + numSessionsBeingPrepared;
return preparedSessions < Math.floor(options.getWriteSessionsFraction() * totalSessions());
}
Expand Down
Expand Up @@ -469,12 +469,25 @@ public void testDatabaseOrInstanceDoesNotExistOnReplenish() throws Exception {

@Test
public void testPermissionDeniedOnPrepareSession() throws Exception {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case covers the PERMISSION_DENIED case discussed above.

testExceptionOnPrepareSession(
Status.PERMISSION_DENIED
.withDescription(
"Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource")
.asRuntimeException());
}

@Test
public void testFailedPreconditionOnPrepareSession() throws Exception {
testExceptionOnPrepareSession(
Status.FAILED_PRECONDITION
.withDescription("FAILED_PRECONDITION: Database is in read-only mode")
.asRuntimeException());
}

private void testExceptionOnPrepareSession(StatusRuntimeException exception)
throws InterruptedException {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.PERMISSION_DENIED
.withDescription(
"Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource")
.asRuntimeException()));
SimulatedExecutionTime.ofStickyException(exception));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
Expand Down Expand Up @@ -503,10 +516,39 @@ public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
fail("missing expected PERMISSION_DENIED exception");
fail(String.format("missing expected %s exception", exception.getStatus().getCode().name()));
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.PERMISSION_DENIED)));
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.fromGrpcStatus(exception.getStatus()))));
}
// Remove the semi-permanent error condition. Getting a read/write transaction should now
// succeed, and the automatic preparing of sessions should be restarted.
mockSpanner.setBeginTransactionExecutionTime(SimulatedExecutionTime.none());
dbClient
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
for (int i = 0; i < spanner.getOptions().getSessionPoolOptions().getMinSessions(); i++) {
dbClient.pool.getReadSession().close();
}
int expectedPreparedSessions =
(int)
Math.ceil(
dbClient.pool.getNumberOfSessionsInPool()
* spanner.getOptions().getSessionPoolOptions().getWriteSessionsFraction());
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfAvailableWritePreparedSessions() < expectedPreparedSessions) {
Thread.sleep(1L);
}
assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0)));
assertThat(
dbClient.pool.getNumberOfAvailableWritePreparedSessions(),
is(equalTo(expectedPreparedSessions)));
}

/**
Expand Down