Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(samples): update WriteToDefaultStream.java sample #1305

Merged
merged 3 commits into from Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -108,6 +108,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-bigquerystora

| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
| BQ To BQ Storage Schema Converter | [source code](https://github.com/googleapis/java-bigquerystorage/blob/master/samples/snippets/src/main/java/com/example/bigquerystorage/BQToBQStorageSchemaConverter.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/BQToBQStorageSchemaConverter.java) |
| Parallel Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/master/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) |
| Storage Arrow Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/master/samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) |
| Storage Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/master/samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) |
Expand Down
@@ -0,0 +1,72 @@
package com.example.bigquerystorage;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.common.collect.ImmutableMap;

/** Converts structure from BigQuery client to BigQueryStorage client */
public class BQToBQStorageSchemaConverter {
private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
ImmutableMap.of(
Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
.build();

/**
* Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
*
* @param schema the BigQuery client Table Schema
* @return the bigquery storage API Table Schema
*/
public static TableSchema ConvertTableSchema(Schema schema) {
TableSchema.Builder result = TableSchema.newBuilder();
for (int i = 0; i < schema.getFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(schema.getFields().get(i)));
}
return result.build();
}

/**
* Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
*
* @param field the BigQuery client Field Schema
* @return the bigquery storage API Field Schema
*/
public static TableFieldSchema ConvertFieldSchema(Field field) {
TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
if (field.getMode() == null) {
field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
}
result.setMode(BQTableSchemaModeMap.get(field.getMode()));
result.setName(field.getName());
result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
if (field.getDescription() != null) {
result.setDescription(field.getDescription());
}
if (field.getSubFields() != null) {
for (int i = 0; i < field.getSubFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(field.getSubFields().get(i)));
}
}
return result.build();
}
}
Expand Up @@ -18,9 +18,12 @@

// [START bigquerystorage_jsonstreamwriter_default]
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.protobuf.Descriptors.DescriptorValidationException;
Expand All @@ -37,20 +40,16 @@ public static void runWriteToDefaultStream()
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
TableFieldSchema strField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_string")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, strField).build();
writeToDefaultStream(projectId, datasetName, tableName, tableSchema);
writeToDefaultStream(projectId, datasetName, tableName);
}

public static void writeToDefaultStream(
String projectId, String datasetName, String tableName, TableSchema tableSchema)
public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(datasetName, tableName);
TableName parentTable = TableName.of(projectId, datasetName, tableName);
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BQToBQStorageSchemaConverter.ConvertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
Expand Down
Expand Up @@ -30,8 +30,6 @@
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
Expand All @@ -52,7 +50,6 @@ public class WriteToDefaultStreamIT {
private BigQuery bigquery;
private String datasetName;
private String tableName;
private TableSchema tableSchema;

private static void requireEnvVar(String varName) {
assertNotNull(
Expand All @@ -76,23 +73,10 @@ public void setUp() {
// Create a new dataset and table for each test.
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
Schema schema = Schema.of(Field.of("test_string", StandardSQLTypeName.STRING));
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
TableFieldSchema strField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_string")
.build();
tableSchema = TableSchema.newBuilder().addFields(0, strField).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(datasetName, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"test_string", StandardSQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build())))
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
.build();
bigquery.create(tableInfo);
}
Expand All @@ -106,8 +90,7 @@ public void tearDown() {

@Test
public void testWriteToDefaultStream() throws Exception {
WriteToDefaultStream.writeToDefaultStream(
GOOGLE_CLOUD_PROJECT, datasetName, tableName, tableSchema);
WriteToDefaultStream.writeToDefaultStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName);
assertThat(bout.toString()).contains("Appended records successfully.");
}
}