From d0e3d41131a7480baee787654b7b9591efae5069 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Tue, 28 Apr 2020 08:26:26 +0200 Subject: [PATCH] fix: stop preparing session on most errors (#181) Most errors that occur during preparing a session for a read/write transaction should be considered permanent, and should stop the automatic preparing of sessions. Any subsequent call to get a read/write session will cause an in-process BeginTransaction RPC to be executed. If the problem has been fixed in the meantime, the RPC will succeed and the automatic prepare of sessions will start again. Otherwise, the error is propagated to the user. Fixes #177 --- .../com/google/cloud/spanner/SessionPool.java | 62 ++++++++++++++----- .../cloud/spanner/DatabaseClientImplTest.java | 56 ++++++++++++++--- 2 files changed, 94 insertions(+), 24 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 3b2a0d2da3..8f57b114db 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -49,6 +49,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -102,6 +103,17 @@ final class SessionPool { private static final Logger logger = Logger.getLogger(SessionPool.class.getName()); private static final Tracer tracer = Tracing.getTracer(); static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession"; + static final ImmutableSet SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES = + ImmutableSet.of( + ErrorCode.UNKNOWN, + ErrorCode.INVALID_ARGUMENT, + ErrorCode.PERMISSION_DENIED, + ErrorCode.UNAUTHENTICATED, + ErrorCode.RESOURCE_EXHAUSTED, + ErrorCode.FAILED_PRECONDITION, + ErrorCode.OUT_OF_RANGE, + ErrorCode.UNIMPLEMENTED, + ErrorCode.INTERNAL); /** * Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8 @@ -1114,6 +1126,9 @@ private static enum Position { @GuardedBy("lock") private ResourceNotFoundException resourceNotFoundException; + @GuardedBy("lock") + private boolean stopAutomaticPrepare; + @GuardedBy("lock") private final LinkedList readSessions = new LinkedList<>(); @@ -1348,8 +1363,9 @@ private boolean isDatabaseOrInstanceNotFound(SpannerException e) { return e instanceof DatabaseNotFoundException || e instanceof InstanceNotFoundException; } - private boolean isPermissionDenied(SpannerException e) { - return e.getErrorCode() == ErrorCode.PERMISSION_DENIED; + private boolean shouldStopPrepareSessions(SpannerException e) { + return isDatabaseOrInstanceNotFound(e) + || SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES.contains(e.getErrorCode()); } private void invalidateSession(PooledSession session) { @@ -1477,7 +1493,7 @@ PooledSession getReadWriteSession() { // session. while (true) { Waiter waiter = null; - boolean inProcessPrepare = false; + boolean inProcessPrepare = stopAutomaticPrepare; synchronized (lock) { if (closureFuture != null) { span.addAnnotation("Pool has been closed"); @@ -1494,7 +1510,7 @@ PooledSession getReadWriteSession() { } sess = writePreparedSessions.poll(); if (sess == null) { - if (numSessionsBeingPrepared <= prepareThreadPoolSize) { + if (!inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize) { if (numSessionsBeingPrepared <= readWriteWaiters.size()) { PooledSession readSession = readSessions.poll(); if (readSession != null) { @@ -1550,12 +1566,16 @@ PooledSession getReadWriteSession() { if (inProcessPrepare) { try { sess.prepareReadWriteTransaction(); + // Session prepare succeeded, restart automatic prepare if it had been stopped. + synchronized (lock) { + stopAutomaticPrepare = false; + } } catch (Throwable t) { - sess = null; SpannerException e = newSpannerException(t); if (!isClosed()) { handlePrepareSessionFailure(e, sess, false); } + sess = null; if (!isSessionNotFound(e)) { throw e; } @@ -1696,25 +1716,30 @@ private void handlePrepareSessionFailure( synchronized (lock) { if (isSessionNotFound(e)) { invalidateSession(session); - } else if (isDatabaseOrInstanceNotFound(e) || isPermissionDenied(e)) { - // Database has been deleted or the user has no permission to write to this database. We - // should stop trying to prepare any transactions. Also propagate the error to all waiters, - // as any further waiting is pointless. + } else if (shouldStopPrepareSessions(e)) { + // Database has been deleted or the user has no permission to write to this database, or + // there is some other semi-permanent error. We should stop trying to prepare any + // transactions. Also propagate the error to all waiters if the database or instance has + // been deleted, as any further waiting is pointless. + stopAutomaticPrepare = true; while (readWriteWaiters.size() > 0) { readWriteWaiters.poll().put(e); } while (readWaiters.size() > 0) { readWaiters.poll().put(e); } - // Remove the session from the pool. - allSessions.remove(session); - if (isClosed()) { - decrementPendingClosures(1); + if (isDatabaseOrInstanceNotFound(e)) { + // Remove the session from the pool. + if (isClosed()) { + decrementPendingClosures(1); + } + allSessions.remove(session); + this.resourceNotFoundException = + MoreObjects.firstNonNull( + this.resourceNotFoundException, (ResourceNotFoundException) e); + } else { + releaseSession(session, Position.FIRST); } - this.resourceNotFoundException = - MoreObjects.firstNonNull( - this.resourceNotFoundException, - isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null); } else if (informFirstWaiter && readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); @@ -1809,6 +1834,9 @@ private boolean shouldUnblockReader() { private boolean shouldPrepareSession() { synchronized (lock) { + if (stopAutomaticPrepare) { + return false; + } int preparedSessions = writePreparedSessions.size() + numSessionsBeingPrepared; return preparedSessions < Math.floor(options.getWriteSessionsFraction() * totalSessions()); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index f665d66ada..8bfdd305fa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -469,12 +469,25 @@ public void testDatabaseOrInstanceDoesNotExistOnReplenish() throws Exception { @Test public void testPermissionDeniedOnPrepareSession() throws Exception { + testExceptionOnPrepareSession( + Status.PERMISSION_DENIED + .withDescription( + "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource") + .asRuntimeException()); + } + + @Test + public void testFailedPreconditionOnPrepareSession() throws Exception { + testExceptionOnPrepareSession( + Status.FAILED_PRECONDITION + .withDescription("FAILED_PRECONDITION: Database is in read-only mode") + .asRuntimeException()); + } + + private void testExceptionOnPrepareSession(StatusRuntimeException exception) + throws InterruptedException { mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofStickyException( - Status.PERMISSION_DENIED - .withDescription( - "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource") - .asRuntimeException())); + SimulatedExecutionTime.ofStickyException(exception)); DatabaseClientImpl dbClient = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); @@ -503,10 +516,39 @@ public Void run(TransactionContext transaction) throws Exception { return null; } }); - fail("missing expected PERMISSION_DENIED exception"); + fail(String.format("missing expected %s exception", exception.getStatus().getCode().name())); } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.PERMISSION_DENIED))); + assertThat(e.getErrorCode(), is(equalTo(ErrorCode.fromGrpcStatus(exception.getStatus())))); + } + // Remove the semi-permanent error condition. Getting a read/write transaction should now + // succeed, and the automatic preparing of sessions should be restarted. + mockSpanner.setBeginTransactionExecutionTime(SimulatedExecutionTime.none()); + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + for (int i = 0; i < spanner.getOptions().getSessionPoolOptions().getMinSessions(); i++) { + dbClient.pool.getReadSession().close(); } + int expectedPreparedSessions = + (int) + Math.ceil( + dbClient.pool.getNumberOfSessionsInPool() + * spanner.getOptions().getSessionPoolOptions().getWriteSessionsFraction()); + watch = watch.reset().start(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfAvailableWritePreparedSessions() < expectedPreparedSessions) { + Thread.sleep(1L); + } + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); + assertThat( + dbClient.pool.getNumberOfAvailableWritePreparedSessions(), + is(equalTo(expectedPreparedSessions))); } /**