diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index afd492ae6b..33abf53bf4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -41,8 +41,6 @@ /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * - *

TODO: Attach traceId. - * *

TODO: Support batching. * *

TODO: Support schema change. @@ -74,6 +72,11 @@ public class StreamWriterV2 implements AutoCloseable { */ private final long maxInflightBytes; + /* + * TraceId for debugging purpose. + */ + private final String traceId; + /* * Tracks current inflight requests in the stream. */ @@ -143,6 +146,7 @@ private StreamWriterV2(Builder builder) throws IOException { this.writerSchema = builder.writerSchema; this.maxInflightRequests = builder.maxInflightRequest; this.maxInflightBytes = builder.maxInflightBytes; + this.traceId = builder.traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); if (builder.client == null) { @@ -433,6 +437,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition( requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema); } requestBuilder.setWriteStream(this.streamName); + if (this.traceId != null) { + requestBuilder.setTraceId(this.traceId); + } } else { requestBuilder.clearWriteStream(); requestBuilder.getProtoRowsBuilder().clearWriterSchema(); @@ -539,6 +546,8 @@ public static final class Builder { private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private String traceId = null; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -591,6 +600,20 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { return this; } + /** + * Sets traceId for debuging purpose. TraceId must follow the format of + * CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x. + */ + public Builder setTraceId(String traceId) { + int colonIndex = traceId.indexOf(':'); + if (colonIndex == -1 || colonIndex == 0 || colonIndex == traceId.length() - 1) { + throw new IllegalArgumentException( + "TraceId must follow the format of A:B. Actual:" + traceId); + } + this.traceId = traceId; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriterV2 build() throws IOException { return new StreamWriterV2(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index a6b4a1e790..69aa4341a0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -54,6 +54,7 @@ public class StreamWriterV2Test { private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName()); private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; private static MockServiceHelper serviceHelper; @@ -84,7 +85,7 @@ public void tearDown() throws Exception { } private StreamWriterV2 getTestStreamWriterV2() throws IOException { - return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); + return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build(); } private ProtoSchema createProtoSchema() { @@ -184,10 +185,12 @@ private void verifyAppendRequests(long appendCount) { // First request received by server should have schema and stream name. assertTrue(serverRequest.getProtoRows().hasWriterSchema()); assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID); } else { // Following request should not have schema and stream name. assertFalse(serverRequest.getProtoRows().hasWriterSchema()); assertEquals(serverRequest.getWriteStream(), ""); + assertEquals(serverRequest.getTraceId(), ""); } } } @@ -209,7 +212,10 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception { @Test public void testAppendWithRowsSuccess() throws Exception { StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build(); + StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .build(); long appendCount = 100; for (int i = 0; i < appendCount; i++) { @@ -269,6 +275,34 @@ public void run() throws Throwable { assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided")); } + @Test + public void testInvalidTraceId() throws Exception { + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc"); + } + }); + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc:"); + } + }); + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriterV2.newBuilder(TEST_STREAM).setTraceId(":abc"); + } + }); + } + @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2();