Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: Session pool should not retry BeginTransaction if database has been deleted #6789

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1153,6 +1153,13 @@ int getNumberOfSessionsBeingCreated() {
}
}

@VisibleForTesting
int getNumberOfSessionsBeingPrepared() {
synchronized (lock) {
return numSessionsBeingPrepared;
}
}

@VisibleForTesting
long getNumWaiterTimeouts() {
return numWaiterTimeouts.get();
Expand Down Expand Up @@ -1185,6 +1192,11 @@ private boolean isSessionNotFound(SpannerException e) {
return e.getErrorCode() == ErrorCode.NOT_FOUND && e.getMessage().contains("Session not found");
}

private boolean isDatabaseDoesNotExist(SpannerException e) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
return e.getErrorCode() == ErrorCode.NOT_FOUND
&& e.getMessage().contains("Database does not exist");
}

private void invalidateSession(PooledSession session) {
synchronized (lock) {
if (isClosed()) {
Expand Down Expand Up @@ -1440,6 +1452,20 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
synchronized (lock) {
if (isSessionNotFound(e)) {
invalidateSession(session);
} else if (isDatabaseDoesNotExist(e)) {
// Database has been deleted. We should stop trying to prepare any transactions. Also
// propagate the error to all waiters, as any further waiting is pointless.
while (readWriteWaiters.size() > 0) {
readWriteWaiters.poll().put(e);
}
while (readWaiters.size() > 0) {
readWaiters.poll().put(e);
}
// Remove the session from the pool.
allSessions.remove(session);
if (isClosed()) {
decrementPendingClosures(1);
}
} else if (readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
Expand All @@ -27,6 +28,7 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand All @@ -37,6 +39,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -51,7 +54,6 @@ public class DatabaseClientImplTest {
private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static LocalChannelProvider channelProvider;
private static Spanner spanner;
private static final Statement UPDATE_STATEMENT =
Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2");
private static final Statement INVALID_UPDATE_STATEMENT =
Expand Down Expand Up @@ -80,6 +82,7 @@ public class DatabaseClientImplTest {
.build())
.setMetadata(SELECT1_METADATA)
.build();
private Spanner spanner;

@BeforeClass
public static void startStaticServer() throws IOException {
Expand Down Expand Up @@ -111,8 +114,6 @@ public static void stopServer() throws InterruptedException {

@Before
public void setUp() throws IOException {
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
spanner =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
Expand All @@ -125,6 +126,8 @@ public void setUp() throws IOException {
@After
public void tearDown() throws Exception {
spanner.close();
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

/**
Expand Down Expand Up @@ -257,4 +260,112 @@ public Long run(TransactionContext transaction) throws Exception {
assertThat(updateCount, is(equalTo(UPDATE_COUNT)));
}
}

@Test
public void testDatabaseDoesNotExistOnPrepareSession() throws Exception {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription("Database does not exist").asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until all sessions have been created.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// Ensure that no sessions could be prepared and that the session pool gives up trying to
// prepare sessions.
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) {
Thread.sleep(1L);
}
assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0)));
}

@Test
public void testDatabaseDoesNotExistOnInitialization() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription("Database does not exist").asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until session creation has finished.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// All session creation should fail and stop trying.
assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0)));
}

@Test
public void testDatabaseDoesNotExistOnCreate() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription("Database does not exist").asRuntimeException()));
// Ensure there are no sessions in the pool by default.
try (Spanner spanner =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build())
.build()
.getService()) {
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// The create session failure should propagate to the client and not retry.
try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString("Database does not exist"));
}
try {
dbClient.readWriteTransaction();
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode(), is(equalTo(ErrorCode.NOT_FOUND)));
assertThat(e.getMessage(), containsString("Database does not exist"));
}
}
}

@Test
public void testDatabaseDoesNotExistOnReplenish() throws Exception {
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(
Status.NOT_FOUND.withDescription("Database does not exist").asRuntimeException()));
DatabaseClientImpl dbClient =
(DatabaseClientImpl)
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// Wait until session creation has finished.
Stopwatch watch = Stopwatch.createStarted();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// All session creation should fail and stop trying.
assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0)));
// Force a maintainer run. This should schedule new session creation.
dbClient.pool.poolMaintainer.maintainPool();
// Wait until the replenish has finished.
watch = watch.reset().start();
while (watch.elapsed(TimeUnit.SECONDS) < 5
&& dbClient.pool.getNumberOfSessionsBeingCreated() > 0) {
Thread.sleep(1L);
}
// All session creation from replenishPool should fail and stop trying.
assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0)));
assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0)));
}
}
Expand Up @@ -362,6 +362,7 @@ public static class SimulatedExecutionTime {
private final int minimumExecutionTime;
private final int randomExecutionTime;
private final Queue<Exception> exceptions;
private final boolean stickyException;

/**
* Creates a simulated execution time that will always be somewhere between <code>
Expand All @@ -384,36 +385,43 @@ public static SimulatedExecutionTime none() {
}

public static SimulatedExecutionTime ofException(Exception exception) {
return new SimulatedExecutionTime(0, 0, Arrays.asList(exception));
return new SimulatedExecutionTime(0, 0, Arrays.asList(exception), false);
}

public static SimulatedExecutionTime ofStickyException(Exception exception) {
return new SimulatedExecutionTime(0, 0, Arrays.asList(exception), true);
}

public static SimulatedExecutionTime ofExceptions(Collection<Exception> exceptions) {
return new SimulatedExecutionTime(0, 0, exceptions);
return new SimulatedExecutionTime(0, 0, exceptions, false);
}

public static SimulatedExecutionTime ofMinimumAndRandomTimeAndExceptions(
int minimumExecutionTime, int randomExecutionTime, Collection<Exception> exceptions) {
return new SimulatedExecutionTime(minimumExecutionTime, randomExecutionTime, exceptions);
return new SimulatedExecutionTime(
minimumExecutionTime, randomExecutionTime, exceptions, false);
}

private SimulatedExecutionTime(int minimum, int random) {
this(minimum, random, Collections.<Exception>emptyList());
this(minimum, random, Collections.<Exception>emptyList(), false);
}

private SimulatedExecutionTime(int minimum, int random, Collection<Exception> exceptions) {
private SimulatedExecutionTime(
int minimum, int random, Collection<Exception> exceptions, boolean stickyException) {
Preconditions.checkArgument(minimum >= 0, "Minimum execution time must be >= 0");
Preconditions.checkArgument(random >= 0, "Random execution time must be >= 0");
this.minimumExecutionTime = minimum;
this.randomExecutionTime = random;
this.exceptions = new LinkedList<>(exceptions);
this.stickyException = stickyException;
}

private void simulateExecutionTime(
Queue<Exception> globalExceptions, ReadWriteLock freezeLock) {
try {
freezeLock.readLock().lock();
checkException(globalExceptions);
checkException(this.exceptions);
checkException(globalExceptions, false);
checkException(this.exceptions, stickyException);
if (minimumExecutionTime > 0 || randomExecutionTime > 0) {
Uninterruptibles.sleepUninterruptibly(
(randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime))
Expand All @@ -425,8 +433,8 @@ private void simulateExecutionTime(
}
}

private static void checkException(Queue<Exception> exceptions) {
Exception e = exceptions.poll();
private static void checkException(Queue<Exception> exceptions, boolean keepException) {
Exception e = keepException ? exceptions.peek() : exceptions.poll();
if (e != null) {
Throwables.throwIfUnchecked(e);
throw Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException();
Expand Down Expand Up @@ -1609,6 +1617,7 @@ public void reset() {
}

public void removeAllExecutionTimes() {
batchCreateSessionsExecutionTime = NO_EXECUTION_TIME;
beginTransactionExecutionTime = NO_EXECUTION_TIME;
commitExecutionTime = NO_EXECUTION_TIME;
createSessionExecutionTime = NO_EXECUTION_TIME;
Expand Down