Skip to content

Commit

Permalink
fix: stop preparing session on most errors (#181)
Browse files Browse the repository at this point in the history
Most errors that occur during preparing a session for a read/write transaction should be considered permanent, and should stop the automatic preparing of sessions. Any subsequent call to get a read/write session will cause an in-process BeginTransaction RPC to be executed. If
the problem has been fixed in the meantime, the RPC will succeed and the automatic prepare of sessions will start again. Otherwise, the error is propagated to the user.

Fixes #177
  • Loading branch information
olavloite committed Apr 28, 2020
1 parent d80428a commit d0e3d41
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 24 deletions.
Expand Up @@ -49,6 +49,7 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -102,6 +103,17 @@ final class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
Expand Down Expand Up @@ -1114,6 +1126,9 @@ private static enum Position {
@GuardedBy("lock")
private ResourceNotFoundException resourceNotFoundException;

@GuardedBy("lock")
private boolean stopAutomaticPrepare;

@GuardedBy("lock")
private final LinkedList<PooledSession> readSessions = new LinkedList<>();

Expand Down Expand Up @@ -1348,8 +1363,9 @@ private boolean isDatabaseOrInstanceNotFound(SpannerException e) {
return e instanceof DatabaseNotFoundException || e instanceof InstanceNotFoundException;
}

private boolean isPermissionDenied(SpannerException e) {
return e.getErrorCode() == ErrorCode.PERMISSION_DENIED;
private boolean shouldStopPrepareSessions(SpannerException e) {
return isDatabaseOrInstanceNotFound(e)
|| SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES.contains(e.getErrorCode());
}

private void invalidateSession(PooledSession session) {
Expand Down Expand Up @@ -1477,7 +1493,7 @@ PooledSession getReadWriteSession() {
// session.
while (true) {
Waiter waiter = null;
boolean inProcessPrepare = false;
boolean inProcessPrepare = stopAutomaticPrepare;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
Expand All @@ -1494,7 +1510,7 @@ PooledSession getReadWriteSession() {
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= prepareThreadPoolSize) {
if (!inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
PooledSession readSession = readSessions.poll();
if (readSession != null) {
Expand Down Expand Up @@ -1550,12 +1566,16 @@ PooledSession getReadWriteSession() {
if (inProcessPrepare) {
try {
sess.prepareReadWriteTransaction();
// Session prepare succeeded, restart automatic prepare if it had been stopped.
synchronized (lock) {
stopAutomaticPrepare = false;
}
} catch (Throwable t) {
sess = null;
SpannerException e = newSpannerException(t);
if (!isClosed()) {
handlePrepareSessionFailure(e, sess, false);
}
sess = null;
if (!isSessionNotFound(e)) {
throw e;
}
Expand Down Expand Up @@ -1696,25 +1716,30 @@ private void handlePrepareSessionFailure(
synchronized (lock) {
if (isSessionNotFound(e)) {
invalidateSession(session);
} else if (isDatabaseOrInstanceNotFound(e) || isPermissionDenied(e)) {
// Database has been deleted or the user has no permission to write to this database. We
// should stop trying to prepare any transactions. Also propagate the error to all waiters,
// as any further waiting is pointless.
} else if (shouldStopPrepareSessions(e)) {
// Database has been deleted or the user has no permission to write to this database, or
// there is some other semi-permanent error. We should stop trying to prepare any
// transactions. Also propagate the error to all waiters if the database or instance has
// been deleted, as any further waiting is pointless.
stopAutomaticPrepare = true;
while (readWriteWaiters.size() > 0) {
readWriteWaiters.poll().put(e);
}
while (readWaiters.size() > 0) {
readWaiters.poll().put(e);
}
// Remove the session from the pool.
allSessions.remove(session);
if (isClosed()) {
decrementPendingClosures(1);
if (isDatabaseOrInstanceNotFound(e)) {
// Remove the session from the pool.
if (isClosed()) {
decrementPendingClosures(1);
}
allSessions.remove(session);
this.resourceNotFoundException =
MoreObjects.firstNonNull(
this.resourceNotFoundException, (ResourceNotFoundException) e);
} else {
releaseSession(session, Position.FIRST);
}
this.resourceNotFoundException =
MoreObjects.firstNonNull(
this.resourceNotFoundException,
isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null);
} else if (informFirstWaiter && readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down Expand Up @@ -1809,6 +1834,9 @@ private boolean shouldUnblockReader() {

private boolean shouldPrepareSession() {
synchronized (lock) {
if (stopAutomaticPrepare) {
return false;
}
int preparedSessions = writePreparedSessions.size() + numSessionsBeingPrepared;
return preparedSessions < Math.floor(options.getWriteSessionsFraction() * totalSessions());
}
Expand Down
Expand Up @@ -469,12 +469,25 @@ public void testDatabaseOrInstanceDoesNotExistOnReplenish() throws Exception {

@Test
public void testPermissionDeniedOnPrepareSession() throws Exception {
testExceptionOnPrepareSession(
Status.PERMISSION_DENIED
.withDescription(
"Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource")
.asRuntimeException());
}

@Test
public void testFailedPreconditionOnPrepareSession() throws Exception {
testExceptionOnPrepareSession(
Status.FAILED_PRECONDITION
.withDescription("FAILED_PRECONDITION: Database is in read-only mode")
.asRuntimeException());
}

private void testExceptionOnPrepareSession(StatusRuntimeException exception)
throws InterruptedException {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.PERMISSION_DENIED
.withDescription(
"Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource")
.asRuntimeException()));
SimulatedExecutionTime.ofStickyException(exception));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
Expand Down Expand Up @@ -503,10 +516,39 @@ public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
fail("missing expected PERMISSION_DENIED exception");
fail(String.format("missing expected %s exception", exception.getStatus().getCode().name()));
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.PERMISSION_DENIED)));
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.fromGrpcStatus(exception.getStatus()))));
}
// Remove the semi-permanent error condition. Getting a read/write transaction should now
// succeed, and the automatic preparing of sessions should be restarted.
mockSpanner.setBeginTransactionExecutionTime(SimulatedExecutionTime.none());
dbClient
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
for (int i = 0; i < spanner.getOptions().getSessionPoolOptions().getMinSessions(); i++) {
dbClient.pool.getReadSession().close();
}
int expectedPreparedSessions =
(int)
Math.ceil(
dbClient.pool.getNumberOfSessionsInPool()
* spanner.getOptions().getSessionPoolOptions().getWriteSessionsFraction());
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfAvailableWritePreparedSessions() < expectedPreparedSessions) {
Thread.sleep(1L);
}
assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0)));
assertThat(
dbClient.pool.getNumberOfAvailableWritePreparedSessions(),
is(equalTo(expectedPreparedSessions)));
}

/**
Expand Down

0 comments on commit d0e3d41

Please sign in to comment.