Skip to content

Commit

Permalink
fix: fix MutateRowsAttemptCallable to avoid NPE in MetricTracer (#557)
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 17, 2020
1 parent 8240779 commit 8d71020
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
Expand Up @@ -160,9 +160,17 @@ public void setExternalFuture(RetryingFuture<Void> externalFuture) {
@Override
public Void call() {
try {
// externalFuture is set from MutateRowsRetryingCallable before invoking this method. It
// shouldn't be null unless the code changed
Preconditions.checkNotNull(
externalFuture, "External future must be set before starting an attempt");

// attemptStared should be called at the very start of the operation. This will initialize
// variables in ApiTracer and avoid exceptions when the tracer marks the attempt as finished
callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

Preconditions.checkState(
currentRequest.getEntriesCount() > 0, "Request doesn't have any mutations to send");

Expand All @@ -179,10 +187,6 @@ public Void call() {
return null;
}

callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

// Make the actual call
ApiFuture<List<MutateRowsResponse>> innerFuture =
innerCallable.futureCall(currentRequest, currentCallContext);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceHelper;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
Expand All @@ -46,6 +47,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -327,6 +329,24 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime));
}

@Test
public void testInvalidRequest() throws InterruptedException {
try {
stub.bulkMutateRowsCallable().call(BulkMutation.create(TABLE_ID));
Assert.fail("Invalid request should throw exception");
} catch (IllegalStateException e) {
Thread.sleep(100);
// Verify that the latency is recorded with an error code (in this case UNKNOWN)
long attemptLatency =
getAggregationValueAsLong(
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("UNKNOWN")));
assertThat(attemptLatency).isAtLeast(0);
}
}

@SuppressWarnings("unchecked")
private static <T> StreamObserver<T> anyObserver(Class<T> returnType) {
return (StreamObserver<T>) any(returnType);
Expand Down

0 comments on commit 8d71020

Please sign in to comment.