Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: adds options to the write operations (#531)
* feat: adds options to the write operations

Adds the possibility of adding options to the write operations and
encapsulates the write response into it's own class so that we can
augment the response with more fields than the commit timestamp.
  • Loading branch information
thiagotnunes committed Oct 22, 2020
1 parent 5322c95 commit 659719d
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 3 deletions.
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -378,6 +378,18 @@
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>

<!-- Write with options API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.CommitResponse writeWithOptions(java.lang.Iterable, com.google.cloud.spanner.Options$TransactionOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.CommitResponse writeAtLeastOnceWithOptions(java.lang.Iterable, com.google.cloud.spanner.Options$TransactionOption[])</method>
</difference>

<!-- Note: The following change for the LazySpannerInitializer.initialize() method must be specified twice, both with return type java.lang.Object and with com.google.cloud.spanner.Spanner. -->
<difference>
<differenceType>7009</differenceType>
Expand Down
@@ -0,0 +1,52 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import java.util.Objects;

/** Represents a response from a commit operation. */
public class CommitResponse {

private final Timestamp commitTimestamp;

public CommitResponse(Timestamp commitTimestamp) {
this.commitTimestamp = commitTimestamp;
}

/** Returns a {@link Timestamp} representing the commit time of the write operation. */
public Timestamp getCommitTimestamp() {
return commitTimestamp;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CommitResponse that = (CommitResponse) o;
return Objects.equals(commitTimestamp, that.commitTimestamp);
}

@Override
public int hashCode() {
return Objects.hash(commitTimestamp);
}
}
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -52,6 +53,35 @@ public interface DatabaseClient {
*/
Timestamp write(Iterable<Mutation> mutations) throws SpannerException;

/**
* Writes the given mutations atomically to the database with the given options.
*
* <p>This method uses retries and replay protection internally, which means that the mutations
* are applied exactly once on success, or not at all if an error is returned, regardless of any
* failures in the underlying network. Note that if the call is cancelled or reaches deadline, it
* is not possible to know whether the mutations were applied without performing a subsequent
* database operation, but the mutations will have been applied at most once.
*
* <p>Example of blind write.
*
* <pre>{@code
* long singerId = my_singer_id;
* Mutation mutation = Mutation.newInsertBuilder("Singer")
* .set("SingerId")
* .to(singerId)
* .set("FirstName")
* .to("Billy")
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeWithOptions(Collections.singletonList(mutation));
* }</pre>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException;

/**
* Writes the given mutations atomically to the database without replay protection.
*
Expand Down Expand Up @@ -83,6 +113,38 @@ public interface DatabaseClient {
*/
Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException;

/**
* Writes the given mutations atomically to the database without replay protection.
*
* <p>Since this method does not feature replay protection, it may attempt to apply {@code
* mutations} more than once; if the mutations are not idempotent, this may lead to a failure
* being reported when the mutation was applied once. For example, an insert may fail with {@link
* ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method was called. For
* this reason, most users of the library will prefer to use {@link #write(Iterable)} instead.
* However, {@code writeAtLeastOnce()} requires only a single RPC, whereas {@code write()}
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* <p>Example of unprotected blind write.
*
* <pre>{@code
* long singerId = my_singer_id;
* Mutation mutation = Mutation.newInsertBuilder("Singers")
* .set("SingerId")
* .to(singerId)
* .set("FirstName")
* .to("Billy")
* .set("LastName")
* .to("Joel")
* .build();
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
* }</pre>
*
* @return a response with the timestamp at which the write was committed
*/
CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;

/**
* Returns a context in which a single read can be performed using {@link TimestampBound#strong()}
* concurrency. This method will return a {@link ReadContext} that will not return the read
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -81,6 +82,13 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
Expand All @@ -101,6 +109,13 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
Expand Down
Expand Up @@ -33,6 +33,9 @@ public interface ReadOption {}
/** Marker interface to mark options applicable to query operation. */
public interface QueryOption {}

/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -35,7 +36,6 @@
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import io.opencensus.common.Scope;
Expand Down Expand Up @@ -139,6 +139,13 @@ public Void run(TransactionContext ctx) {
return runner.getCommitTimestamp();
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
setActive(null);
Expand All @@ -154,7 +161,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
.build();
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
CommitResponse response = spanner.getRpc().commit(request, options);
com.google.spanner.v1.CommitResponse response = spanner.getRpc().commit(request, options);
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
return t;
} catch (IllegalArgumentException e) {
Expand All @@ -168,6 +175,13 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
return singleUse(TimestampBound.strong());
Expand Down
Expand Up @@ -47,10 +47,10 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1103,6 +1103,13 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
}
}

@Override
public CommitResponse writeWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
try {
Expand All @@ -1112,6 +1119,13 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
try {
Expand Down Expand Up @@ -1347,6 +1361,13 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
}
}

@Override
public CommitResponse writeWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
try {
Expand All @@ -1357,6 +1378,13 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public long executePartitionedUpdate(Statement stmt) throws SpannerException {
try {
Expand Down

0 comments on commit 659719d

Please sign in to comment.