Skip to content

Commit

Permalink
feat: change response type of write instead of overload
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Oct 19, 2020
1 parent 1fdf282 commit c9e707b
Show file tree
Hide file tree
Showing 32 changed files with 323 additions and 437 deletions.
Expand Up @@ -57,17 +57,10 @@ interface AsyncWork<R> {
*/
ApiFuture<Timestamp> getCommitTimestamp();

/**
* Indicates that the {@link AsyncRunner} should request the backend to return {@link
* CommitStats}. The {@link CommitStats} can be retrieved by calling {@link #getCommitStats()}
* after the transaction has successfully committed.
*/
AsyncRunner withCommitStats();

/**
* Returns the {@link CommitStats} of this transaction. This method may only be called after the
* transaction has successfully committed, and only if {@link #withCommitStats()} was called
* before executing the transaction.
* transaction has successfully committed, and only if {@link Options#commitStats()} was specified
* for the transaction.
*/
ApiFuture<CommitStats> getCommitStats();
}
Expand Up @@ -26,12 +26,13 @@

class AsyncRunnerImpl implements AsyncRunner {
private final TransactionRunnerImpl delegate;
private final Options options;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private final SettableApiFuture<CommitStats> commitStats = SettableApiFuture.create();
private boolean returnCommitStats;

AsyncRunnerImpl(TransactionRunnerImpl delegate) {
this.delegate = delegate;
AsyncRunnerImpl(TransactionRunnerImpl delegate, Options options) {
this.delegate = Preconditions.checkNotNull(delegate);
this.options = Preconditions.checkNotNull(options);
}

@Override
Expand Down Expand Up @@ -75,7 +76,7 @@ private void setCommitTimestampAndStats() {
} catch (Throwable t) {
commitTimestamp.setException(t);
}
if (returnCommitStats) {
if (options.withCommitStats()) {
try {
commitStats.set(delegate.getCommitStats());
} catch (Throwable t) {
Expand All @@ -89,18 +90,11 @@ public ApiFuture<Timestamp> getCommitTimestamp() {
return commitTimestamp;
}

@Override
public AsyncRunner withCommitStats() {
delegate.withCommitStats();
returnCommitStats = true;
return this;
}

@Override
public ApiFuture<CommitStats> getCommitStats() {
Preconditions.checkState(
returnCommitStats,
"getCommitStats may only be invoked if withCommitStats has been invoked before executing the transaction");
options.withCommitStats(),
"getCommitStats may only be invoked if Options.commitStats() was specified for the transaction");
return commitStats;
}
}
Expand Up @@ -191,17 +191,10 @@ public interface AsyncTransactionFunction<I, O> {
/** Returns the state of the transaction. */
TransactionState getState();

/**
* Indicates that the {@link AsyncTransactionManager} should request the backend to return {@link
* CommitStats}. The {@link CommitStats} can be retrieved by calling {@link #getCommitStats()}
* after the transaction has successfully committed.
*/
AsyncTransactionManager withCommitStats();

/**
* Returns the {@link CommitStats} of this transaction. This method may only be called after the
* transaction has successfully committed, and only if {@link #withCommitStats()} was called
* before committing the transaction.
* transaction has successfully committed, and only if {@link Options#commitStats()} was specified
* for the transaction.
*/
ApiFuture<CommitStats> getCommitStats();

Expand Down
Expand Up @@ -37,16 +37,19 @@ final class AsyncTransactionManagerImpl
private static final Tracer tracer = Tracing.getTracer();

private final SessionImpl session;
private final Options options;
private Span span;
private boolean returnCommitStats;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private SettableApiFuture<CommitStats> commitStats;

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, Options options) {
Preconditions.checkNotNull(session);
Preconditions.checkNotNull(options);
this.session = session;
this.options = options;
this.span = span;
}

Expand All @@ -55,20 +58,14 @@ public void setSpan(Span span) {
this.span = span;
}

@Override
public AsyncTransactionManager withCommitStats() {
this.returnCommitStats = true;
return this;
}

@Override
public ApiFuture<CommitStats> getCommitStats() {
Preconditions.checkState(
txnState == TransactionState.COMMITTED,
"getCommitStats can only be invoked if the transaction committed successfully");
Preconditions.checkState(
returnCommitStats,
"getCommitStats can only be invoked if withCommitStats() was invoked before committing the transaction");
options.withCommitStats(),
"getCommitStats can only be invoked if Options.commitStats() was specified for the transaction");
return commitStats;
}

Expand Down Expand Up @@ -138,8 +135,8 @@ public ApiFuture<Timestamp> commitAsync() {
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Transaction already aborted"));
}
ApiFuture<Timestamp> res = txn.commitAsync(returnCommitStats);
if (returnCommitStats) {
ApiFuture<Timestamp> res = txn.commitAsync(options.withCommitStats());
if (options.withCommitStats()) {
commitStats = SettableApiFuture.create();
}
txnState = TransactionState.COMMITTED;
Expand Down
Expand Up @@ -17,13 +17,29 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
* instance of this is tied to a specific database.
*/
public interface DatabaseClient {

/**
* Response for write methods that return both a commit {@link Timestamp} and {@link CommitStats}.
*/
interface WriteResponse {
/** The commit timestamp of the transaction. */
Timestamp getCommitTimestamp();

/**
* The commit statistics of the transaction (if requested).
*
* @throws IllegalStateException if no {@link CommitStats} were requested for the transaction.
*/
CommitStats getCommitStats();
}

/**
* Writes the given mutations atomically to the database.
*
Expand All @@ -33,7 +49,7 @@ public interface DatabaseClient {
* is not possible to know whether the mutations were applied without performing a subsequent
* database operation, but the mutations will have been applied at most once.
*
* <p>Example of blind write.
* <p>Example of blind write that returns {@link CommitStats}.
*
* <pre>{@code
* long singerId = my_singer_id;
Expand All @@ -45,12 +61,14 @@ public interface DatabaseClient {
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.write(Collections.singletonList(mutation));
* dbClient.write(Collections.singletonList(mutation), Options.commitStats());
* }</pre>
*
* @return the timestamp at which the write was committed
* @return the timestamp at which the write was committed and the {@link CommitStats} if those
* were requested.
*/
Timestamp write(Iterable<Mutation> mutations) throws SpannerException;
WriteResponse write(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException;

/**
* Writes the given mutations atomically to the database without replay protection.
Expand All @@ -64,7 +82,7 @@ public interface DatabaseClient {
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* <p>Example of unprotected blind write.
* <p>Example of unprotected blind write that returns {@link CommitStats}.
*
* <pre>{@code
* long singerId = my_singer_id;
Expand All @@ -76,35 +94,13 @@ public interface DatabaseClient {
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation), Options.commitStats());
* }</pre>
*
* @return the timestamp at which the write was committed
*/
Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException;

/**
* Response for write methods that return both a commit {@link Timestamp} and {@link CommitStats}.
*/
interface WriteResponse {
/** The commit timestamp of the transaction. */
Timestamp getCommitTimestamp();

/** The commit statistics of the transaction. */
CommitStats getCommitStats();
}

/**
* Same as {@link #write(Iterable)}, but requests the backend to return both a commit timestamp
* and {@link CommitStats}.
*/
WriteResponse writeWithCommitStats(Iterable<Mutation> mutations) throws SpannerException;

/**
* Same as {@link #writeAtLeastOnce(Iterable)}, but requests the backend to return both a commit
* timestamp and {@link CommitStats}.
* @return the timestamp at which the write was committed and the {@link CommitStats} if those
* were requested.
*/
WriteResponse writeAtLeastOnceWithCommitStats(Iterable<Mutation> mutations)
WriteResponse writeAtLeastOnce(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException;

/**
Expand Down Expand Up @@ -270,7 +266,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable<Mutation> mutations)
* });
* </code></pre>
*/
TransactionRunner readWriteTransaction();
TransactionRunner readWriteTransaction(TransactionOption... options);

/**
* Returns a transaction manager which allows manual management of transaction lifecycle. This API
Expand Down Expand Up @@ -300,7 +296,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable<Mutation> mutations)
* }
* }</pre>
*/
TransactionManager transactionManager();
TransactionManager transactionManager(TransactionOption... options);

/**
* Returns an asynchronous transaction runner for executing a single logical transaction with
Expand Down Expand Up @@ -333,7 +329,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable<Mutation> mutations)
* executor);
* </code></pre>
*/
AsyncRunner runAsync();
AsyncRunner runAsync(TransactionOption... options);

/**
* Returns an asynchronous transaction manager which allows manual management of transaction
Expand Down Expand Up @@ -421,7 +417,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable<Mutation> mutations)
* }
* }</pre>
*/
AsyncTransactionManager transactionManagerAsync();
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

/**
* Returns the lower bound of rows modified by this DML statement.
Expand Down

0 comments on commit c9e707b

Please sign in to comment.