Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: prepare sessions with r/w tx in-process #152

Merged
merged 3 commits into from Apr 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1099,6 +1099,7 @@ private static enum Position {
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService prepareExecutor;
private final int prepareThreadPoolSize;
final PoolMaintainer poolMaintainer;
private final Clock clock;
private final Object lock = new Object();
Expand Down Expand Up @@ -1143,6 +1144,12 @@ private static enum Position {
@GuardedBy("lock")
private long numSessionsReleased = 0;

@GuardedBy("lock")
private long numSessionsInProcessPrepared = 0;

@GuardedBy("lock")
private long numSessionsAsyncPrepared = 0;

@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

Expand Down Expand Up @@ -1224,15 +1231,14 @@ private SessionPool(
this.options = options;
this.executorFactory = executorFactory;
this.executor = executor;
int prepareThreads;
if (executor instanceof ThreadPoolExecutor) {
prepareThreads = Math.max(((ThreadPoolExecutor) executor).getCorePoolSize(), 1);
prepareThreadPoolSize = Math.max(((ThreadPoolExecutor) executor).getCorePoolSize(), 1);
} else {
prepareThreads = 8;
prepareThreadPoolSize = 8;
}
this.prepareExecutor =
Executors.newScheduledThreadPool(
prepareThreads,
prepareThreadPoolSize,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("session-pool-prepare-%d")
Expand All @@ -1244,6 +1250,19 @@ private SessionPool(
}

@VisibleForTesting
long getNumberOfSessionsInProcessPrepared() {
synchronized (lock) {
return numSessionsInProcessPrepared;
}
}

@VisibleForTesting
long getNumberOfSessionsAsyncPrepared() {
synchronized (lock) {
return numSessionsAsyncPrepared;
}
}

void removeFromPool(PooledSession session) {
synchronized (lock) {
if (isClosed()) {
Expand Down Expand Up @@ -1453,46 +1472,98 @@ PooledSession getReadSession() throws SpannerException {
PooledSession getReadWriteSession() {
Span span = Tracing.getTracer().getCurrentSpan();
span.addAnnotation("Acquiring read write session");
Waiter waiter = null;
PooledSession sess = null;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
// Loop to retry SessionNotFoundExceptions that might occur during in-process prepare of a
// session.
while (true) {
Waiter waiter = null;
boolean inProcessPrepare = false;
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
}
if (resourceNotFoundException != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
resourceNotFoundException.getMessage()),
resourceNotFoundException);
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= prepareThreadPoolSize) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
PooledSession readSession = readSessions.poll();
if (readSession != null) {
span.addAnnotation(
"Acquired read only session. Preparing for read write transaction");
prepareSession(readSession);
} else {
span.addAnnotation("No session available");
maybeCreateSession();
}
}
} else {
inProcessPrepare = true;
numSessionsInProcessPrepared++;
PooledSession readSession = readSessions.poll();
if (readSession != null) {
// Create a read/write transaction in-process if there is already a queue for prepared
// sessions. This is more efficient than doing it asynchronously, as it scales with
// the number of user threads. The thread pool for asynchronously preparing sessions
// is fixed.
span.addAnnotation(
"Acquired read only session. Preparing in-process for read write transaction");
sess = readSession;
} else {
span.addAnnotation("No session available");
maybeCreateSession();
}
}
if (sess == null) {
waiter = new Waiter();
if (inProcessPrepare) {
// inProcessPrepare=true means that we have already determined that the queue for
// preparing read/write sessions is larger than the number of threads in the prepare
// thread pool, and that it's more efficient to do the prepare in-process. We will
// therefore create a waiter for a read-only session, even though a read/write session
// has been requested.
readWaiters.add(waiter);
} else {
readWriteWaiters.add(waiter);
}
Copy link
Contributor

@skuruppu skuruppu Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this. Why is the waiter being added to readWaiters if inProcessPrepare? Aren't we trying to create a read/write session here?

I suppose I don't understand why it makes a difference if this is done in-process.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment on why we add it to readWaiters here.

The reasons are:

  1. The normal prepare process is executed using a background executor with a fixed thread pool. When the work queue for this executor has exceeded the number of threads available in the thread pool, any requester for a read/write session will have to wait for a thread to come available, and then for the prepare to actually be executed. In the meantime, the user thread is just waiting. In those cases, it's more efficient to use the user thread to execute the BeginTransaction RPC, instead of letting it wait for a thread to come available in the thread pool.
  2. In this specific case, in addition to the work queue for preparing sessions being longer than the number of threads in the thread pool, there is also no session at all available in the session pool. In that case, it's more efficient to wait for any (read) session to come available, and then execute the BeginTransaction RPC using the user thread, than to wait for both a session to come available, and then for a thread in the thread pool to come available for preparing the session.

}
} else {
span.addAnnotation("Acquired read write session");
}
}
if (resourceNotFoundException != null) {
span.addAnnotation("Database has been deleted");
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND,
String.format(
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
resourceNotFoundException.getMessage()),
resourceNotFoundException);
if (waiter != null) {
logger.log(
Level.FINE,
"No session available in the pool. Blocking for one to become available/created");
span.addAnnotation("Waiting for read write session to be available");
sess = waiter.take();
}
sess = writePreparedSessions.poll();
if (sess == null) {
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
PooledSession readSession = readSessions.poll();
if (readSession != null) {
span.addAnnotation("Acquired read only session. Preparing for read write transaction");
prepareSession(readSession);
} else {
span.addAnnotation("No session available");
maybeCreateSession();
if (inProcessPrepare) {
try {
sess.prepareReadWriteTransaction();
} catch (Throwable t) {
sess = null;
SpannerException e = newSpannerException(t);
if (!isClosed()) {
handlePrepareSessionFailure(e, sess, false);
}
if (!isSessionNotFound(e)) {
throw e;
}
}
waiter = new Waiter();
readWriteWaiters.add(waiter);
} else {
span.addAnnotation("Acquired read write session");
}
}
if (waiter != null) {
logger.log(
Level.FINE,
"No session available in the pool. Blocking for one to become available/created");
span.addAnnotation("Waiting for read write session to be available");
sess = waiter.take();
if (sess != null) {
break;
}
}
sess.markBusy();
incrementNumSessionsInUse();
Expand Down Expand Up @@ -1620,7 +1691,8 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
}
}

private void handlePrepareSessionFailure(SpannerException e, PooledSession session) {
private void handlePrepareSessionFailure(
SpannerException e, PooledSession session, boolean informFirstWaiter) {
synchronized (lock) {
if (isSessionNotFound(e)) {
invalidateSession(session);
Expand All @@ -1643,7 +1715,7 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
MoreObjects.firstNonNull(
this.resourceNotFoundException,
isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null);
} else if (readWriteWaiters.size() > 0) {
} else if (informFirstWaiter && readWriteWaiters.size() > 0) {
releaseSession(session, Position.FIRST);
readWriteWaiters.poll().put(e);
} else {
Expand Down Expand Up @@ -1792,6 +1864,7 @@ public void run() {
sess.prepareReadWriteTransaction();
logger.log(Level.FINE, "Session prepared");
synchronized (lock) {
numSessionsAsyncPrepared++;
numSessionsBeingPrepared--;
if (!isClosed()) {
if (readWriteWaiters.size() > 0) {
Expand All @@ -1807,7 +1880,7 @@ public void run() {
synchronized (lock) {
numSessionsBeingPrepared--;
if (!isClosed()) {
handlePrepareSessionFailure(newSpannerException(t), sess);
handlePrepareSessionFailure(newSpannerException(t), sess, true);
}
}
}
Expand Down