New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a batch write flow control example for Bigtable #9314
base: main
Are you sure you want to change the base?
Add a batch write flow control example for Bigtable #9314
Conversation
411e662
to
6e4f7db
Compare
@@ -0,0 +1,127 @@ | |||
<?xml version="1.0" encoding="UTF-8"?> | |||
<!-- | |||
Copyright 2021 Google LLC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you change to 2024
|
||
PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows)); | ||
|
||
if (options.getUseCloudBigtableIo()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the code showing both ways for this? I know we recommend customers use one of these over the other but can never remember which one. Ideally we just show the recommended way. Or if the goal is to just show how to activate the flow control on the client then we could keep both but maybe the data being written can be simplified to not have as much information for customers to take in.
For example, we could just write one row using each client and that can reduce the need to generate sequences and all the additional helper functions.
6e4f7db
to
d563567
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello,
Please address the following questions:
- We ask to have a single code sample per file. This code looks like showing two code samples. What is this code sample demonstrates?
- This code sample does not have regional tags. What documentation use it?
- We do not host code samples without tests. What is a reason for lack of tests?
Pipeline p = Pipeline.create(options); | ||
|
||
PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows)); | ||
|
||
if (options.getUseCloudBigtableIo()) { | ||
System.out.println("Using CloudBigtableIO"); | ||
PCollection<org.apache.hadoop.hbase.client.Mutation> mutations = numbers.apply(mutationLabel, | ||
ParDo.of(new CreateHbaseMutationFn(options.getBigtableColsPerRow(), | ||
options.getBigtableBytesPerCol()))); | ||
|
||
mutations.apply( | ||
String.format("Write data to table %s via CloudBigtableIO", options.getBigtableTableId()), | ||
CloudBigtableIO.writeToTable(new CloudBigtableTableConfiguration.Builder() | ||
.withProjectId(options.getProject()) | ||
.withInstanceId(options.getBigtableInstanceId()) | ||
.withTableId(options.getBigtableTableId()) | ||
.withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, | ||
"true") | ||
.withConfiguration(BigtableOptionsFactory.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES, | ||
"1048576") | ||
.build())); | ||
} else { | ||
System.out.println("Using BigtableIO"); | ||
PCollection<KV<ByteString, Iterable<Mutation>>> | ||
mutations = numbers.apply(mutationLabel, | ||
ParDo.of(new CreateMutationFn(options.getBigtableColsPerRow(), | ||
options.getBigtableBytesPerCol()))); | ||
|
||
BigtableIO.Write write = BigtableIO.write() | ||
.withProjectId(options.getProject()) | ||
.withInstanceId(options.getBigtableInstanceId()) | ||
.withTableId(options.getBigtableTableId()) | ||
.withFlowControl(true); // This enables batch write flow control | ||
|
||
mutations.apply( | ||
String.format("Write data to table %s via BigtableIO", options.getBigtableTableId()), | ||
write | ||
); | ||
} | ||
|
||
p.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this block of code is hard to read. Since it is a code sample it should be easy to understand. Please, reformat it so it will look like steps each of the steps calling apply
method of the pipeline. See the dataflow-bigquery-read-tablerows sample as a reference.
Pipeline p = Pipeline.create(options); | ||
|
||
PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows)); | ||
|
||
if (options.getUseCloudBigtableIo()) { | ||
System.out.println("Using CloudBigtableIO"); | ||
PCollection<org.apache.hadoop.hbase.client.Mutation> mutations = numbers.apply(mutationLabel, | ||
ParDo.of(new CreateHbaseMutationFn(options.getBigtableColsPerRow(), | ||
options.getBigtableBytesPerCol()))); | ||
|
||
mutations.apply( | ||
String.format("Write data to table %s via CloudBigtableIO", options.getBigtableTableId()), | ||
CloudBigtableIO.writeToTable(new CloudBigtableTableConfiguration.Builder() | ||
.withProjectId(options.getProject()) | ||
.withInstanceId(options.getBigtableInstanceId()) | ||
.withTableId(options.getBigtableTableId()) | ||
.withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, | ||
"true") | ||
.withConfiguration(BigtableOptionsFactory.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES, | ||
"1048576") | ||
.build())); | ||
} else { | ||
System.out.println("Using BigtableIO"); | ||
PCollection<KV<ByteString, Iterable<Mutation>>> | ||
mutations = numbers.apply(mutationLabel, | ||
ParDo.of(new CreateMutationFn(options.getBigtableColsPerRow(), | ||
options.getBigtableBytesPerCol()))); | ||
|
||
BigtableIO.Write write = BigtableIO.write() | ||
.withProjectId(options.getProject()) | ||
.withInstanceId(options.getBigtableInstanceId()) | ||
.withTableId(options.getBigtableTableId()) | ||
.withFlowControl(true); // This enables batch write flow control | ||
|
||
mutations.apply( | ||
String.format("Write data to table %s via BigtableIO", options.getBigtableTableId()), | ||
write | ||
); | ||
} | ||
|
||
p.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it intend for run asynchronously? Please, append waitUntilFinish()
call to the result of the run()
.
Description
Add a batch write flow control example for Bigtable
Checklist
pom.xml
parent set to latestshared-configuration
mvn clean verify
requiredmvn -P lint checkstyle:check
requiredmvn -P lint clean compile pmd:cpd-check spotbugs:check
advisory only