diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 3b2a0d2da3..8f57b114db 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -49,6 +49,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -102,6 +103,17 @@ final class SessionPool { private static final Logger logger = Logger.getLogger(SessionPool.class.getName()); private static final Tracer tracer = Tracing.getTracer(); static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession"; + static final ImmutableSet SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES = + ImmutableSet.of( + ErrorCode.UNKNOWN, + ErrorCode.INVALID_ARGUMENT, + ErrorCode.PERMISSION_DENIED, + ErrorCode.UNAUTHENTICATED, + ErrorCode.RESOURCE_EXHAUSTED, + ErrorCode.FAILED_PRECONDITION, + ErrorCode.OUT_OF_RANGE, + ErrorCode.UNIMPLEMENTED, + ErrorCode.INTERNAL); /** * Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8 @@ -1114,6 +1126,9 @@ private static enum Position { @GuardedBy("lock") private ResourceNotFoundException resourceNotFoundException; + @GuardedBy("lock") + private boolean stopAutomaticPrepare; + @GuardedBy("lock") private final LinkedList readSessions = new LinkedList<>(); @@ -1348,8 +1363,9 @@ private boolean isDatabaseOrInstanceNotFound(SpannerException e) { return e instanceof DatabaseNotFoundException || e instanceof InstanceNotFoundException; } - private boolean isPermissionDenied(SpannerException e) { - return e.getErrorCode() == ErrorCode.PERMISSION_DENIED; + private boolean shouldStopPrepareSessions(SpannerException e) { + return isDatabaseOrInstanceNotFound(e) + || SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES.contains(e.getErrorCode()); } private void invalidateSession(PooledSession session) { @@ -1477,7 +1493,7 @@ PooledSession getReadWriteSession() { // session. while (true) { Waiter waiter = null; - boolean inProcessPrepare = false; + boolean inProcessPrepare = stopAutomaticPrepare; synchronized (lock) { if (closureFuture != null) { span.addAnnotation("Pool has been closed"); @@ -1494,7 +1510,7 @@ PooledSession getReadWriteSession() { } sess = writePreparedSessions.poll(); if (sess == null) { - if (numSessionsBeingPrepared <= prepareThreadPoolSize) { + if (!inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize) { if (numSessionsBeingPrepared <= readWriteWaiters.size()) { PooledSession readSession = readSessions.poll(); if (readSession != null) { @@ -1550,12 +1566,16 @@ PooledSession getReadWriteSession() { if (inProcessPrepare) { try { sess.prepareReadWriteTransaction(); + // Session prepare succeeded, restart automatic prepare if it had been stopped. + synchronized (lock) { + stopAutomaticPrepare = false; + } } catch (Throwable t) { - sess = null; SpannerException e = newSpannerException(t); if (!isClosed()) { handlePrepareSessionFailure(e, sess, false); } + sess = null; if (!isSessionNotFound(e)) { throw e; } @@ -1696,25 +1716,30 @@ private void handlePrepareSessionFailure( synchronized (lock) { if (isSessionNotFound(e)) { invalidateSession(session); - } else if (isDatabaseOrInstanceNotFound(e) || isPermissionDenied(e)) { - // Database has been deleted or the user has no permission to write to this database. We - // should stop trying to prepare any transactions. Also propagate the error to all waiters, - // as any further waiting is pointless. + } else if (shouldStopPrepareSessions(e)) { + // Database has been deleted or the user has no permission to write to this database, or + // there is some other semi-permanent error. We should stop trying to prepare any + // transactions. Also propagate the error to all waiters if the database or instance has + // been deleted, as any further waiting is pointless. + stopAutomaticPrepare = true; while (readWriteWaiters.size() > 0) { readWriteWaiters.poll().put(e); } while (readWaiters.size() > 0) { readWaiters.poll().put(e); } - // Remove the session from the pool. - allSessions.remove(session); - if (isClosed()) { - decrementPendingClosures(1); + if (isDatabaseOrInstanceNotFound(e)) { + // Remove the session from the pool. + if (isClosed()) { + decrementPendingClosures(1); + } + allSessions.remove(session); + this.resourceNotFoundException = + MoreObjects.firstNonNull( + this.resourceNotFoundException, (ResourceNotFoundException) e); + } else { + releaseSession(session, Position.FIRST); } - this.resourceNotFoundException = - MoreObjects.firstNonNull( - this.resourceNotFoundException, - isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null); } else if (informFirstWaiter && readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); @@ -1809,6 +1834,9 @@ private boolean shouldUnblockReader() { private boolean shouldPrepareSession() { synchronized (lock) { + if (stopAutomaticPrepare) { + return false; + } int preparedSessions = writePreparedSessions.size() + numSessionsBeingPrepared; return preparedSessions < Math.floor(options.getWriteSessionsFraction() * totalSessions()); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index f665d66ada..8bfdd305fa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -469,12 +469,25 @@ public void testDatabaseOrInstanceDoesNotExistOnReplenish() throws Exception { @Test public void testPermissionDeniedOnPrepareSession() throws Exception { + testExceptionOnPrepareSession( + Status.PERMISSION_DENIED + .withDescription( + "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource") + .asRuntimeException()); + } + + @Test + public void testFailedPreconditionOnPrepareSession() throws Exception { + testExceptionOnPrepareSession( + Status.FAILED_PRECONDITION + .withDescription("FAILED_PRECONDITION: Database is in read-only mode") + .asRuntimeException()); + } + + private void testExceptionOnPrepareSession(StatusRuntimeException exception) + throws InterruptedException { mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofStickyException( - Status.PERMISSION_DENIED - .withDescription( - "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource") - .asRuntimeException())); + SimulatedExecutionTime.ofStickyException(exception)); DatabaseClientImpl dbClient = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); @@ -503,10 +516,39 @@ public Void run(TransactionContext transaction) throws Exception { return null; } }); - fail("missing expected PERMISSION_DENIED exception"); + fail(String.format("missing expected %s exception", exception.getStatus().getCode().name())); } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.PERMISSION_DENIED))); + assertThat(e.getErrorCode(), is(equalTo(ErrorCode.fromGrpcStatus(exception.getStatus())))); + } + // Remove the semi-permanent error condition. Getting a read/write transaction should now + // succeed, and the automatic preparing of sessions should be restarted. + mockSpanner.setBeginTransactionExecutionTime(SimulatedExecutionTime.none()); + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + for (int i = 0; i < spanner.getOptions().getSessionPoolOptions().getMinSessions(); i++) { + dbClient.pool.getReadSession().close(); } + int expectedPreparedSessions = + (int) + Math.ceil( + dbClient.pool.getNumberOfSessionsInPool() + * spanner.getOptions().getSessionPoolOptions().getWriteSessionsFraction()); + watch = watch.reset().start(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfAvailableWritePreparedSessions() < expectedPreparedSessions) { + Thread.sleep(1L); + } + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); + assertThat( + dbClient.pool.getNumberOfAvailableWritePreparedSessions(), + is(equalTo(expectedPreparedSessions))); } /**