diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 8d731f191c..3b2a0d2da3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1099,6 +1099,7 @@ private static enum Position { private final ScheduledExecutorService executor; private final ExecutorFactory executorFactory; private final ScheduledExecutorService prepareExecutor; + private final int prepareThreadPoolSize; final PoolMaintainer poolMaintainer; private final Clock clock; private final Object lock = new Object(); @@ -1143,6 +1144,12 @@ private static enum Position { @GuardedBy("lock") private long numSessionsReleased = 0; + @GuardedBy("lock") + private long numSessionsInProcessPrepared = 0; + + @GuardedBy("lock") + private long numSessionsAsyncPrepared = 0; + @GuardedBy("lock") private long numIdleSessionsRemoved = 0; @@ -1224,15 +1231,14 @@ private SessionPool( this.options = options; this.executorFactory = executorFactory; this.executor = executor; - int prepareThreads; if (executor instanceof ThreadPoolExecutor) { - prepareThreads = Math.max(((ThreadPoolExecutor) executor).getCorePoolSize(), 1); + prepareThreadPoolSize = Math.max(((ThreadPoolExecutor) executor).getCorePoolSize(), 1); } else { - prepareThreads = 8; + prepareThreadPoolSize = 8; } this.prepareExecutor = Executors.newScheduledThreadPool( - prepareThreads, + prepareThreadPoolSize, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("session-pool-prepare-%d") @@ -1244,6 +1250,19 @@ private SessionPool( } @VisibleForTesting + long getNumberOfSessionsInProcessPrepared() { + synchronized (lock) { + return numSessionsInProcessPrepared; + } + } + + @VisibleForTesting + long getNumberOfSessionsAsyncPrepared() { + synchronized (lock) { + return numSessionsAsyncPrepared; + } + } + void removeFromPool(PooledSession session) { synchronized (lock) { if (isClosed()) { @@ -1453,46 +1472,98 @@ PooledSession getReadSession() throws SpannerException { PooledSession getReadWriteSession() { Span span = Tracing.getTracer().getCurrentSpan(); span.addAnnotation("Acquiring read write session"); - Waiter waiter = null; PooledSession sess = null; - synchronized (lock) { - if (closureFuture != null) { - span.addAnnotation("Pool has been closed"); - throw new IllegalStateException("Pool has been closed"); + // Loop to retry SessionNotFoundExceptions that might occur during in-process prepare of a + // session. + while (true) { + Waiter waiter = null; + boolean inProcessPrepare = false; + synchronized (lock) { + if (closureFuture != null) { + span.addAnnotation("Pool has been closed"); + throw new IllegalStateException("Pool has been closed"); + } + if (resourceNotFoundException != null) { + span.addAnnotation("Database has been deleted"); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.NOT_FOUND, + String.format( + "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", + resourceNotFoundException.getMessage()), + resourceNotFoundException); + } + sess = writePreparedSessions.poll(); + if (sess == null) { + if (numSessionsBeingPrepared <= prepareThreadPoolSize) { + if (numSessionsBeingPrepared <= readWriteWaiters.size()) { + PooledSession readSession = readSessions.poll(); + if (readSession != null) { + span.addAnnotation( + "Acquired read only session. Preparing for read write transaction"); + prepareSession(readSession); + } else { + span.addAnnotation("No session available"); + maybeCreateSession(); + } + } + } else { + inProcessPrepare = true; + numSessionsInProcessPrepared++; + PooledSession readSession = readSessions.poll(); + if (readSession != null) { + // Create a read/write transaction in-process if there is already a queue for prepared + // sessions. This is more efficient than doing it asynchronously, as it scales with + // the number of user threads. The thread pool for asynchronously preparing sessions + // is fixed. + span.addAnnotation( + "Acquired read only session. Preparing in-process for read write transaction"); + sess = readSession; + } else { + span.addAnnotation("No session available"); + maybeCreateSession(); + } + } + if (sess == null) { + waiter = new Waiter(); + if (inProcessPrepare) { + // inProcessPrepare=true means that we have already determined that the queue for + // preparing read/write sessions is larger than the number of threads in the prepare + // thread pool, and that it's more efficient to do the prepare in-process. We will + // therefore create a waiter for a read-only session, even though a read/write session + // has been requested. + readWaiters.add(waiter); + } else { + readWriteWaiters.add(waiter); + } + } + } else { + span.addAnnotation("Acquired read write session"); + } } - if (resourceNotFoundException != null) { - span.addAnnotation("Database has been deleted"); - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.NOT_FOUND, - String.format( - "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", - resourceNotFoundException.getMessage()), - resourceNotFoundException); + if (waiter != null) { + logger.log( + Level.FINE, + "No session available in the pool. Blocking for one to become available/created"); + span.addAnnotation("Waiting for read write session to be available"); + sess = waiter.take(); } - sess = writePreparedSessions.poll(); - if (sess == null) { - if (numSessionsBeingPrepared <= readWriteWaiters.size()) { - PooledSession readSession = readSessions.poll(); - if (readSession != null) { - span.addAnnotation("Acquired read only session. Preparing for read write transaction"); - prepareSession(readSession); - } else { - span.addAnnotation("No session available"); - maybeCreateSession(); + if (inProcessPrepare) { + try { + sess.prepareReadWriteTransaction(); + } catch (Throwable t) { + sess = null; + SpannerException e = newSpannerException(t); + if (!isClosed()) { + handlePrepareSessionFailure(e, sess, false); + } + if (!isSessionNotFound(e)) { + throw e; } } - waiter = new Waiter(); - readWriteWaiters.add(waiter); - } else { - span.addAnnotation("Acquired read write session"); } - } - if (waiter != null) { - logger.log( - Level.FINE, - "No session available in the pool. Blocking for one to become available/created"); - span.addAnnotation("Waiting for read write session to be available"); - sess = waiter.take(); + if (sess != null) { + break; + } } sess.markBusy(); incrementNumSessionsInUse(); @@ -1620,7 +1691,8 @@ private void handleCreateSessionsFailure(SpannerException e, int count) { } } - private void handlePrepareSessionFailure(SpannerException e, PooledSession session) { + private void handlePrepareSessionFailure( + SpannerException e, PooledSession session, boolean informFirstWaiter) { synchronized (lock) { if (isSessionNotFound(e)) { invalidateSession(session); @@ -1643,7 +1715,7 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi MoreObjects.firstNonNull( this.resourceNotFoundException, isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null); - } else if (readWriteWaiters.size() > 0) { + } else if (informFirstWaiter && readWriteWaiters.size() > 0) { releaseSession(session, Position.FIRST); readWriteWaiters.poll().put(e); } else { @@ -1792,6 +1864,7 @@ public void run() { sess.prepareReadWriteTransaction(); logger.log(Level.FINE, "Session prepared"); synchronized (lock) { + numSessionsAsyncPrepared++; numSessionsBeingPrepared--; if (!isClosed()) { if (readWriteWaiters.size() > 0) { @@ -1807,7 +1880,7 @@ public void run() { synchronized (lock) { numSessionsBeingPrepared--; if (!isClosed()) { - handlePrepareSessionFailure(newSpannerException(t), sess); + handlePrepareSessionFailure(newSpannerException(t), sess, true); } } }