Skip to content

Commit

Permalink
feat: log bulk mutation entry errors (#3198)
Browse files Browse the repository at this point in the history
* feat: log bulk mutation entry errors

Currently only RPC level errors are logged for BulkMutations. This adds logging for individual entries.

* fix null msg handling
  • Loading branch information
igorbernstein2 committed Aug 31, 2021
1 parent e294c1e commit 0618ddb
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
Expand Up @@ -21,10 +21,13 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.common.base.MoreObjects;
import com.google.rpc.Status;
import io.grpc.Status.Code;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
* Performs retries for {@link BigtableDataClient#mutateRows(MutateRowsRequest)} operations.
Expand Down Expand Up @@ -194,4 +197,51 @@ public MutateRowsResponse buildResponse() {
}
return MutateRowsResponse.newBuilder().addAllEntries(entries).build();
}

public String getResultString() {
// 2D map: code -> msg -> count
Map<Integer, Map<String, Long>> resultCounts = new TreeMap<>();

for (Status result : results) {
if (result == null) {
result = STATUS_INTERNAL;
}
Map<String, Long> msgCounts = resultCounts.get(result.getCode());
if (msgCounts == null) {
msgCounts = new TreeMap<>();
resultCounts.put(result.getCode(), msgCounts);
}

String msg = MoreObjects.firstNonNull(result.getMessage(), "");
Long count = MoreObjects.firstNonNull(msgCounts.get(msg), 0L);
msgCounts.put(msg, count + 1);
}

// Format string as: code: msg(count), msg2(count); code2: msg(count);
StringBuilder buffer = new StringBuilder();
boolean isFirstCode = true;
for (Map.Entry<Integer, Map<String, Long>> codeEntry : resultCounts.entrySet()) {
if (!isFirstCode) {
buffer.append("; ");
} else {
isFirstCode = false;
}

buffer.append(io.grpc.Status.fromCodeValue(codeEntry.getKey()).getCode());
buffer.append(": ");

boolean isFirstMsg = true;
for (Map.Entry<String, Long> msgEntry : codeEntry.getValue().entrySet()) {
if (!isFirstMsg) {
buffer.append(", ");
} else {
isFirstMsg = false;
}
buffer.append(msgEntry.getKey());
buffer.append("(" + msgEntry.getValue() + ")");
}
}

return buffer.toString();
}
}
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.cloud.bigtable.config.Logger;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.cloud.bigtable.grpc.DeadlineGenerator;
Expand All @@ -39,6 +40,8 @@
public class RetryingMutateRowsOperation
extends AbstractRetryingOperation<
MutateRowsRequest, MutateRowsResponse, List<MutateRowsResponse>> {
protected static final Logger LOG = new Logger(RetryingMutateRowsOperation.class);

private static final io.grpc.Status INVALID_RESPONSE =
io.grpc.Status.INTERNAL.withDescription("The server returned an invalid response");

Expand Down Expand Up @@ -86,6 +89,8 @@ protected boolean onOK(Metadata trailers) {
ProcessingStatus status = requestManager.onOK();

if (status == ProcessingStatus.INVALID) {
LOG.error("BulkMutateRows was invalid, final state: " + requestManager.getResultString());

// Set an exception.
onError(INVALID_RESPONSE, trailers);
return true;
Expand All @@ -95,6 +100,12 @@ protected boolean onOK(Metadata trailers) {
if (status == ProcessingStatus.SUCCESS || status == ProcessingStatus.NOT_RETRYABLE) {
// Set the response, with either success, or non-retryable responses.
completionFuture.set(Arrays.asList(requestManager.buildResponse()));

if (status != ProcessingStatus.SUCCESS) {
LOG.error(
"BulkMutateRows partially failed with nonretryable errors, final state: "
+ requestManager.getResultString());
}
return true;
}

Expand All @@ -110,6 +121,9 @@ protected boolean onOK(Metadata trailers) {
"failureCount",
AttributeValue.longAttributeValue(
requestManager.getRetryRequest().getEntriesCount())));

LOG.error(
"BulkMutateRows exhausted retries, final state: " + requestManager.getResultString());
return true;
}

Expand Down
Expand Up @@ -28,6 +28,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -227,4 +229,33 @@ public void testInvalid() {
new MutateRowsRequestManager(retryOptions, createRequest(3));
Assert.assertEquals(ProcessingStatus.INVALID, send(underTest, createResponse(OK, OK)));
}

@Test
public void testMessage() {
MutateRowsRequestManager underTest =
new MutateRowsRequestManager(retryOptions, createRequest(4));
send(
underTest,
createResponse(
Status.newBuilder().setCode(io.grpc.Status.Code.OK.value()).build(),
Status.newBuilder()
.setCode(io.grpc.Status.Code.DEADLINE_EXCEEDED.value())
.setMessage("deadline exceeded custom message")
.build(),
Status.newBuilder()
.setCode(io.grpc.Status.Code.DEADLINE_EXCEEDED.value())
.setMessage("deadline exceeded custom message")
.build(),
Status.newBuilder()
.setCode(io.grpc.Status.Code.INVALID_ARGUMENT.value())
.setMessage("invalid arg custom message")
.build()));
String resultString = underTest.getResultString();
MatcherAssert.assertThat(resultString, Matchers.containsString("OK: (1)"));
MatcherAssert.assertThat(
resultString,
Matchers.containsString("DEADLINE_EXCEEDED: deadline exceeded custom message(2)"));
MatcherAssert.assertThat(
resultString, Matchers.containsString("INVALID_ARGUMENT: invalid arg custom message(1)"));
}
}

0 comments on commit 0618ddb

Please sign in to comment.