Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
igorbernstein2 committed Mar 31, 2020
1 parent a8d5451 commit cd9d8a5
Show file tree
Hide file tree
Showing 19 changed files with 1,201 additions and 968 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigtable/pom.xml
Expand Up @@ -218,6 +218,11 @@
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Expand Up @@ -72,6 +72,10 @@ private void writeObject(ObjectOutputStream output) throws IOException {
builder.build().writeTo(output);
}

public String getTableId() {
return tableId;
}

/** Adds a key to looked up */
public Query rowKey(String key) {
Preconditions.checkNotNull(key, "Key can't be null.");
Expand Down
Expand Up @@ -19,6 +19,8 @@
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
Expand All @@ -31,6 +33,8 @@
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
Expand Down Expand Up @@ -58,9 +62,9 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredMutateRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredReadRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand All @@ -72,10 +76,13 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import java.io.IOException;
Expand Down Expand Up @@ -103,10 +110,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final ClientContext clientContext;
private final RequestContext requestContext;

// TODO: This should probably move to ClientContext
private final Tagger tagger;
private final StatsRecorder statsRecorder;

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
Expand All @@ -117,22 +120,57 @@ public class EnhancedBigtableStub implements AutoCloseable {

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
ClientContext clientContext = ClientContext.create(settings);

return new EnhancedBigtableStub(
settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder());
settings, ClientContext.create(settings), Tags.getTagger(), Stats.getStatsRecorder());
}

@InternalApi("Visible for testing")
private EnhancedBigtableStub(
public EnhancedBigtableStub(
EnhancedBigtableStubSettings settings,
ClientContext clientContext,
Tagger tagger,
StatsRecorder statsRecorder) {
this.settings = settings;
this.clientContext = clientContext;
this.tagger = tagger;
this.statsRecorder = statsRecorder;
this.clientContext =
clientContext
.toBuilder()
.setTracerFactory(
new CompositeTracerFactory(
ImmutableList.<ApiTracerFactory>of(
// Ad OpenCensus Tracing
new OpencensusTracerFactory(
ImmutableMap.<String, String>builder()
// TODO: use constants to make sure that these align with OpenCensus
// Metric tags
.put("bigtable_project_id", settings.getProjectId())
.put("bigtable_instance_id", settings.getInstanceId())
.put("bigtable_app_profile_id", settings.getAppProfileId())
.put("gax", GaxGrpcProperties.getGaxGrpcVersion())
.put("grpc", GaxGrpcProperties.getGrpcVersion())
.put(
"gapic",
GaxProperties.getLibraryVersion(
EnhancedBigtableStubSettings.class))
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(
tagger,
statsRecorder,
ImmutableMap.<TagKey, TagValue>builder()
.put(
RpcMeasureConstants.BIGTABLE_PROJECT_ID,
TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build()),
// Add user configured tracer
clientContext.getTracerFactory())))
.build();
this.requestContext =
RequestContext.create(
settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId());
Expand Down Expand Up @@ -164,24 +202,16 @@ private EnhancedBigtableStub(
*/
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<Query, RowT> readRowsCallable =
final ServerStreamingCallable<Query, RowT> readRowsCallable =
createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

ServerStreamingCallable<Query, RowT> traced =
final ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsCallable,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));

ServerStreamingCallable<Query, RowT> measured =
new MeasuredReadRowsCallable<>(
traced,
TRACING_OUTER_CLIENT_NAME + ".ReadRows",
tagger,
statsRecorder,
clientContext.getClock());

return measured.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -368,15 +398,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "MutateRows"));

UnaryCallable<BulkMutation, Void> measured =
new MeasuredMutateRowsCallable(
traced,
TRACING_OUTER_CLIENT_NAME + ".MutateRows",
tagger,
statsRecorder,
clientContext.getClock());

return measured.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -548,15 +570,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName));

UnaryCallable<RequestT, ResponseT> measured =
new MeasuredUnaryCallable<>(
traced,
TRACING_OUTER_CLIENT_NAME + "." + methodName,
tagger,
statsRecorder,
clientContext.getClock());

return measured.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

Expand Down
Expand Up @@ -20,17 +20,14 @@
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.RefreshChannel;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand All @@ -42,7 +39,6 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -520,13 +516,6 @@ private Builder() {
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

setTracerFactory(
new OpencensusTracerFactory(
ImmutableMap.of(
"gax", GaxGrpcProperties.getGaxGrpcVersion(),
"grpc", GaxGrpcProperties.getGrpcVersion(),
"gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))));

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();

Expand Down
@@ -0,0 +1,154 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.ApiTracer;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.threeten.bp.Duration;

/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */
class CompositeTracer implements ApiTracer {
private final List<ApiTracer> children;

public CompositeTracer(List<ApiTracer> children) {
this.children = ImmutableList.copyOf(children);
}

@Override
public Scope inScope() {
final List<Scope> childScopes = new ArrayList<>(children.size());

for (ApiTracer child : children) {
childScopes.add(child.inScope());
}

return new Scope() {
@Override
public void close() {
for (Scope childScope : childScopes) {
childScope.close();
}
}
};
}

@Override
public void operationSucceeded() {
for (ApiTracer child : children) {
child.operationSucceeded();
}
}

@Override
public void operationCancelled() {
for (ApiTracer child : children) {
child.operationCancelled();
}
}

@Override
public void operationFailed(Throwable error) {
for (ApiTracer child : children) {
child.operationFailed(error);
}
}

@Override
public void connectionSelected(String id) {
for (ApiTracer child : children) {
child.connectionSelected(id);
}
}

@Override
public void attemptStarted(int attemptNumber) {
for (ApiTracer child : children) {
child.attemptStarted(attemptNumber);
}
}

@Override
public void attemptSucceeded() {
for (ApiTracer child : children) {
child.attemptSucceeded();
}
}

@Override
public void attemptCancelled() {
for (ApiTracer child : children) {
child.attemptCancelled();
}
}

@Override
public void attemptFailed(Throwable error, Duration delay) {
for (ApiTracer child : children) {
child.attemptFailed(error, delay);
}
}

@Override
public void attemptFailedRetriesExhausted(Throwable error) {
for (ApiTracer child : children) {
child.attemptFailedRetriesExhausted(error);
}
}

@Override
public void attemptPermanentFailure(Throwable error) {
for (ApiTracer child : children) {
child.attemptPermanentFailure(error);
}
}

@Override
public void lroStartFailed(Throwable error) {
for (ApiTracer child : children) {
child.lroStartFailed(error);
}
}

@Override
public void lroStartSucceeded() {
for (ApiTracer child : children) {
child.lroStartSucceeded();
}
}

@Override
public void responseReceived() {
for (ApiTracer child : children) {
child.responseReceived();
}
}

@Override
public void requestSent() {
for (ApiTracer child : children) {
child.requestSent();
}
}

@Override
public void batchRequestSent(long elementCount, long requestSize) {
for (ApiTracer child : children) {
child.batchRequestSent(elementCount, requestSize);
}
}
}

0 comments on commit cd9d8a5

Please sign in to comment.