diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml
index 46b404bee..ea54e42a5 100644
--- a/google-cloud-bigtable/clirr-ignored-differences.xml
+++ b/google-cloud-bigtable/clirr-ignored-differences.xml
@@ -34,4 +34,9 @@
com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub
*
+
+
+ 8001
+ com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory
+
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index 46729134f..00981ab2e 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -70,11 +70,13 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
-import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
+import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
+import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
+import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
@@ -191,7 +193,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
- new CompositeTracerFactory(
+ new BigtableTracerFactory(
ImmutableList.of(
// Add OpenCensus Tracing
new OpencensusTracerFactory(
@@ -397,11 +399,14 @@ public Map extract(ReadRowsRequest readRowsRequest) {
.build(),
readRowsSettings.getRetryableCodes());
+ ServerStreamingCallable withStatsHeaders =
+ new StatsHeadersServerStreamingCallable<>(base);
+
// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable convertException =
- new ReadRowsConvertExceptionCallable<>(base);
+ new ReadRowsConvertExceptionCallable<>(withStatsHeaders);
ServerStreamingCallable merging =
new RowMergingCallable<>(convertException, rowAdapter);
@@ -468,9 +473,12 @@ public Map extract(
UnaryCallable> spoolable = base.all();
+ UnaryCallable> withStatsHeaders =
+ new StatsHeadersUnaryCallable<>(spoolable);
+
UnaryCallable> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
- spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());
+ withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());
UnaryCallable> retryable =
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
@@ -505,9 +513,12 @@ public Map extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());
+ UnaryCallable withStatsHeaders =
+ new StatsHeadersUnaryCallable<>(base);
+
UnaryCallable withHeaderTracer =
new HeaderTracerUnaryCallable<>(
- base, settings.getHeaderTracer(), getSpanName(methodName).toString());
+ withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());
UnaryCallable retrying =
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
@@ -646,6 +657,9 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());
+ ServerStreamingCallable withStatsHeaders =
+ new StatsHeadersServerStreamingCallable<>(base);
+
RetryAlgorithm retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm(),
@@ -656,7 +670,7 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
- base,
+ withStatsHeaders,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
@@ -689,9 +703,12 @@ public Map extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());
+ UnaryCallable withStatsHeaders =
+ new StatsHeadersUnaryCallable<>(base);
+
UnaryCallable withHeaderTracer =
new HeaderTracerUnaryCallable<>(
- base, settings.getHeaderTracer(), getSpanName(methodName).toString());
+ withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());
UnaryCallable retrying =
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
@@ -726,10 +743,14 @@ public Map extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());
+
+ UnaryCallable withStatsHeaders =
+ new StatsHeadersUnaryCallable<>(base);
+
String methodName = "ReadModifyWriteRow";
UnaryCallable withHeaderTracer =
new HeaderTracerUnaryCallable<>(
- base, settings.getHeaderTracer(), getSpanName(methodName).toString());
+ withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());
UnaryCallable retrying =
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
similarity index 80%
rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
index 25893ee88..225ba182c 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.common.collect.ImmutableList;
@@ -22,11 +23,17 @@
import java.util.List;
import org.threeten.bp.Duration;
-/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */
-class CompositeTracer extends BaseApiTracer {
+/**
+ * A Bigtable specific {@link ApiTracer} that will be used to plumb additional context through the
+ * call chains as well as combines multiple user defined {@link ApiTracer}s into a single one. This
+ * will ensure that operation lifecycle events are plumbed through while maintaining user configured
+ * functionalities.
+ */
+class BigtableTracer extends BaseApiTracer {
private final List children;
+ private volatile int attempt = 0;
- CompositeTracer(List children) {
+ BigtableTracer(List children) {
this.children = ImmutableList.copyOf(children);
}
@@ -78,6 +85,7 @@ public void connectionSelected(String id) {
@Override
public void attemptStarted(int attemptNumber) {
+ this.attempt = attemptNumber;
for (ApiTracer child : children) {
child.attemptStarted(attemptNumber);
}
@@ -152,4 +160,13 @@ public void batchRequestSent(long elementCount, long requestSize) {
child.batchRequestSent(elementCount, requestSize);
}
}
+
+ /**
+ * Get the attempt number of the current call. Attempt number for the current call is passed in
+ * and recorded in {@link #attemptStarted(int)}. With the getter we can access it from {@link
+ * ApiCallContext}. Attempt number starts from 0.
+ */
+ public int getAttempt() {
+ return attempt;
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerFactory.java
similarity index 82%
rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java
rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerFactory.java
index 2d9256a5e..f980c4b7c 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerFactory.java
@@ -24,12 +24,15 @@
import java.util.ArrayList;
import java.util.List;
-/** Combines multiple {@link ApiTracerFactory} into a single {@link ApiTracerFactory}. */
+/**
+ * A Bigtable specific {@link ApiTracerFactory} that combines multiple {@link ApiTracerFactory} into
+ * a single one.
+ */
@InternalApi("For internal use only")
-public class CompositeTracerFactory extends BaseApiTracerFactory {
+public class BigtableTracerFactory extends BaseApiTracerFactory {
private final List apiTracerFactories;
- public CompositeTracerFactory(List apiTracerFactories) {
+ public BigtableTracerFactory(List apiTracerFactories) {
this.apiTracerFactories = ImmutableList.copyOf(apiTracerFactories);
}
@@ -40,6 +43,6 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
for (ApiTracerFactory factory : apiTracerFactories) {
children.add(factory.newTracer(parent, spanName, operationType));
}
- return new CompositeTracer(children);
+ return new BigtableTracer(children);
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersServerStreamingCallable.java
new file mode 100644
index 000000000..edc794c23
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersServerStreamingCallable.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+
+/**
+ * A callable that injects client timestamp and current attempt number to request headers. Attempt
+ * number starts from 0.
+ */
+@InternalApi("For internal use only")
+public final class StatsHeadersServerStreamingCallable
+ extends ServerStreamingCallable {
+ private final ServerStreamingCallable innerCallable;
+
+ public StatsHeadersServerStreamingCallable(ServerStreamingCallable innerCallable) {
+ this.innerCallable = innerCallable;
+ }
+
+ @Override
+ public void call(
+ RequestT request,
+ ResponseObserver responseObserver,
+ ApiCallContext apiCallContext) {
+ ApiCallContext newCallContext =
+ apiCallContext.withExtraHeaders(Util.createStatsHeaders(apiCallContext));
+ innerCallable.call(request, responseObserver, newCallContext);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersUnaryCallable.java
new file mode 100644
index 000000000..1e7b67a6f
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersUnaryCallable.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.UnaryCallable;
+
+/**
+ * A callable that injects client timestamp and current attempt number to request headers. Attempt
+ * number starts from 0.
+ */
+@InternalApi("For internal use only")
+public final class StatsHeadersUnaryCallable
+ extends UnaryCallable {
+ private final UnaryCallable innerCallable;
+
+ public StatsHeadersUnaryCallable(UnaryCallable innerCallable) {
+ this.innerCallable = innerCallable;
+ }
+
+ @Override
+ public ApiFuture futureCall(RequestT request, ApiCallContext apiCallContext) {
+ ApiCallContext newCallContext =
+ apiCallContext.withExtraHeaders(Util.createStatsHeaders(apiCallContext));
+ return innerCallable.futureCall(request, newCallContext);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
index ff40aca38..c9e69c067 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
@@ -15,13 +15,21 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.tags.TagValue;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -29,6 +37,11 @@
/** Utilities to help integrating with OpenCensus. */
class Util {
+ static final Metadata.Key ATTEMPT_HEADER_KEY =
+ Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER);
+ static final Metadata.Key ATTEMPT_EPOCH_KEY =
+ Metadata.Key.of("bigtable-client-attempt-epoch-usec", Metadata.ASCII_STRING_MARSHALLER);
+
private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString());
/** Convert an exception into a value that can be used as an OpenCensus tag value. */
@@ -71,4 +84,21 @@ static TagValue extractStatus(Future> future) {
}
return extractStatus(error);
}
+
+ /**
+ * Add attempt number and client timestamp from api call context to request headers. Attempt
+ * number starts from 0.
+ */
+ static Map> createStatsHeaders(ApiCallContext apiCallContext) {
+ ImmutableMap.Builder> headers = ImmutableMap.builder();
+ headers.put(
+ ATTEMPT_EPOCH_KEY.name(),
+ Arrays.asList(String.valueOf(Instant.EPOCH.until(Instant.now(), ChronoUnit.MICROS))));
+ // This should always be true
+ if (apiCallContext.getTracer() instanceof BigtableTracer) {
+ int attemptCount = ((BigtableTracer) apiCallContext.getTracer()).getAttempt();
+ headers.put(ATTEMPT_HEADER_KEY.name(), Arrays.asList(String.valueOf(attemptCount)));
+ }
+ return headers.build();
+ }
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerTest.java
similarity index 83%
rename from google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
rename to google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerTest.java
index cedb227ba..f1b464ff4 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerTest.java
@@ -34,17 +34,17 @@
import org.threeten.bp.Duration;
@RunWith(JUnit4.class)
-public class CompositeTracerTest {
+public class BigtableTracerTest {
@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
@Mock private ApiTracer child1;
@Mock private ApiTracer child2;
- private CompositeTracer compositeTracer;
+ private BigtableTracer bigtableTracer;
@Before
public void setup() {
- compositeTracer = new CompositeTracer(ImmutableList.of(child1, child2));
+ bigtableTracer = new BigtableTracer(ImmutableList.of(child1, child2));
}
@Test
@@ -55,7 +55,7 @@ public void testInScope() {
Scope scope2 = mock(Scope.class);
when(child2.inScope()).thenReturn(scope2);
- Scope parentScope = compositeTracer.inScope();
+ Scope parentScope = bigtableTracer.inScope();
parentScope.close();
verify(scope1, times(1)).close();
@@ -63,14 +63,14 @@ public void testInScope() {
@Test
public void testOperationSucceeded() {
- compositeTracer.operationSucceeded();
+ bigtableTracer.operationSucceeded();
verify(child1, times(1)).operationSucceeded();
verify(child2, times(1)).operationSucceeded();
}
@Test
public void testOperationCancelled() {
- compositeTracer.operationCancelled();
+ bigtableTracer.operationCancelled();
verify(child1, times(1)).operationCancelled();
verify(child2, times(1)).operationCancelled();
}
@@ -78,35 +78,35 @@ public void testOperationCancelled() {
@Test
public void testOperationFailed() {
RuntimeException error = new RuntimeException();
- compositeTracer.operationFailed(error);
+ bigtableTracer.operationFailed(error);
verify(child1, times(1)).operationFailed(error);
verify(child2, times(1)).operationFailed(error);
}
@Test
public void testConnectionSelected() {
- compositeTracer.connectionSelected("connection-one");
+ bigtableTracer.connectionSelected("connection-one");
verify(child1, times(1)).connectionSelected("connection-one");
verify(child2, times(1)).connectionSelected("connection-one");
}
@Test
public void testAttemptStarted() {
- compositeTracer.attemptStarted(3);
+ bigtableTracer.attemptStarted(3);
verify(child1, times(1)).attemptStarted(3);
verify(child2, times(1)).attemptStarted(3);
}
@Test
public void testAttemptSucceeded() {
- compositeTracer.attemptSucceeded();
+ bigtableTracer.attemptSucceeded();
verify(child1, times(1)).attemptSucceeded();
verify(child2, times(1)).attemptSucceeded();
}
@Test
public void testAttemptCancelled() {
- compositeTracer.attemptCancelled();
+ bigtableTracer.attemptCancelled();
verify(child1, times(1)).attemptCancelled();
verify(child2, times(1)).attemptCancelled();
}
@@ -115,7 +115,7 @@ public void testAttemptCancelled() {
public void testAttemptFailed() {
RuntimeException error = new RuntimeException();
Duration delay = Duration.ofMillis(10);
- compositeTracer.attemptFailed(error, delay);
+ bigtableTracer.attemptFailed(error, delay);
verify(child1, times(1)).attemptFailed(error, delay);
verify(child2, times(1)).attemptFailed(error, delay);
}
@@ -123,7 +123,7 @@ public void testAttemptFailed() {
@Test
public void testAttemptFailedRetriesExhausted() {
RuntimeException error = new RuntimeException();
- compositeTracer.attemptFailedRetriesExhausted(error);
+ bigtableTracer.attemptFailedRetriesExhausted(error);
verify(child1, times(1)).attemptFailedRetriesExhausted(error);
verify(child2, times(1)).attemptFailedRetriesExhausted(error);
}
@@ -131,7 +131,7 @@ public void testAttemptFailedRetriesExhausted() {
@Test
public void testAttemptPermanentFailure() {
RuntimeException error = new RuntimeException();
- compositeTracer.attemptPermanentFailure(error);
+ bigtableTracer.attemptPermanentFailure(error);
verify(child1, times(1)).attemptPermanentFailure(error);
verify(child2, times(1)).attemptPermanentFailure(error);
}
@@ -139,35 +139,35 @@ public void testAttemptPermanentFailure() {
@Test
public void testLroStartFailed() {
RuntimeException error = new RuntimeException();
- compositeTracer.lroStartFailed(error);
+ bigtableTracer.lroStartFailed(error);
verify(child1, times(1)).lroStartFailed(error);
verify(child2, times(1)).lroStartFailed(error);
}
@Test
public void testLroStartSucceeded() {
- compositeTracer.lroStartSucceeded();
+ bigtableTracer.lroStartSucceeded();
verify(child1, times(1)).lroStartSucceeded();
verify(child2, times(1)).lroStartSucceeded();
}
@Test
public void testResponseReceived() {
- compositeTracer.responseReceived();
+ bigtableTracer.responseReceived();
verify(child1, times(1)).responseReceived();
verify(child2, times(1)).responseReceived();
}
@Test
public void testRequestSent() {
- compositeTracer.requestSent();
+ bigtableTracer.requestSent();
verify(child1, times(1)).requestSent();
verify(child2, times(1)).requestSent();
}
@Test
public void testBatchRequestSent() {
- compositeTracer.batchRequestSent(2, 20);
+ bigtableTracer.batchRequestSent(2, 20);
verify(child1, times(1)).batchRequestSent(2, 20);
verify(child2, times(1)).batchRequestSent(2, 20);
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java
new file mode 100644
index 000000000..c59a84828
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceHelper;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.common.collect.Queues;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.StringValue;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StatsHeadersCallableTest {
+ private FakeServiceHelper serviceHelper;
+
+ private FakeService fakeService = new FakeService();
+
+ private EnhancedBigtableStub stub;
+
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+
+ private final int attemptCounts = 3;
+ private MetadataInterceptor metadataInterceptor;
+
+ @Before
+ public void setUp() throws Exception {
+ metadataInterceptor = new MetadataInterceptor();
+ serviceHelper = new FakeServiceHelper(metadataInterceptor, fakeService);
+ serviceHelper.start();
+
+ EnhancedBigtableStubSettings settings =
+ BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build()
+ .getStubSettings();
+ stub = EnhancedBigtableStub.create(settings);
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ serviceHelper.shutdown();
+ }
+
+ @Test
+ public void testReadRowsHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.readRowsCallable().call(Query.create(TABLE_ID).rowKey("key")).iterator().next();
+ verifyHeaders(attemptCounts, startTimestamp);
+ }
+
+ @Test
+ public void testReadRowHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.readRowCallable().futureCall(Query.create(TABLE_ID).rowKey("key")).get();
+ verifyHeaders(attemptCounts, startTimestamp);
+ }
+
+ @Test
+ public void testMutateRowsHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.bulkMutateRowsCallable()
+ .futureCall(BulkMutation.create(TABLE_ID).add(RowMutationEntry.create("key")))
+ .get();
+ verifyHeaders(attemptCounts, startTimestamp);
+ }
+
+ @Test
+ public void testMutateRowHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.mutateRowCallable()
+ .futureCall(RowMutation.create(TABLE_ID, "key").setCell("f", "q", "value"))
+ .get();
+ verifyHeaders(attemptCounts, startTimestamp);
+ }
+
+ @Test
+ public void testSampleRowKeysHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.sampleRowKeysCallable().call(TABLE_ID).get(0);
+ verifyHeaders(attemptCounts, startTimestamp);
+ }
+
+ @Test
+ public void testCheckAndMutateHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.checkAndMutateRowCallable()
+ .call(
+ ConditionalRowMutation.create(TABLE_ID, "key")
+ .then(Mutation.create().setCell("f", "q", "value")));
+ verifyHeaders(1, startTimestamp);
+ }
+
+ @Test
+ public void testReadModifyWriteHeaders() throws Exception {
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.readModifyWriteRowCallable()
+ .call(ReadModifyWriteRow.create(TABLE_ID, "key").append("f", "q", "value"));
+ verifyHeaders(1, startTimestamp);
+ }
+
+ @Test
+ public void testMultipleRequests() throws Exception {
+ // Send multiple requests and make sure headers are set correctly
+ long startTimestamp = System.currentTimeMillis() * 1000;
+ stub.readRowsCallable().call(Query.create(TABLE_ID).rowKey("key")).iterator().next();
+ verifyHeaders(attemptCounts, startTimestamp);
+
+ startTimestamp = System.currentTimeMillis() * 1000;
+ stub.readRowsCallable().call(Query.create(TABLE_ID).rowKey("key")).iterator().next();
+ verifyHeaders(1, startTimestamp);
+ }
+
+ private static class MetadataInterceptor implements ServerInterceptor {
+ final BlockingQueue headers = Queues.newLinkedBlockingDeque();
+
+ @Override
+ public Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ headers.add(metadata);
+ return serverCallHandler.startCall(serverCall, metadata);
+ }
+ }
+
+ private class FakeService extends BigtableImplBase {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ @Override
+ public void readRows(ReadRowsRequest request, StreamObserver observer) {
+ if (callCount.get() < attemptCounts - 1) {
+ callCount.incrementAndGet();
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return;
+ }
+ observer.onNext(
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setCommitRow(true)
+ .setRowKey(ByteString.copyFromUtf8("a"))
+ .setFamilyName(StringValue.getDefaultInstance())
+ .setQualifier(BytesValue.getDefaultInstance())
+ .setValueSize(0))
+ .build());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void mutateRow(MutateRowRequest request, StreamObserver observer) {
+ if (callCount.get() < attemptCounts - 1) {
+ callCount.incrementAndGet();
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return;
+ }
+ observer.onNext(MutateRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void mutateRows(MutateRowsRequest request, StreamObserver observer) {
+ if (callCount.get() < attemptCounts - 1) {
+ callCount.incrementAndGet();
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return;
+ }
+ observer.onNext(MutateRowsResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void sampleRowKeys(
+ SampleRowKeysRequest request, StreamObserver observer) {
+ if (callCount.get() < attemptCounts - 1) {
+ callCount.incrementAndGet();
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return;
+ }
+ observer.onNext(SampleRowKeysResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void checkAndMutateRow(
+ CheckAndMutateRowRequest request, StreamObserver observer) {
+ observer.onNext(CheckAndMutateRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void readModifyWriteRow(
+ ReadModifyWriteRowRequest request, StreamObserver observer) {
+ observer.onNext(ReadModifyWriteRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+ }
+
+ private void verifyHeaders(int expectedAttemptCounts, long startTimestamp) throws Exception {
+ assertThat(metadataInterceptor.headers).hasSize(expectedAttemptCounts);
+ long timestamp = startTimestamp;
+
+ for (int i = 0; i < expectedAttemptCounts; i++) {
+ Metadata headers = metadataInterceptor.headers.take();
+ String attemptCount = headers.get(Util.ATTEMPT_HEADER_KEY);
+ assertThat(attemptCount).isNotNull();
+ assertThat(Integer.valueOf(attemptCount)).isEqualTo(i);
+
+ String clientTimeStr = headers.get(Util.ATTEMPT_EPOCH_KEY);
+ assertThat(clientTimeStr).isNotNull();
+ long clientTime = Long.valueOf(clientTimeStr);
+ assertThat(clientTime).isAtLeast(timestamp);
+
+ timestamp = clientTime;
+ }
+ }
+}
diff --git a/proto-google-cloud-bigtable-admin-v2/clirr-ignored-differences.xml b/proto-google-cloud-bigtable-admin-v2/clirr-ignored-differences.xml
index 516cb787c..696c323a9 100644
--- a/proto-google-cloud-bigtable-admin-v2/clirr-ignored-differences.xml
+++ b/proto-google-cloud-bigtable-admin-v2/clirr-ignored-differences.xml
@@ -21,14 +21,4 @@
com/google/bigtable/admin/v2/*OrBuilder
boolean has*(*)
-
-
- 8001
- com/google/bigtable/admin/v2/CryptoKeyName*
-
-
-
- 8001
- com/google/bigtable/admin/v2/CryptoKeyVersionName*
-
\ No newline at end of file