Skip to content

Commit

Permalink
fix: bulkWriter: writing to the same doc doesn't create a new batch (#…
Browse files Browse the repository at this point in the history
…394)

* fix: bulkWriter: writing to the same doc doesn't create a new batch

* fix clirr rules

* resolve comments

* remove usage of BatchWriteResult.documentKey

* clirr rule update
  • Loading branch information
Brian Chen committed Oct 1, 2020
1 parent 5e8c154 commit 259ece8
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 198 deletions.
13 changes: 12 additions & 1 deletion google-cloud-firestore/clirr-ignored-differences.xml
Expand Up @@ -161,7 +161,7 @@
<method>com.google.cloud.firestore.Query collectionGroup(java.lang.String)</method>
<to>com.google.cloud.firestore.CollectionGroup</to>
</difference>

<!--
BulkWriter
-->
Expand Down Expand Up @@ -199,6 +199,17 @@
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>com.google.api.gax.rpc.UnaryCallable batchWriteCallable()</method>
</difference>
<difference>
<differenceType>1001</differenceType>
<className>com/google/cloud/firestore/BatchWriteResult</className>
</difference>
<difference>
<differenceType>6004</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<field>pendingOperations</field>
<from>java.util.Map</from>
<to>java.util.List</to>
</difference>

<!--
FakeCredentials Refactor
Expand Down
Expand Up @@ -25,24 +25,15 @@
* BatchWriteRequests.
*/
@InternalApi
public final class BatchWriteResult {
private final DocumentReference documentReference;
final class BatchWriteResult {
@Nullable private final Timestamp writeTime;
@Nullable private final Exception exception;

BatchWriteResult(
DocumentReference documentReference,
@Nullable Timestamp timestamp,
@Nullable Exception exception) {
this.documentReference = documentReference;
BatchWriteResult(@Nullable Timestamp timestamp, @Nullable Exception exception) {
this.writeTime = timestamp;
this.exception = exception;
}

public DocumentReference getDocumentReference() {
return documentReference;
}

@Nullable
public Timestamp getWriteTime() {
return writeTime;
Expand Down
Expand Up @@ -18,9 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import java.util.Set;

/** Used to represent a batch on the BatchQueue. */
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
Expand All @@ -29,21 +26,13 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
super(firestore, maxBatchSize);
}

BulkCommitBatch(
FirestoreImpl firestore,
BulkCommitBatch retryBatch,
final Set<DocumentReference> docsToRetry) {
BulkCommitBatch(FirestoreImpl firestore, BulkCommitBatch retryBatch) {
super(firestore);
this.writes.addAll(
FluentIterable.from(retryBatch.writes)
.filter(
new Predicate<WriteOperation>() {
@Override
public boolean apply(WriteOperation writeOperation) {
return docsToRetry.contains(writeOperation.documentReference);
}
})
.toList());

// Create a new BulkCommitBatch containing only the indexes from the provided indexes to retry.
for (int index : retryBatch.getPendingIndexes()) {
this.writes.add(retryBatch.writes.get(index));
}

Preconditions.checkState(
retryBatch.state == BatchState.SENT,
Expand All @@ -55,9 +44,4 @@ public boolean apply(WriteOperation writeOperation) {
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
return result;
}

@Override
boolean allowDuplicateDocs() {
return false;
}
}
Expand Up @@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -129,7 +128,7 @@ final class BulkWriter implements AutoCloseable {
public ApiFuture<WriteResult> create(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.create(documentReference, fields);
sendReadyBatches();
return future;
Expand All @@ -147,7 +146,7 @@ public ApiFuture<WriteResult> create(
public ApiFuture<WriteResult> create(
@Nonnull DocumentReference documentReference, @Nonnull Object pojo) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.create(documentReference, pojo);
sendReadyBatches();
return future;
Expand All @@ -163,7 +162,7 @@ public ApiFuture<WriteResult> create(
@Nonnull
public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReference) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.delete(documentReference);
sendReadyBatches();
return future;
Expand All @@ -181,7 +180,7 @@ public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReferenc
public ApiFuture<WriteResult> delete(
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.delete(documentReference, precondition);
sendReadyBatches();
return future;
Expand All @@ -200,7 +199,7 @@ public ApiFuture<WriteResult> delete(
public ApiFuture<WriteResult> set(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, fields);
sendReadyBatches();
return future;
Expand All @@ -222,7 +221,7 @@ public ApiFuture<WriteResult> set(
@Nonnull Map<String, Object> fields,
@Nonnull SetOptions options) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, fields, options);
sendReadyBatches();
return future;
Expand All @@ -242,7 +241,7 @@ public ApiFuture<WriteResult> set(
public ApiFuture<WriteResult> set(
@Nonnull DocumentReference documentReference, Object pojo, @Nonnull SetOptions options) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, pojo, options);
sendReadyBatches();
return future;
Expand All @@ -259,7 +258,7 @@ public ApiFuture<WriteResult> set(
@Nonnull
public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference, Object pojo) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, pojo);
sendReadyBatches();
return future;
Expand All @@ -282,7 +281,7 @@ public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference,
public ApiFuture<WriteResult> update(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.update(documentReference, fields);
sendReadyBatches();
return future;
Expand All @@ -308,7 +307,7 @@ public ApiFuture<WriteResult> update(
@Nonnull Map<String, Object> fields,
Precondition precondition) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.update(documentReference, fields, precondition);
sendReadyBatches();
return future;
Expand Down Expand Up @@ -336,7 +335,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(documentReference, field, value, moreFieldsAndValues);
sendReadyBatches();
Expand Down Expand Up @@ -365,7 +364,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(documentReference, fieldPath, value, moreFieldsAndValues);
sendReadyBatches();
Expand Down Expand Up @@ -395,7 +394,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(documentReference, precondition, field, value, moreFieldsAndValues);
sendReadyBatches();
Expand Down Expand Up @@ -426,7 +425,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(
documentReference, precondition, fieldPath, value, moreFieldsAndValues);
Expand Down Expand Up @@ -493,11 +492,10 @@ private void verifyNotClosed() {
* Return the first eligible batch that can hold a write to the provided reference, or creates one
* if no eligible batches are found.
*/
private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
private BulkCommitBatch getEligibleBatch() {
if (batchQueue.size() > 0) {
BulkCommitBatch lastBatch = batchQueue.get(batchQueue.size() - 1);
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN
&& !lastBatch.hasDocument(documentReference)) {
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN) {
return lastBatch;
}
}
Expand Down Expand Up @@ -538,7 +536,8 @@ public boolean apply(BulkCommitBatch batch) {
.toList();

int index = 0;
while (index < unsentBatches.size() && isBatchSendable(unsentBatches.get(index))) {
while (index < unsentBatches.size()
&& unsentBatches.get(index).state == BatchState.READY_TO_SEND) {
final BulkCommitBatch batch = unsentBatches.get(index);

// Send the batch if it is under the rate limit, or schedule another attempt after the
Expand Down Expand Up @@ -631,8 +630,8 @@ public ApiFuture<Void> apply(Void ignored) {
public ApiFuture<List<BatchWriteResult>> apply(Exception exception) {
List<BatchWriteResult> results = new ArrayList<>();
// If the BatchWrite RPC fails, map the exception to each individual result.
for (DocumentReference documentReference : batch.getPendingDocuments()) {
results.add(new BatchWriteResult(documentReference, null, exception));
for (int i = 0; i < batch.getPendingDocumentPaths().size(); ++i) {
results.add(new BatchWriteResult(null, exception));
}
return ApiFutures.immediateFuture(results);
}
Expand All @@ -655,8 +654,8 @@ public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {

@Override
public ApiFuture<Void> apply(List<BatchWriteResult> results) {
batch.processResults(results);
Set<DocumentReference> remainingOps = batch.getPendingDocuments();
batch.processResults(results, /* allowRetry= */ true);
List<String> remainingOps = batch.getPendingDocumentPaths();
if (!remainingOps.isEmpty()) {
logger.log(
Level.WARNING,
Expand All @@ -666,52 +665,16 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {

if (attempt < MAX_RETRY_ATTEMPTS) {
nextAttempt = backoff.createNextAttempt(nextAttempt);
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch, remainingOps);
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch);
return bulkCommit(newBatch, attempt + 1);
} else {
batch.failRemainingOperations(results);
batch.processResults(results, /* allowRetry= */ false);
}
}
return ApiFutures.immediateFuture(null);
}
}

/**
* Checks that the provided batch is sendable. To be sendable, a batch must: (1) be marked as
* READY_TO_SEND (2) not write to references that are currently in flight.
*/
private boolean isBatchSendable(BulkCommitBatch batch) {
if (!batch.getState().equals(UpdateBuilder.BatchState.READY_TO_SEND)) {
return false;
}

for (final DocumentReference documentReference : batch.getPendingDocuments()) {
boolean isRefInFlight =
FluentIterable.from(batchQueue)
.anyMatch(
new Predicate<BulkCommitBatch>() {
@Override
public boolean apply(BulkCommitBatch batch) {
return batch.getState().equals(BatchState.SENT)
&& batch.hasDocument(documentReference);
}
});

if (isRefInFlight) {
logger.log(
Level.WARNING,
String.format(
"Duplicate write to document %s detected. Writing to the same document multiple"
+ " times will slow down BulkWriter. Write to unique documents in order to "
+ "maximize throughput.",
documentReference.getPath()));
return false;
}
}

return true;
}

@VisibleForTesting
void setMaxBatchSize(int size) {
maxBatchSize = size;
Expand Down

0 comments on commit 259ece8

Please sign in to comment.