From 2e49ce8c79cb059840c3307898ba16980f6892fa Mon Sep 17 00:00:00 2001
From: yayi-google <75696801+yayi-google@users.noreply.github.com>
Date: Tue, 2 Mar 2021 09:59:47 -0800
Subject: [PATCH] feat: update StreamWriterV2 to support trace id (#895)
* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones.
* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones
* feat: StreamWriterV2 to support traceId
* Enforce traceId to be the format of A:B
---
.../storage/v1beta2/StreamWriterV2.java | 27 ++++++++++++-
.../storage/v1beta2/StreamWriterV2Test.java | 38 ++++++++++++++++++-
2 files changed, 61 insertions(+), 4 deletions(-)
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
index afd492ae6b..33abf53bf4 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
@@ -41,8 +41,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
- *
TODO: Attach traceId.
- *
*
TODO: Support batching.
*
*
TODO: Support schema change.
@@ -74,6 +72,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private final long maxInflightBytes;
+ /*
+ * TraceId for debugging purpose.
+ */
+ private final String traceId;
+
/*
* Tracks current inflight requests in the stream.
*/
@@ -143,6 +146,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
+ this.traceId = builder.traceId;
this.waitingRequestQueue = new LinkedList();
this.inflightRequestQueue = new LinkedList();
if (builder.client == null) {
@@ -433,6 +437,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
}
requestBuilder.setWriteStream(this.streamName);
+ if (this.traceId != null) {
+ requestBuilder.setTraceId(this.traceId);
+ }
} else {
requestBuilder.clearWriteStream();
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
@@ -539,6 +546,8 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
+ private String traceId = null;
+
private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -591,6 +600,20 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
return this;
}
+ /**
+ * Sets traceId for debuging purpose. TraceId must follow the format of
+ * CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x.
+ */
+ public Builder setTraceId(String traceId) {
+ int colonIndex = traceId.indexOf(':');
+ if (colonIndex == -1 || colonIndex == 0 || colonIndex == traceId.length() - 1) {
+ throw new IllegalArgumentException(
+ "TraceId must follow the format of A:B. Actual:" + traceId);
+ }
+ this.traceId = traceId;
+ return this;
+ }
+
/** Builds the {@code StreamWriterV2}. */
public StreamWriterV2 build() throws IOException {
return new StreamWriterV2(this);
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
index a6b4a1e790..69aa4341a0 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
@@ -54,6 +54,7 @@
public class StreamWriterV2Test {
private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
+ private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
private static MockServiceHelper serviceHelper;
@@ -84,7 +85,7 @@ public void tearDown() throws Exception {
}
private StreamWriterV2 getTestStreamWriterV2() throws IOException {
- return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
+ return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
}
private ProtoSchema createProtoSchema() {
@@ -184,10 +185,12 @@ private void verifyAppendRequests(long appendCount) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
+ assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
+ assertEquals(serverRequest.getTraceId(), "");
}
}
}
@@ -209,7 +212,10 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
@Test
public void testAppendWithRowsSuccess() throws Exception {
StreamWriterV2 writer =
- StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build();
+ StreamWriterV2.newBuilder(TEST_STREAM, client)
+ .setWriterSchema(createProtoSchema())
+ .setTraceId(TEST_TRACE_ID)
+ .build();
long appendCount = 100;
for (int i = 0; i < appendCount; i++) {
@@ -269,6 +275,34 @@ public void run() throws Throwable {
assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided"));
}
+ @Test
+ public void testInvalidTraceId() throws Exception {
+ assertThrows(
+ IllegalArgumentException.class,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc");
+ }
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc:");
+ }
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ StreamWriterV2.newBuilder(TEST_STREAM).setTraceId(":abc");
+ }
+ });
+ }
+
@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();