Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: remove session pool preparing #515

Merged
merged 4 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -39,16 +39,14 @@ final class AsyncTransactionManagerImpl

private final SessionImpl session;
private Span span;
private final boolean inlineBegin;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span, boolean inlineBegin) {
AsyncTransactionManagerImpl(SessionImpl session, Span span) {
this.session = session;
this.span = span;
this.inlineBegin = inlineBegin;
}

@Override
Expand All @@ -69,15 +67,15 @@ public TransactionContextFutureImpl beginAsync() {
return begin;
}

private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
if (setActive) {
if (firstAttempt) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut;
if (inlineBegin) {
if (firstAttempt) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
Expand Down
Expand Up @@ -29,48 +29,33 @@

class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN =
"CloudSpanner.ReadWriteTransactionWithInlineBegin";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();

private enum SessionMode {
READ,
READ_WRITE
}

@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
private final boolean inlineBeginReadWriteTransactions;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, boolean inlineBeginReadWriteTransactions) {
this("", pool, inlineBeginReadWriteTransactions);
DatabaseClientImpl(SessionPool pool) {
this("", pool);
}

DatabaseClientImpl(String clientId, SessionPool pool, boolean inlineBeginReadWriteTransactions) {
DatabaseClientImpl(String clientId, SessionPool pool) {
this.clientId = clientId;
this.pool = pool;
this.inlineBeginReadWriteTransactions = inlineBeginReadWriteTransactions;
}

@VisibleForTesting
PooledSessionFuture getReadSession() {
return pool.getReadSession();
}

@VisibleForTesting
PooledSessionFuture getReadWriteSession() {
return pool.getReadWriteSession();
return pool.get();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -90,7 +75,6 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand Down Expand Up @@ -173,55 +157,21 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction() {
return inlineBeginReadWriteTransactions
? inlinedReadWriteTransaction()
: preparedReadWriteTransaction();
}

private TransactionRunner preparedReadWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}

private TransactionRunner inlinedReadWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
// An inlined read/write transaction does not need a write-prepared session.
return getReadSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}

@Override
public TransactionManager transactionManager() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManager()
: preparedTransactionManager();
}

private TransactionManager preparedTransactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private TransactionManager inlinedTransactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
// An inlined read/write transaction does not need a write-prepared session.
return getReadSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
Expand All @@ -231,21 +181,7 @@ private TransactionManager inlinedTransactionManager() {

@Override
public AsyncRunner runAsync() {
return inlineBeginReadWriteTransactions ? inlinedRunAsync() : preparedRunAsync();
}

private AsyncRunner preparedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private AsyncRunner inlinedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().runAsync();
} catch (RuntimeException e) {
Expand All @@ -256,23 +192,7 @@ private AsyncRunner inlinedRunAsync() {

@Override
public AsyncTransactionManager transactionManagerAsync() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManagerAsync()
: preparedTransactionManagerAsync();
}

private AsyncTransactionManager preparedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private AsyncTransactionManager inlinedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().transactionManagerAsync();
} catch (RuntimeException e) {
Expand All @@ -285,10 +205,7 @@ private AsyncTransactionManager inlinedTransactionManagerAsync() {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
// A partitioned update transaction does not need a prepared write session, as the transaction
// object will start a new transaction with specific options anyway.
return runWithSessionRetry(
SessionMode.READ,
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
Expand All @@ -301,17 +218,13 @@ public Long apply(Session session) {
}
}

private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callable) {
PooledSessionFuture session =
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSessionFuture session = getReadSession();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we rename methods such as getReadSession() to getSession() now that we do not have such separation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

while (true) {
try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session =
mode == SessionMode.READ_WRITE
? pool.replaceReadWriteSession(e, session)
: pool.replaceReadSession(e, session);
session = pool.replaceSession(e, session);
}
}
}
Expand Down
Expand Up @@ -36,9 +36,14 @@ class MetricRegistryConstants {
private static final LabelValue UNSET_LABEL = LabelValue.create(null);

static final LabelValue NUM_IN_USE_SESSIONS = LabelValue.create("num_in_use_sessions");

@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we add javadocs on the deprecation notice?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

static final LabelValue NUM_SESSIONS_BEING_PREPARED =
LabelValue.create("num_sessions_being_prepared");

static final LabelValue NUM_READ_SESSIONS = LabelValue.create("num_read_sessions");

@Deprecated
static final LabelValue NUM_WRITE_SESSIONS = LabelValue.create("num_write_prepared_sessions");

static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =
Expand Down
Expand Up @@ -228,34 +228,24 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
@Override
public TransactionRunner readWriteTransaction() {
return setActive(
new TransactionRunnerImpl(
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction()));
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
}

@Override
public AsyncRunner runAsync() {
return new AsyncRunnerImpl(
setActive(
new TransactionRunnerImpl(
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction())));
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())));
}

@Override
public TransactionManager transactionManager() {
return new TransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
return new TransactionManagerImpl(this, currentSpan);
}

@Override
public AsyncTransactionManagerImpl transactionManagerAsync() {
return new AsyncTransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
return new AsyncTransactionManagerImpl(this, currentSpan);
}

@Override
Expand Down