
Add a batch write flow control example for Bigtable #9314

Open

kongweihan wants to merge 1 commit into main from flow-control-example
Conversation

kongweihan (Contributor)

Description

Add a batch write flow control example for Bigtable
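Conceptually, batch write flow control throttles the sender so that only a bounded amount of work is in flight at once, and the writer blocks when that bound is reached. The sketch below illustrates that idea in plain, self-contained Java using a `Semaphore`; the class and method names (`FlowControlSketch`, `beforeSend`, `afterAck`) are hypothetical and this is not the Bigtable client's actual implementation.

```java
import java.util.concurrent.Semaphore;

/**
 * Illustrative sketch only: caps the number of concurrent "batch writes"
 * the way client-side flow control caps in-flight work. All names here
 * are hypothetical, not part of the Bigtable client API.
 */
public class FlowControlSketch {
  private final Semaphore inFlight;

  public FlowControlSketch(int maxInFlightBatches) {
    this.inFlight = new Semaphore(maxInFlightBatches);
  }

  /** Blocks until capacity is available, mimicking back-pressure on the writer. */
  public void beforeSend() throws InterruptedException {
    inFlight.acquire();
  }

  /** Releases capacity once the server acknowledges the batch. */
  public void afterAck() {
    inFlight.release();
  }

  public int availableSlots() {
    return inFlight.availablePermits();
  }

  public static void main(String[] args) throws InterruptedException {
    FlowControlSketch fc = new FlowControlSketch(2);
    fc.beforeSend(); // first batch in flight
    fc.beforeSend(); // second batch in flight
    System.out.println(fc.availableSlots()); // 0: a third send would block
    fc.afterAck();
    System.out.println(fc.availableSlots()); // 1: capacity freed
  }
}
```

In the real sample below, the equivalent switch is `BigtableIO.write().withFlowControl(true)` (or the `BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL` setting for CloudBigtableIO); the blocking and release happen inside the client.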

Checklist

  • I have followed the Sample Format Guide
  • pom.xml parent set to latest shared-configuration
  • Appropriate changes to README are included in PR
  • [ ] These samples need a new API enabled in testing projects to pass (let us know which ones)
  • [ ] These samples need new/updated env vars set in testing projects to pass (let us know which ones)
  • Tests pass: mvn clean verify required
  • Lint passes: mvn -P lint checkstyle:check required
  • Static Analysis: mvn -P lint clean compile pmd:cpd-check spotbugs:check advisory only
  • [ ] This sample adds a new sample directory, and I updated the CODEOWNERS file with the codeowners for this sample
  • [ ] This sample adds a new Product API, and I updated the Blunderbuss issue/PR auto-assigner with the codeowners for this sample
  • Please merge this PR for me once it is approved

@kongweihan kongweihan requested review from yoshi-approver and a team as code owners May 8, 2024 16:20
@product-auto-label product-auto-label bot added samples Issues that are directly related to samples. api: bigtable Issues related to the Bigtable API. labels May 8, 2024
@kongweihan kongweihan force-pushed the flow-control-example branch 5 times, most recently from 411e662 to 6e4f7db on May 8, 2024 17:35
@@ -0,0 +1,127 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2021 Google LLC
Member
Can you change this to 2024?


PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows));

if (options.getUseCloudBigtableIo()) {
Member

do we need the code showing both ways for this? I know we recommend customers use one of these over the other but can never remember which one. Ideally we just show the recommended way. Or if the goal is to just show how to activate the flow control on the client then we could keep both but maybe the data being written can be simplified to not have as much information for customers to take in.

For example, we could just write one row using each client and that can reduce the need to generate sequences and all the additional helper functions.

minherz (Contributor) left a comment:

Hello,
Please address the following questions:

  1. We ask to have a single code sample per file, and this code looks like it contains two code samples. What does this code sample demonstrate?
  2. This code sample does not have region tags. What documentation uses it?
  3. We do not host code samples without tests. What is the reason for the lack of tests?

Comment on lines +86 to +126
Pipeline p = Pipeline.create(options);

PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows));

if (options.getUseCloudBigtableIo()) {
  System.out.println("Using CloudBigtableIO");
  PCollection<org.apache.hadoop.hbase.client.Mutation> mutations =
      numbers.apply(mutationLabel,
          ParDo.of(new CreateHbaseMutationFn(options.getBigtableColsPerRow(),
              options.getBigtableBytesPerCol())));

  mutations.apply(
      String.format("Write data to table %s via CloudBigtableIO", options.getBigtableTableId()),
      CloudBigtableIO.writeToTable(new CloudBigtableTableConfiguration.Builder()
          .withProjectId(options.getProject())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
              "true")
          .withConfiguration(BigtableOptionsFactory.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES,
              "1048576")
          .build()));
} else {
  System.out.println("Using BigtableIO");
  PCollection<KV<ByteString, Iterable<Mutation>>> mutations =
      numbers.apply(mutationLabel,
          ParDo.of(new CreateMutationFn(options.getBigtableColsPerRow(),
              options.getBigtableBytesPerCol())));

  BigtableIO.Write write = BigtableIO.write()
      .withProjectId(options.getProject())
      .withInstanceId(options.getBigtableInstanceId())
      .withTableId(options.getBigtableTableId())
      .withFlowControl(true); // This enables batch write flow control

  mutations.apply(
      String.format("Write data to table %s via BigtableIO", options.getBigtableTableId()),
      write);
}

p.run();
Contributor
This block of code is hard to read. Since it is a code sample, it should be easy to understand. Please reformat it as a sequence of steps, with each step being a call to the pipeline's apply method. See the dataflow-bigquery-read-tablerows sample as a reference.
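The "one named step per apply call" shape the reviewer is asking for can be illustrated with plain Java function composition. The `Stage` class below is a hypothetical stand-in for a Beam pipeline, not the Beam API; it only shows how each labeled step chains into the next.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/** Plain-Java illustration of the "one step per apply" shape; not the Beam API. */
public class ApplyStepsSketch {
  /** Minimal stand-in for a pipeline stage: holds a value, applies one labeled step at a time. */
  static final class Stage<T> {
    final T value;

    Stage(T value) {
      this.value = value;
    }

    <R> Stage<R> apply(String label, Function<T, R> step) {
      // In Beam, each apply(label, transform) is one named pipeline step.
      return new Stage<>(step.apply(value));
    }
  }

  public static void main(String[] args) {
    List<String> rows =
        new Stage<>(LongStream.range(0, 3).boxed().collect(Collectors.toList()))
            .apply("Generate sequence", nums -> nums)
            .apply("Create mutations", nums -> nums.stream()
                .map(n -> "row-" + n)
                .collect(Collectors.toList()))
            .value;
    System.out.println(rows); // [row-0, row-1, row-2]
  }
}
```

Structuring the real sample this way, with each transform as one labeled apply, is what makes the pipeline's steps readable at a glance.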

Comment on lines +86 to +126
Contributor
Is the pipeline intended to run asynchronously? Please append a waitUntilFinish() call to the result of run().
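The reviewer's point is that run() starts the pipeline and returns immediately, while waitUntilFinish() blocks until it completes. The same asynchronous-start / blocking-wait distinction can be shown with standard-library futures; this is a generic illustration, not the Beam API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Generic illustration of async start vs. blocking wait; not the Beam API. */
public class AsyncRunSketch {
  public static void main(String[] args) throws Exception {
    // Like Pipeline.run(): kicks off the work and returns immediately.
    CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "done");

    // Like PipelineResult.waitUntilFinish(): block until the work completes.
    String state = result.get(10, TimeUnit.SECONDS);
    System.out.println(state); // done
  }
}
```

Without the blocking wait, a main method can exit before the submitted work finishes, which is why the sample should end with p.run().waitUntilFinish().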
