Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: add dataflow trace id support (#827)
* fix: add dataflow trace id support

* .

* add test
  • Loading branch information
yirutang committed Feb 12, 2021
1 parent 36322fb commit 8d22c58
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Expand Up @@ -91,6 +91,8 @@ public class StreamWriter implements AutoCloseable {
private final String streamName;
private final String tableName;

private final String traceId;

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
private BigQueryWriteSettings stubSettings;
Expand Down Expand Up @@ -151,6 +153,7 @@ private StreamWriter(Builder builder)
tableName = matcher.group(1);
}

this.traceId = builder.traceId;
this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
Expand Down Expand Up @@ -477,6 +480,11 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
"The first message on the connection must have writer schema set");
}
requestBuilder.setWriteStream(streamName);
if (!inflightRequests.get(0).message.getTraceId().isEmpty()) {
requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId());
} else if (streamWriter.traceId != null) {
requestBuilder.setTraceId(streamWriter.traceId);
}
}
return requestBuilder.setProtoRows(data.build()).build();
}
Expand Down Expand Up @@ -660,6 +668,8 @@ public static final class Builder {
private String streamOrTableName;
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private String traceId;

private BigQueryWriteClient client = null;

// Batching options
Expand Down Expand Up @@ -814,6 +824,12 @@ public Builder createDefaultStream() {
return this;
}

/** Mark the request as coming from Dataflow. */
public Builder setDataflowTraceId() {
this.traceId = "Dataflow";
return this;
}

/** Builds the {@code StreamWriter}. */
public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
return new StreamWriter(this);
Expand Down
Expand Up @@ -993,4 +993,26 @@ public void testFlushAllFailed() throws Exception {

writer.close();
}

@Test
public void testDatasetTraceId() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.build())
.setDataflowTraceId()
.build();
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
appendFuture1.get();
appendFuture2.get();
assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
}
}

0 comments on commit 8d22c58

Please sign in to comment.