Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bulkWriter: writing to the same doc doesn't create a new batch #394

Merged
merged 5 commits into from Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion google-cloud-firestore/clirr-ignored-differences.xml
Expand Up @@ -161,7 +161,7 @@
<method>com.google.cloud.firestore.Query collectionGroup(java.lang.String)</method>
<to>com.google.cloud.firestore.CollectionGroup</to>
</difference>

<!--
BulkWriter
-->
Expand Down Expand Up @@ -199,6 +199,17 @@
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>com.google.api.gax.rpc.UnaryCallable batchWriteCallable()</method>
</difference>
<difference>
<differenceType>1001</differenceType>
<className>com/google/cloud/firestore/BatchWriteResult</className>
</difference>
<difference>
<differenceType>6004</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<field>pendingOperations</field>
<from>java.util.Map</from>
<to>java.util.List</to>
</difference>

<!--
FakeCredentials Refactor
Expand Down
Expand Up @@ -25,24 +25,15 @@
* BatchWriteRequests.
*/
@InternalApi
public final class BatchWriteResult {
private final DocumentReference documentReference;
final class BatchWriteResult {
@Nullable private final Timestamp writeTime;
@Nullable private final Exception exception;

BatchWriteResult(
DocumentReference documentReference,
@Nullable Timestamp timestamp,
@Nullable Exception exception) {
this.documentReference = documentReference;
BatchWriteResult(@Nullable Timestamp timestamp, @Nullable Exception exception) {
this.writeTime = timestamp;
this.exception = exception;
}

public DocumentReference getDocumentReference() {
return documentReference;
}

@Nullable
public Timestamp getWriteTime() {
return writeTime;
Expand Down
Expand Up @@ -18,9 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import java.util.Set;

/** Used to represent a batch on the BatchQueue. */
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
Expand All @@ -29,21 +26,13 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
super(firestore, maxBatchSize);
}

BulkCommitBatch(
FirestoreImpl firestore,
BulkCommitBatch retryBatch,
final Set<DocumentReference> docsToRetry) {
BulkCommitBatch(FirestoreImpl firestore, BulkCommitBatch retryBatch) {
super(firestore);
this.writes.addAll(
FluentIterable.from(retryBatch.writes)
.filter(
new Predicate<WriteOperation>() {
@Override
public boolean apply(WriteOperation writeOperation) {
return docsToRetry.contains(writeOperation.documentReference);
}
})
.toList());

// Create a new BulkCommitBatch containing only the writes at the indexes that need to be retried.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. This is different from Node, but likely better. Should we update the Node SDK to match? This might make porting simpler.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny how having more tools at your disposal does not translate to writing better code. Will create a small node PR for that later.

for (int index : retryBatch.getPendingIndexes()) {
this.writes.add(retryBatch.writes.get(index));
}

Preconditions.checkState(
retryBatch.state == BatchState.SENT,
Expand All @@ -55,9 +44,4 @@ public boolean apply(WriteOperation writeOperation) {
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
return result;
}

@Override
boolean allowDuplicateDocs() {
return false;
}
}
Expand Up @@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -129,7 +128,7 @@ final class BulkWriter implements AutoCloseable {
public ApiFuture<WriteResult> create(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.create(documentReference, fields);
sendReadyBatches();
return future;
Expand All @@ -147,7 +146,7 @@ public ApiFuture<WriteResult> create(
public ApiFuture<WriteResult> create(
@Nonnull DocumentReference documentReference, @Nonnull Object pojo) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.create(documentReference, pojo);
sendReadyBatches();
return future;
Expand All @@ -163,7 +162,7 @@ public ApiFuture<WriteResult> create(
@Nonnull
public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReference) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.delete(documentReference);
sendReadyBatches();
return future;
Expand All @@ -181,7 +180,7 @@ public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReferenc
public ApiFuture<WriteResult> delete(
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.delete(documentReference, precondition);
sendReadyBatches();
return future;
Expand All @@ -200,7 +199,7 @@ public ApiFuture<WriteResult> delete(
public ApiFuture<WriteResult> set(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, fields);
sendReadyBatches();
return future;
Expand All @@ -222,7 +221,7 @@ public ApiFuture<WriteResult> set(
@Nonnull Map<String, Object> fields,
@Nonnull SetOptions options) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, fields, options);
sendReadyBatches();
return future;
Expand All @@ -242,7 +241,7 @@ public ApiFuture<WriteResult> set(
public ApiFuture<WriteResult> set(
@Nonnull DocumentReference documentReference, Object pojo, @Nonnull SetOptions options) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, pojo, options);
sendReadyBatches();
return future;
Expand All @@ -259,7 +258,7 @@ public ApiFuture<WriteResult> set(
@Nonnull
public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference, Object pojo) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, pojo);
sendReadyBatches();
return future;
Expand All @@ -282,7 +281,7 @@ public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference,
public ApiFuture<WriteResult> update(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.update(documentReference, fields);
sendReadyBatches();
return future;
Expand All @@ -308,7 +307,7 @@ public ApiFuture<WriteResult> update(
@Nonnull Map<String, Object> fields,
Precondition precondition) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future = bulkCommitBatch.update(documentReference, fields, precondition);
sendReadyBatches();
return future;
Expand Down Expand Up @@ -336,7 +335,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(documentReference, field, value, moreFieldsAndValues);
sendReadyBatches();
Expand Down Expand Up @@ -365,7 +364,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(documentReference, fieldPath, value, moreFieldsAndValues);
sendReadyBatches();
Expand Down Expand Up @@ -395,7 +394,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(documentReference, precondition, field, value, moreFieldsAndValues);
sendReadyBatches();
Expand Down Expand Up @@ -426,7 +425,7 @@ public ApiFuture<WriteResult> update(
@Nullable Object value,
Object... moreFieldsAndValues) {
verifyNotClosed();
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
ApiFuture<WriteResult> future =
bulkCommitBatch.update(
documentReference, precondition, fieldPath, value, moreFieldsAndValues);
Expand Down Expand Up @@ -493,11 +492,10 @@ private void verifyNotClosed() {
* Returns the first eligible batch that can hold a write, or creates a new one
* if no eligible batches are found.
*/
private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
private BulkCommitBatch getEligibleBatch() {
if (batchQueue.size() > 0) {
BulkCommitBatch lastBatch = batchQueue.get(batchQueue.size() - 1);
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN
&& !lastBatch.hasDocument(documentReference)) {
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN) {
return lastBatch;
}
}
Expand Down Expand Up @@ -538,7 +536,8 @@ public boolean apply(BulkCommitBatch batch) {
.toList();

int index = 0;
while (index < unsentBatches.size() && isBatchSendable(unsentBatches.get(index))) {
while (index < unsentBatches.size()
&& unsentBatches.get(index).state == BatchState.READY_TO_SEND) {
final BulkCommitBatch batch = unsentBatches.get(index);

// Send the batch if it is under the rate limit, or schedule another attempt after the
Expand Down Expand Up @@ -631,8 +630,8 @@ public ApiFuture<Void> apply(Void ignored) {
public ApiFuture<List<BatchWriteResult>> apply(Exception exception) {
List<BatchWriteResult> results = new ArrayList<>();
// If the BatchWrite RPC fails, map the exception to each individual result.
for (DocumentReference documentReference : batch.getPendingDocuments()) {
results.add(new BatchWriteResult(documentReference, null, exception));
for (int i = 0; i < batch.getPendingDocumentPaths().size(); ++i) {
results.add(new BatchWriteResult(null, exception));
}
return ApiFutures.immediateFuture(results);
}
Expand All @@ -655,8 +654,8 @@ public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {

@Override
public ApiFuture<Void> apply(List<BatchWriteResult> results) {
batch.processResults(results);
Set<DocumentReference> remainingOps = batch.getPendingDocuments();
batch.processResults(results, /* allowRetry= */ true);
List<String> remainingOps = batch.getPendingDocumentPaths();
if (!remainingOps.isEmpty()) {
logger.log(
Level.WARNING,
Expand All @@ -666,52 +665,16 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {

if (attempt < MAX_RETRY_ATTEMPTS) {
nextAttempt = backoff.createNextAttempt(nextAttempt);
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch, remainingOps);
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch);
return bulkCommit(newBatch, attempt + 1);
} else {
batch.failRemainingOperations(results);
batch.processResults(results, /* allowRetry= */ false);
}
}
return ApiFutures.immediateFuture(null);
}
}

/**
* Checks that the provided batch is sendable. To be sendable, a batch must: (1) be marked as
* READY_TO_SEND (2) not write to references that are currently in flight.
*/
private boolean isBatchSendable(BulkCommitBatch batch) {
if (!batch.getState().equals(UpdateBuilder.BatchState.READY_TO_SEND)) {
return false;
}

for (final DocumentReference documentReference : batch.getPendingDocuments()) {
boolean isRefInFlight =
FluentIterable.from(batchQueue)
.anyMatch(
new Predicate<BulkCommitBatch>() {
@Override
public boolean apply(BulkCommitBatch batch) {
return batch.getState().equals(BatchState.SENT)
&& batch.hasDocument(documentReference);
}
});

if (isRefInFlight) {
logger.log(
Level.WARNING,
String.format(
"Duplicate write to document %s detected. Writing to the same document multiple"
+ " times will slow down BulkWriter. Write to unique documents in order to "
+ "maximize throughput.",
documentReference.getPath()));
return false;
}
}

return true;
}

@VisibleForTesting
void setMaxBatchSize(int size) {
maxBatchSize = size;
Expand Down