Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add dataflow trace id support #827

Merged
merged 3 commits into from Feb 12, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -91,6 +91,8 @@ public class StreamWriter implements AutoCloseable {
private final String streamName;
private final String tableName;

private final String traceId;

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
private BigQueryWriteSettings stubSettings;
Expand Down Expand Up @@ -151,6 +153,7 @@ private StreamWriter(Builder builder)
tableName = matcher.group(1);
}

this.traceId = builder.traceId;
this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
Expand Down Expand Up @@ -477,6 +480,11 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
"The first message on the connection must have writer schema set");
}
requestBuilder.setWriteStream(streamName);
if (!inflightRequests.get(0).message.getTraceId().isEmpty()) {
requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId());
} else {
requestBuilder.setTraceId(streamWriter.traceId);
}
}
return requestBuilder.setProtoRows(data.build()).build();
}
Expand Down Expand Up @@ -660,6 +668,8 @@ public static final class Builder {
private String streamOrTableName;
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private String traceId;

private BigQueryWriteClient client = null;

// Batching options
Expand Down Expand Up @@ -814,6 +824,12 @@ public Builder createDefaultStream() {
return this;
}

/** Mark the request as coming from Dataflow. */
public Builder setDataflowTraceId() {
this.traceId = "Dataflow";
return this;
}

/** Builds the {@code StreamWriter}. */
public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
return new StreamWriter(this);
Expand Down