diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 64b11321de..ad6b6ddba5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -91,6 +91,8 @@ public class StreamWriter implements AutoCloseable { private final String streamName; private final String tableName; + private final String traceId; + private final BatchingSettings batchingSettings; private final RetrySettings retrySettings; private BigQueryWriteSettings stubSettings; @@ -151,6 +153,7 @@ private StreamWriter(Builder builder) tableName = matcher.group(1); } + this.traceId = builder.traceId; this.batchingSettings = builder.batchingSettings; this.retrySettings = builder.retrySettings; this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this); @@ -477,6 +480,11 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException { "The first message on the connection must have writer schema set"); } requestBuilder.setWriteStream(streamName); + if (!inflightRequests.get(0).message.getTraceId().isEmpty()) { + requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId()); + } else if (streamWriter.traceId != null) { + requestBuilder.setTraceId(streamWriter.traceId); + } } return requestBuilder.setProtoRows(data.build()).build(); } @@ -660,6 +668,8 @@ public static final class Builder { private String streamOrTableName; private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + private String traceId; + private BigQueryWriteClient client = null; // Batching options @@ -814,6 +824,12 @@ public Builder createDefaultStream() { return this; } + /** Mark the request as coming from Dataflow. */ + public Builder setDataflowTraceId() { + this.traceId = "Dataflow"; + return this; + } + /** Builds the {@code StreamWriter}. */ public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException { return new StreamWriter(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 7cc101de19..169c815e91 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -993,4 +993,26 @@ public void testFlushAllFailed() throws Exception { writer.close(); } + + @Test + public void testDatasetTraceId() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .build()) + .setDataflowTraceId() + .build(); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + appendFuture1.get(); + appendFuture2.get(); + assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId()); + assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId()); + } }