Skip to content

Commit

Permalink
feat!: remove session pool preparing (#515)
Browse files Browse the repository at this point in the history
* feat: remove session pool preparing

* fix: fix integration tests

* test: fix malformed retry loop in test case

* fix: review comments
  • Loading branch information
olavloite committed Oct 21, 2020
1 parent 07346f0 commit 2768f69
Show file tree
Hide file tree
Showing 34 changed files with 543 additions and 2,062 deletions.
Expand Up @@ -39,16 +39,14 @@ final class AsyncTransactionManagerImpl

private final SessionImpl session;
private Span span;
private final boolean inlineBegin;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span, boolean inlineBegin) {
AsyncTransactionManagerImpl(SessionImpl session, Span span) {
this.session = session;
this.span = span;
this.inlineBegin = inlineBegin;
}

@Override
Expand All @@ -69,15 +67,15 @@ public TransactionContextFutureImpl beginAsync() {
return begin;
}

private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
if (setActive) {
if (firstAttempt) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut;
if (inlineBegin) {
if (firstAttempt) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
Expand Down
Expand Up @@ -29,48 +29,33 @@

class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN =
"CloudSpanner.ReadWriteTransactionWithInlineBegin";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();

private enum SessionMode {
READ,
READ_WRITE
}

@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
private final boolean inlineBeginReadWriteTransactions;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, boolean inlineBeginReadWriteTransactions) {
this("", pool, inlineBeginReadWriteTransactions);
DatabaseClientImpl(SessionPool pool) {
this("", pool);
}

DatabaseClientImpl(String clientId, SessionPool pool, boolean inlineBeginReadWriteTransactions) {
DatabaseClientImpl(String clientId, SessionPool pool) {
this.clientId = clientId;
this.pool = pool;
this.inlineBeginReadWriteTransactions = inlineBeginReadWriteTransactions;
}

@VisibleForTesting
PooledSessionFuture getReadSession() {
return pool.getReadSession();
}

@VisibleForTesting
PooledSessionFuture getReadWriteSession() {
return pool.getReadWriteSession();
PooledSessionFuture getSession() {
return pool.getSession();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -90,7 +75,6 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -109,7 +93,7 @@ public Timestamp apply(Session session) {
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse();
return getSession().singleUse();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -120,7 +104,7 @@ public ReadContext singleUse() {
public ReadContext singleUse(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse(bound);
return getSession().singleUse(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -131,7 +115,7 @@ public ReadContext singleUse(TimestampBound bound) {
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction();
return getSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -142,7 +126,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction(bound);
return getSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -153,7 +137,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction();
return getSession().readOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -164,7 +148,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction(bound);
return getSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -173,56 +157,22 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction() {
return inlineBeginReadWriteTransactions
? inlinedReadWriteTransaction()
: preparedReadWriteTransaction();
}

private TransactionRunner preparedReadWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().readWriteTransaction();
return getSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}

private TransactionRunner inlinedReadWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
// An inlined read/write transaction does not need a write-prepared session.
return getReadSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public TransactionManager transactionManager() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManager()
: preparedTransactionManager();
}

private TransactionManager preparedTransactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private TransactionManager inlinedTransactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
// An inlined read/write transaction does not need a write-prepared session.
return getReadSession().transactionManager();
return getSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -231,23 +181,9 @@ private TransactionManager inlinedTransactionManager() {

@Override
public AsyncRunner runAsync() {
return inlineBeginReadWriteTransactions ? inlinedRunAsync() : preparedRunAsync();
}

private AsyncRunner preparedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private AsyncRunner inlinedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().runAsync();
return getSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -256,25 +192,9 @@ private AsyncRunner inlinedRunAsync() {

@Override
public AsyncTransactionManager transactionManagerAsync() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManagerAsync()
: preparedTransactionManagerAsync();
}

private AsyncTransactionManager preparedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

private AsyncTransactionManager inlinedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().transactionManagerAsync();
return getSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -285,10 +205,7 @@ private AsyncTransactionManager inlinedTransactionManagerAsync() {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
// A partitioned update transaction does not need a prepared write session, as the transaction
// object will start a new transaction with specific options anyway.
return runWithSessionRetry(
SessionMode.READ,
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
Expand All @@ -301,17 +218,13 @@ public Long apply(Session session) {
}
}

private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callable) {
PooledSessionFuture session =
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSessionFuture session = getSession();
while (true) {
try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session =
mode == SessionMode.READ_WRITE
? pool.replaceReadWriteSession(e, session)
: pool.replaceReadSession(e, session);
session = pool.replaceSession(e, session);
}
}
}
Expand Down
Expand Up @@ -36,9 +36,22 @@ class MetricRegistryConstants {
private static final LabelValue UNSET_LABEL = LabelValue.create(null);

static final LabelValue NUM_IN_USE_SESSIONS = LabelValue.create("num_in_use_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_SESSIONS_BEING_PREPARED =
LabelValue.create("num_sessions_being_prepared");

static final LabelValue NUM_READ_SESSIONS = LabelValue.create("num_read_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_WRITE_SESSIONS = LabelValue.create("num_write_prepared_sessions");

static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =
Expand Down
Expand Up @@ -228,34 +228,24 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
@Override
public TransactionRunner readWriteTransaction() {
return setActive(
new TransactionRunnerImpl(
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction()));
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
}

@Override
public AsyncRunner runAsync() {
return new AsyncRunnerImpl(
setActive(
new TransactionRunnerImpl(
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction())));
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())));
}

@Override
public TransactionManager transactionManager() {
return new TransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
return new TransactionManagerImpl(this, currentSpan);
}

@Override
public AsyncTransactionManagerImpl transactionManagerAsync() {
return new AsyncTransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
return new AsyncTransactionManagerImpl(this, currentSpan);
}

@Override
Expand Down

0 comments on commit 2768f69

Please sign in to comment.