diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 532778b7d3e7..c241db358986 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1153,6 +1153,13 @@ int getNumberOfSessionsBeingCreated() { } } + @VisibleForTesting + int getNumberOfSessionsBeingPrepared() { + synchronized (lock) { + return numSessionsBeingPrepared; + } + } + @VisibleForTesting long getNumWaiterTimeouts() { return numWaiterTimeouts.get(); @@ -1185,6 +1192,14 @@ private boolean isSessionNotFound(SpannerException e) { return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Session not found"); } + private boolean isDatabaseNotFound(SpannerException e) { + return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found"); + } + + private boolean isPermissionDenied(SpannerException e) { + return e.getErrorCode() == ErrorCode.PERMISSION_DENIED; + } + private void invalidateSession(PooledSession session) { synchronized (lock) { if (isClosed()) { @@ -1440,6 +1455,21 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi synchronized (lock) { if (isSessionNotFound(e)) { invalidateSession(session); + } else if (isDatabaseNotFound(e) || isPermissionDenied(e)) { + // Database has been deleted or the user has no permission to write to this database. We + // should stop trying to prepare any transactions. Also propagate the error to all waiters, + // as any further waiting is pointless. + while (readWriteWaiters.size() > 0) { + readWriteWaiters.poll().put(e); + } + while (readWaiters.size() > 0) { + readWaiters.poll().put(e); + } + // Remove the session from the pool. + allSessions.remove(session); + if (isClosed()) { + decrementPendingClosures(1); + } } else if (readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index c0934aeeabb5..4c6af78f4c40 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -27,6 +28,7 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.common.base.Stopwatch; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -37,6 +39,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import java.io.IOException; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -48,10 +51,11 @@ @RunWith(JUnit4.class) public class DatabaseClientImplTest { + private static final String DATABASE_DOES_NOT_EXIST_MSG = + "Database not found: projects//instances//databases/ resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\" resource_name: \"projects//instances//databases/\" description: \"Database does not exist.\""; private static MockSpannerServiceImpl mockSpanner; private static Server server; private static LocalChannelProvider channelProvider; - private static Spanner spanner; private static final Statement UPDATE_STATEMENT = Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); private static final Statement INVALID_UPDATE_STATEMENT = @@ -80,6 +84,7 @@ public class DatabaseClientImplTest { .build()) .setMetadata(SELECT1_METADATA) .build(); + private Spanner spanner; @BeforeClass public static void startStaticServer() throws IOException { @@ -111,8 +116,6 @@ public static void stopServer() throws InterruptedException { @Before public void setUp() throws IOException { - mockSpanner.reset(); - mockSpanner.removeAllExecutionTimes(); spanner = SpannerOptions.newBuilder() .setProjectId("[PROJECT]") @@ -125,6 +128,8 @@ public void setUp() throws IOException { @After public void tearDown() throws Exception { spanner.close(); + mockSpanner.reset(); + mockSpanner.removeAllExecutionTimes(); } /** @@ -257,4 +262,169 @@ public Long run(TransactionContext transaction) throws Exception { assertThat(updateCount, is(equalTo(UPDATE_COUNT))); } } + + @Test + public void testDatabaseDoesNotExistOnPrepareSession() throws Exception { + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofStickyException( + Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // Wait until all sessions have been created. + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingCreated() > 0) { + Thread.sleep(1L); + } + // Ensure that no sessions could be prepared and that the session pool gives up trying to + // prepare sessions. + watch = watch.reset().start(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) { + Thread.sleep(1L); + } + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0))); + try { + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + fail("missing expected NOT_FOUND exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); + assertThat(e.getMessage(), containsString("Database not found")); + } + } + + @Test + public void testDatabaseDoesNotExistOnInitialization() throws Exception { + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofStickyException( + Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // Wait until session creation has finished. + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingCreated() > 0) { + Thread.sleep(1L); + } + // All session creation should fail and stop trying. + assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0))); + } + + @Test + public void testDatabaseDoesNotExistOnCreate() throws Exception { + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofStickyException( + Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + // Ensure there are no sessions in the pool by default. + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) + .build() + .getService()) { + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // The create session failure should propagate to the client and not retry. + try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) { + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); + assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG)); + } + try { + dbClient.readWriteTransaction(); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND))); + assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG)); + } + } + } + + @Test + public void testDatabaseDoesNotExistOnReplenish() throws Exception { + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofStickyException( + Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException())); + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // Wait until session creation has finished. + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingCreated() > 0) { + Thread.sleep(1L); + } + // All session creation should fail and stop trying. + assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0))); + // Force a maintainer run. This should schedule new session creation. + dbClient.pool.poolMaintainer.maintainPool(); + // Wait until the replenish has finished. + watch = watch.reset().start(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingCreated() > 0) { + Thread.sleep(1L); + } + // All session creation from replenishPool should fail and stop trying. + assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0))); + } + + @Test + public void testPermissionDeniedOnPrepareSession() throws Exception { + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofStickyException( + Status.PERMISSION_DENIED + .withDescription( + "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource") + .asRuntimeException())); + DatabaseClientImpl dbClient = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // Wait until all sessions have been created. + Stopwatch watch = Stopwatch.createStarted(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingCreated() > 0) { + Thread.sleep(1L); + } + // Ensure that no sessions could be prepared and that the session pool gives up trying to + // prepare sessions. + watch = watch.reset().start(); + while (watch.elapsed(TimeUnit.SECONDS) < 5 + && dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) { + Thread.sleep(1L); + } + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0))); + try { + dbClient + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + fail("missing expected PERMISSION_DENIED exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode(), is(equalTo(ErrorCode.PERMISSION_DENIED))); + } + } } diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 97c1e65dffb1..0a451e5baa81 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -362,6 +362,7 @@ public static class SimulatedExecutionTime { private final int minimumExecutionTime; private final int randomExecutionTime; private final Queue exceptions; + private final boolean stickyException; /** * Creates a simulated execution time that will always be somewhere between @@ -384,36 +385,43 @@ public static SimulatedExecutionTime none() { } public static SimulatedExecutionTime ofException(Exception exception) { - return new SimulatedExecutionTime(0, 0, Arrays.asList(exception)); + return new SimulatedExecutionTime(0, 0, Arrays.asList(exception), false); + } + + public static SimulatedExecutionTime ofStickyException(Exception exception) { + return new SimulatedExecutionTime(0, 0, Arrays.asList(exception), true); } public static SimulatedExecutionTime ofExceptions(Collection exceptions) { - return new SimulatedExecutionTime(0, 0, exceptions); + return new SimulatedExecutionTime(0, 0, exceptions, false); } public static SimulatedExecutionTime ofMinimumAndRandomTimeAndExceptions( int minimumExecutionTime, int randomExecutionTime, Collection exceptions) { - return new SimulatedExecutionTime(minimumExecutionTime, randomExecutionTime, exceptions); + return new SimulatedExecutionTime( + minimumExecutionTime, randomExecutionTime, exceptions, false); } private SimulatedExecutionTime(int minimum, int random) { - this(minimum, random, Collections.emptyList()); + this(minimum, random, Collections.emptyList(), false); } - private SimulatedExecutionTime(int minimum, int random, Collection exceptions) { + private SimulatedExecutionTime( + int minimum, int random, Collection exceptions, boolean stickyException) { Preconditions.checkArgument(minimum >= 0, "Minimum execution time must be >= 0"); Preconditions.checkArgument(random >= 0, "Random execution time must be >= 0"); this.minimumExecutionTime = minimum; this.randomExecutionTime = random; this.exceptions = new LinkedList<>(exceptions); + this.stickyException = stickyException; } private void simulateExecutionTime( Queue globalExceptions, ReadWriteLock freezeLock) { try { freezeLock.readLock().lock(); - checkException(globalExceptions); - checkException(this.exceptions); + checkException(globalExceptions, false); + checkException(this.exceptions, stickyException); if (minimumExecutionTime > 0 || randomExecutionTime > 0) { Uninterruptibles.sleepUninterruptibly( (randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime)) @@ -425,8 +433,8 @@ private void simulateExecutionTime( } } - private static void checkException(Queue exceptions) { - Exception e = exceptions.poll(); + private static void checkException(Queue exceptions, boolean keepException) { + Exception e = keepException ? exceptions.peek() : exceptions.poll(); if (e != null) { Throwables.throwIfUnchecked(e); throw Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException(); @@ -1609,6 +1617,7 @@ public void reset() { } public void removeAllExecutionTimes() { + batchCreateSessionsExecutionTime = NO_EXECUTION_TIME; beginTransactionExecutionTime = NO_EXECUTION_TIME; commitExecutionTime = NO_EXECUTION_TIME; createSessionExecutionTime = NO_EXECUTION_TIME;