Skip to content

Commit

Permalink
Spanner: Session pool should not retry BeginTransaction if database h…
Browse files Browse the repository at this point in the history
…as been deleted (#6789)

* session pool should not retry prepare if db is deleted

* add test for create read/write session

* rename to isDatabaseNotFound for clarity

* also stop trying on PermissionDenied
  • Loading branch information
olavloite authored and skuruppu committed Nov 15, 2019
1 parent f5879cd commit c603336
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 12 deletions.
Expand Up @@ -1153,6 +1153,13 @@ int getNumberOfSessionsBeingCreated() {
}
}

@VisibleForTesting
int getNumberOfSessionsBeingPrepared() {
synchronized (lock) {
return numSessionsBeingPrepared;
}
}

@VisibleForTesting
long getNumWaiterTimeouts() {
return numWaiterTimeouts.get();
Expand Down Expand Up @@ -1185,6 +1192,14 @@ private boolean isSessionNotFound(SpannerException e) {
return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Session not found");
}

private boolean isDatabaseNotFound(SpannerException e) {
return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Database not found");
}

private boolean isPermissionDenied(SpannerException e) {
return e.getErrorCode() == ErrorCode.PERMISSION_DENIED;
}

private void invalidateSession(PooledSession session) {
synchronized (lock) {
if (isClosed()) {
Expand Down Expand Up @@ -1440,6 +1455,21 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
synchronized (lock) {
if (isSessionNotFound(e)) {
invalidateSession(session);
} else if (isDatabaseNotFound(e) || isPermissionDenied(e)) {
// Database has been deleted or the user has no permission to write to this database. We
// should stop trying to prepare any transactions. Also propagate the error to all waiters,
// as any further waiting is pointless.
while (readWriteWaiters.size() > 0) {
readWriteWaiters.poll().put(e);
}
while (readWaiters.size() > 0) {
readWaiters.poll().put(e);
}
// Remove the session from the pool.
allSessions.remove(session);
if (isClosed()) {
decrementPendingClosures(1);
}
} else if (readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
Expand All @@ -27,6 +28,7 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand All @@ -37,6 +39,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -48,10 +51,11 @@

@RunWith(JUnit4.class)
public class DatabaseClientImplTest {
private static final String DATABASE_DOES_NOT_EXIST_MSG =
"Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: \"type.googleapis.com/google.spanner.admin.database.v1.Database\" resource_name: \"projects/<project>/instances/<instance>/databases/<database>\" description: \"Database does not exist.\"";
private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static LocalChannelProvider channelProvider;
private static Spanner spanner;
private static final Statement UPDATE_STATEMENT =
Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2");
private static final Statement INVALID_UPDATE_STATEMENT =
Expand Down Expand Up @@ -80,6 +84,7 @@ public class DatabaseClientImplTest {
.build())
.setMetadata(SELECT1_METADATA)
.build();
private Spanner spanner;

@BeforeClass
public static void startStaticServer() throws IOException {
Expand Down Expand Up @@ -111,8 +116,6 @@ public static void stopServer() throws InterruptedException {

@Before
public void setUp() throws IOException {
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
spanner =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
Expand All @@ -125,6 +128,8 @@ public void setUp() throws IOException {
@After
public void tearDown() throws Exception {
spanner.close();
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

/**
Expand Down Expand Up @@ -257,4 +262,169 @@ public Long run(TransactionContext transaction) throws Exception {
assertThat(updateCount, is(equalTo(UPDATE_COUNT)));
}
}

@Test
public void testDatabaseDoesNotExistOnPrepareSession() throws Exception {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until all sessions have been created.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// Ensure that no sessions could be prepared and that the session pool gives up trying to
// prepare sessions.
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) {
Thread.sleep(1L);
}
assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0)));
try {
dbClient
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
fail("missing expected NOT_FOUND exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString("Database not found"));
}
}

@Test
public void testDatabaseDoesNotExistOnInitialization() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until session creation has finished.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// All session creation should fail and stop trying.
assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0)));
}

@Test
public void testDatabaseDoesNotExistOnCreate() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException()));
// Ensure there are no sessions in the pool by default.
try (Spanner spanner =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build())
.build()
.getService()) {
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// The create session failure should propagate to the client and not retry.
try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG));
}
try {
dbClient.readWriteTransaction();
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString(DATABASE_DOES_NOT_EXIST_MSG));
}
}
}

@Test
public void testDatabaseDoesNotExistOnReplenish() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription(DATABASE_DOES_NOT_EXIST_MSG).asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until session creation has finished.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// All session creation should fail and stop trying.
assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0)));
// Force a maintainer run. This should schedule new session creation.
dbClient.pool.poolMaintainer.maintainPool();
// Wait until the replenish has finished.
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// All session creation from replenishPool should fail and stop trying.
assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0)));
}

@Test
public void testPermissionDeniedOnPrepareSession() throws Exception {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.PERMISSION_DENIED
.withDescription(
"Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource")
.asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until all sessions have been created.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// Ensure that no sessions could be prepared and that the session pool gives up trying to
// prepare sessions.
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) {
Thread.sleep(1L);
}
assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0)));
try {
dbClient
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});
fail("missing expected PERMISSION_DENIED exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.PERMISSION_DENIED)));
}
}
}
Expand Up @@ -362,6 +362,7 @@ public static class SimulatedExecutionTime {
private final int minimumExecutionTime;
private final int randomExecutionTime;
private final Queue<Exception> exceptions;
private final boolean stickyException;

/**
* Creates a simulated execution time that will always be somewhere between <code>
Expand All @@ -384,36 +385,43 @@ public static SimulatedExecutionTime none() {
}

public static SimulatedExecutionTime ofException(Exception exception) {
return new SimulatedExecutionTime(0, 0, Arrays.asList(exception));
return new SimulatedExecutionTime(0, 0, Arrays.asList(exception), false);
}

public static SimulatedExecutionTime ofStickyException(Exception exception) {
return new SimulatedExecutionTime(0, 0, Arrays.asList(exception), true);
}

public static SimulatedExecutionTime ofExceptions(Collection<Exception> exceptions) {
return new SimulatedExecutionTime(0, 0, exceptions);
return new SimulatedExecutionTime(0, 0, exceptions, false);
}

public static SimulatedExecutionTime ofMinimumAndRandomTimeAndExceptions(
int minimumExecutionTime, int randomExecutionTime, Collection<Exception> exceptions) {
return new SimulatedExecutionTime(minimumExecutionTime, randomExecutionTime, exceptions);
return new SimulatedExecutionTime(
minimumExecutionTime, randomExecutionTime, exceptions, false);
}

private SimulatedExecutionTime(int minimum, int random) {
this(minimum, random, Collections.<Exception>emptyList());
this(minimum, random, Collections.<Exception>emptyList(), false);
}

private SimulatedExecutionTime(int minimum, int random, Collection<Exception> exceptions) {
private SimulatedExecutionTime(
int minimum, int random, Collection<Exception> exceptions, boolean stickyException) {
Preconditions.checkArgument(minimum >= 0, "Minimum execution time must be >= 0");
Preconditions.checkArgument(random >= 0, "Random execution time must be >= 0");
this.minimumExecutionTime = minimum;
this.randomExecutionTime = random;
this.exceptions = new LinkedList<>(exceptions);
this.stickyException = stickyException;
}

private void simulateExecutionTime(
Queue<Exception> globalExceptions, ReadWriteLock freezeLock) {
try {
freezeLock.readLock().lock();
checkException(globalExceptions);
checkException(this.exceptions);
checkException(globalExceptions, false);
checkException(this.exceptions, stickyException);
if (minimumExecutionTime > 0 || randomExecutionTime > 0) {
Uninterruptibles.sleepUninterruptibly(
(randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime))
Expand All @@ -425,8 +433,8 @@ private void simulateExecutionTime(
}
}

private static void checkException(Queue<Exception> exceptions) {
Exception e = exceptions.poll();
private static void checkException(Queue<Exception> exceptions, boolean keepException) {
Exception e = keepException ? exceptions.peek() : exceptions.poll();
if (e != null) {
Throwables.throwIfUnchecked(e);
throw Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException();
Expand Down Expand Up @@ -1609,6 +1617,7 @@ public void reset() {
}

public void removeAllExecutionTimes() {
batchCreateSessionsExecutionTime = NO_EXECUTION_TIME;
beginTransactionExecutionTime = NO_EXECUTION_TIME;
commitExecutionTime = NO_EXECUTION_TIME;
createSessionExecutionTime = NO_EXECUTION_TIME;
Expand Down

0 comments on commit c603336

Please sign in to comment.