Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds options to the write operations #531

Merged
merged 3 commits into from Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -377,4 +377,17 @@
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>

<!-- Write with options API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.CommitResponse writeWithOptions(java.lang.Iterable, com.google.cloud.spanner.Options$TransactionOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.CommitResponse writeAtLeastOnceWithOptions(java.lang.Iterable, com.google.cloud.spanner.Options$TransactionOption[])</method>
</difference>

</differences>
@@ -0,0 +1,52 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import java.util.Objects;

/** Represents a response from a commit operation. */
public class CommitResponse {

private final Timestamp commitTimestamp;

public CommitResponse(Timestamp commitTimestamp) {
this.commitTimestamp = commitTimestamp;
}

/** Returns a {@link Timestamp} representing the commit time of the write operation. */
public Timestamp getCommitTimestamp() {
return commitTimestamp;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CommitResponse that = (CommitResponse) o;
return Objects.equals(commitTimestamp, that.commitTimestamp);
}

@Override
public int hashCode() {
return Objects.hash(commitTimestamp);
}
}
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -52,6 +53,35 @@ public interface DatabaseClient {
*/
Timestamp write(Iterable<Mutation> mutations) throws SpannerException;

/**
* Writes the given mutations atomically to the database with the given options.
*
* <p>This method uses retries and replay protection internally, which means that the mutations
* are applied exactly once on success, or not at all if an error is returned, regardless of any
* failures in the underlying network. Note that if the call is cancelled or reaches deadline, it
* is not possible to know whether the mutations were applied without performing a subsequent
* database operation, but the mutations will have been applied at most once.
*
* <p>Example of blind write.
*
* <pre>{@code
* long singerId = my_singer_id;
* Mutation mutation = Mutation.newInsertBuilder("Singer")
* .set("SingerId")
* .to(singerId)
* .set("FirstName")
* .to("Billy")
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeWithOptions(Collections.singletonList(mutation));
* }</pre>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about WriteOption in place of TransactionOption?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thiago and I had a short discussion about this, and we think TransactionOption is better in this case, as we intend to also use this for options that we will need for normal read/write transactions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sgtm

throws SpannerException;

/**
* Writes the given mutations atomically to the database without replay protection.
*
Expand Down Expand Up @@ -83,6 +113,38 @@ public interface DatabaseClient {
*/
Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException;

/**
* Writes the given mutations atomically to the database without replay protection.
*
* <p>Since this method does not feature replay protection, it may attempt to apply {@code
* mutations} more than once; if the mutations are not idempotent, this may lead to a failure
* being reported when the mutation was applied once. For example, an insert may fail with {@link
* ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method was called. For
* this reason, most users of the library will prefer to use {@link #write(Iterable)} instead.
* However, {@code writeAtLeastOnce()} requires only a single RPC, whereas {@code write()}
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* <p>Example of unprotected blind write.
*
* <pre>{@code
* long singerId = my_singer_id;
* Mutation mutation = Mutation.newInsertBuilder("Singers")
* .set("SingerId")
* .to(singerId)
* .set("FirstName")
* .to("Billy")
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
* }</pre>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;

/**
* Returns a context in which a single read can be performed using {@link TimestampBound#strong()}
* concurrency. This method will return a {@link ReadContext} that will not return the read
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -81,6 +82,13 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
Expand All @@ -101,6 +109,13 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
Expand Down
Expand Up @@ -33,6 +33,9 @@ public interface ReadOption {}
/** Marker interface to mark options applicable to query operation. */
public interface QueryOption {}

/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -35,7 +36,6 @@
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import io.opencensus.common.Scope;
Expand Down Expand Up @@ -139,6 +139,13 @@ public Void run(TransactionContext ctx) {
return runner.getCommitTimestamp();
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
setActive(null);
Expand All @@ -154,7 +161,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
.build();
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
CommitResponse response = spanner.getRpc().commit(request, options);
com.google.spanner.v1.CommitResponse response = spanner.getRpc().commit(request, options);
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
return t;
} catch (IllegalArgumentException e) {
Expand All @@ -168,6 +175,13 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
return singleUse(TimestampBound.strong());
Expand Down
Expand Up @@ -47,10 +47,10 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1103,6 +1103,13 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
}
}

@Override
public CommitResponse writeWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
try {
Expand All @@ -1112,6 +1119,13 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
try {
Expand Down Expand Up @@ -1347,6 +1361,13 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
}
}

@Override
public CommitResponse writeWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
try {
Expand All @@ -1357,6 +1378,13 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public long executePartitionedUpdate(Statement stmt) throws SpannerException {
try {
Expand Down