From d1fa8db103b81ded999fb684532c1ec76d319286 Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Wed, 30 Dec 2020 19:26:13 +0000 Subject: [PATCH 01/21] docs(samples): jsonstreamwriter samples --- .../bigquerystorage/WriteCommittedStream.java | 100 ++++++++++++++++++ .../bigquerystorage/WritePendingStream.java | 64 +++++++++++ .../bigquerystorage/WriteAPISampleIT.java | 73 +++++++++++++ 3 files changed, 237 insertions(+) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java new file mode 100644 index 0000000000..025d5ad5e0 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -0,0 +1,100 @@ +package com.example.bigquerystorage; + +import com.google.api.core.ApiFuture; +import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.storage.v1beta2.*; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.concurrent.ExecutionException; +import org.json.JSONArray; +import org.json.JSONObject; + +public class WriteCommittedStream { + + public static Status.Code getStatusCode(StatusRuntimeException e) { + return e.getStatus().getCode(); + } + + public static void runWriteCommittedStream() { + // TODO(developer): Replace these variables before running the sample. + String projectId = "MY_PROJECT_ID"; + String datasetName = "MY_DATASET_NAME"; + String tableName = "MY_TABLE_NAME"; + writeCommittedStream(projectId, datasetName, tableName); + } + + public static void writeCommittedStream(String projectId, String datasetName, String tableName) { + + try (BigQueryWriteClient client = BigQueryWriteClient.create()) { + + // Initialize a write stream for the specified table. + WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build(); + + TableName parent = TableName.of(projectId, datasetName, tableName); + + CreateWriteStreamRequest createWriteStreamRequest = + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(stream) + .build(); + WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); + + // Use the JSON stream writer to send records in JSON format. + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) + .build()) { + + int offsets[] = {0, 1, 2, 3, 4, 5, 5}; + // The last offset is repeated. This will cause an ALREADY_EXISTS error. + + for (int i : offsets) { + JSONObject record = new JSONObject(); + record.put("col1", String.format("record %03d", i)); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(record); + + ApiFuture future = writer.append(jsonArr, i, false); + AppendRowsResponse response = future.get(); + } + + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof StatusRuntimeException) { + System.out.println( + "Failed with status = " + getStatusCode((StatusRuntimeException) cause)); + } + throw ex; + } + + System.out.println("Appended records successfully."); + + } catch (Exception e) { + System.out.println("Failed to append records. 
\n" + e.toString()); + } + } + + public static void writeToDefaultStream(String projectId, String datasetName, String tableName) { + + TableName parent = TableName.of(projectId, datasetName, tableName); + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + Table table = bigquery.getTable(datasetName, tableName); + Schema schema = table.getDefinition().getSchema(); + + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) { + + for (int i = 0; i < 10; i++) { + JSONObject record = new JSONObject(); + record.put("col1", String.format("record %03d", i)); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(record); + + ApiFuture future = writer.append(jsonArr, false); + AppendRowsResponse response = future.get(); + } + } catch (Exception e) { + System.out.println(e); + } + } +} diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java new file mode 100644 index 0000000000..9fe5cc96e3 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -0,0 +1,64 @@ +package com.example.bigquerystorage; + +import com.google.api.core.ApiFuture; +import com.google.cloud.bigquery.storage.v1beta2.*; +import org.json.JSONArray; +import org.json.JSONObject; + +public class WritePendingStream { + + public static void runWritePendingStream() { + // TODO(developer): Replace these variables before running the sample. + String projectId = "MY_PROJECT_ID"; + String datasetName = "MY_DATASET_NAME"; + String tableName = "MY_TABLE_NAME"; + + writePendingStream(projectId, datasetName, tableName); + } + + public static void writePendingStream(String projectId, String datasetName, String tableName) { + + try (BigQueryWriteClient client = BigQueryWriteClient.create()) { + + WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); + + TableName parent = TableName.of(projectId, datasetName, tableName); + + CreateWriteStreamRequest createWriteStreamRequest = + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(stream) + .build(); + WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); + + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) + .build()) { + + for (int i = 0; i < 10; i++) { + JSONObject record = new JSONObject(); + record.put("col1", String.format("batch-record %03d", i)); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(record); + + ApiFuture future = writer.append(jsonArr, false); + AppendRowsResponse response = future.get(); + } + FinalizeWriteStreamResponse finalizeResponse = + client.finalizeWriteStream(writeStream.getName()); + System.out.println("Rows written: " + finalizeResponse.getRowCount()); + } + + // Commit the streams + BatchCommitWriteStreamsRequest commitRequest = + BatchCommitWriteStreamsRequest.newBuilder() + .setParent(parent.toString()) + .addWriteStreams(writeStream.getName()) + .build(); + BatchCommitWriteStreamsResponse commitResponse = + client.batchCommitWriteStreams(commitRequest); + } catch (Exception e) { + System.out.println(e); + } + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java new file mode 100644 index 0000000000..ef871110dd --- 
/dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java @@ -0,0 +1,73 @@ +package com.example.bigquerystorage; + +import static junit.framework.TestCase.assertNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WriteAPISampleIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); + private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); + + private final Logger log = Logger.getLogger(this.getClass().getName()); + private ByteArrayOutputStream bout; + private PrintStream out; + private PrintStream originalPrintStream; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + requireEnvVar("BIGQUERY_DATASET_NAME"); + requireEnvVar("BIGQUERY_TABLE_NAME"); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + originalPrintStream = System.out; + System.setOut(out); + } + + @After + public void tearDown() { + System.out.flush(); + System.setOut(originalPrintStream); + log.log(Level.INFO, "\n" + bout.toString()); + } + + @Test + public void testWriteCommittedStream() throws Exception { + WriteCommittedStream.writeCommittedStream( + GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + } + + @Test + public void testWriteToDefaultStream() throws Exception { + WriteCommittedStream.writeToDefaultStream( + GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + } + + @Test + public void testWritePendingStream() throws Exception { + WritePendingStream.writePendingStream( + GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + } +} From d4ce98a4f1e1f375d7ff1b7fe42533146673b2e0 Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Wed, 30 Dec 2020 19:43:29 +0000 Subject: [PATCH 02/21] Add copyright notice --- .../bigquerystorage/WriteCommittedStream.java | 16 ++++++++++++++++ .../bigquerystorage/WritePendingStream.java | 16 ++++++++++++++++ .../bigquerystorage/WriteAPISampleIT.java | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 025d5ad5e0..0ecd78f22e 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -1,3 +1,19 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.example.bigquerystorage; import com.google.api.core.ApiFuture; diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 9fe5cc96e3..5d357a3e2c 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -1,3 +1,19 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.example.bigquerystorage; import com.google.api.core.ApiFuture; diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java index ef871110dd..34db9e3b8e 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java @@ -1,3 +1,19 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.example.bigquerystorage; import static junit.framework.TestCase.assertNotNull; From bfb4403175e155afd71420b982baf68e432d6757 Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Wed, 30 Dec 2020 21:57:11 +0000 Subject: [PATCH 03/21] Remove allowUnknownFields parameter --- .../com/example/bigquerystorage/WriteCommittedStream.java | 4 ++-- .../java/com/example/bigquerystorage/WritePendingStream.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 0ecd78f22e..1fc6b0f45b 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -69,7 +69,7 @@ public static void writeCommittedStream(String projectId, String datasetName, St JSONArray jsonArr = new JSONArray(); jsonArr.put(record); - ApiFuture future = writer.append(jsonArr, i, false); + ApiFuture future = writer.append(jsonArr, i); AppendRowsResponse response = future.get(); } @@ -106,7 +106,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St JSONArray jsonArr = new JSONArray(); jsonArr.put(record); - ApiFuture future = writer.append(jsonArr, false); + ApiFuture future = writer.append(jsonArr); AppendRowsResponse response = future.get(); } } catch (Exception e) { diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 5d357a3e2c..5cc504962a 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -57,7 +57,7 @@ public static void writePendingStream(String projectId, String datasetName, Stri JSONArray jsonArr = new JSONArray(); jsonArr.put(record); - ApiFuture future = writer.append(jsonArr, false); + ApiFuture future = writer.append(jsonArr); AppendRowsResponse response = future.get(); } FinalizeWriteStreamResponse finalizeResponse = From c3735b5700bc90d6fe4c6a0bcc079b86bee04d6c Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Thu, 31 Dec 2020 01:27:02 +0000 Subject: [PATCH 04/21] Added retry with exponential backoff --- .../bigquerystorage/WriteCommittedStream.java | 45 ++++++++++++++++--- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 1fc6b0f45b..5b374dbc03 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -16,6 +16,7 @@ package com.example.bigquerystorage; +import com.google.api.client.util.*; import com.google.api.core.ApiFuture; import com.google.cloud.bigquery.*; import com.google.cloud.bigquery.storage.v1beta2.*; @@ -27,10 +28,20 @@ public class WriteCommittedStream { - public static Status.Code getStatusCode(StatusRuntimeException e) { + static Status.Code getStatusCode(StatusRuntimeException e) { return e.getStatus().getCode(); } + // Returns true if the operation should be retried. 
+ static Boolean isRetryable(ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof StatusRuntimeException) { + Status status = ((StatusRuntimeException)cause).getStatus(); + return (status == Status.ABORTED); + } + return false; + } + public static void runWriteCommittedStream() { // TODO(developer): Replace these variables before running the sample. String projectId = "MY_PROJECT_ID"; @@ -85,7 +96,7 @@ public static void writeCommittedStream(String projectId, String datasetName, St System.out.println("Appended records successfully."); } catch (Exception e) { - System.out.println("Failed to append records. \n" + e.toString()); + System.out.println("Failed to append records.\n" + e.toString()); } } @@ -100,17 +111,41 @@ public static void writeToDefaultStream(String projectId, String datasetName, St try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) { + ExponentialBackOff backoff = new ExponentialBackOff(); + for (int i = 0; i < 10; i++) { JSONObject record = new JSONObject(); record.put("col1", String.format("record %03d", i)); JSONArray jsonArr = new JSONArray(); jsonArr.put(record); - ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); + backoff.reset(); + Boolean retry = true; + while (retry) { + try { + + ApiFuture future = writer.append(jsonArr); + AppendRowsResponse response = future.get(); + retry = false; + + } catch (ExecutionException ex) { + // If the error is retryable, retry the operation with exponential backoff. + // Don't retry past the maximum retry interval. + long backOffMillis = backoff.nextBackOffMillis(); + if (isRetryable(ex) && backOffMillis != BackOff.STOP) { + Thread.sleep(backOffMillis); + } + else { + throw ex; + } + } + } } + + System.out.println("Appended records successfully."); + } catch (Exception e) { - System.out.println(e); + System.out.println("Failed to append records.\n" + e.toString()); } } } From 7bc0f792e8261f3077c7dcfa73f1893293d9b733 Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Tue, 5 Jan 2021 20:45:15 +0000 Subject: [PATCH 05/21] Revert "Added retry with exponential backoff" Remove the backoff logic to keep the sample code simpler. --- .../bigquerystorage/WriteCommittedStream.java | 45 +++---------------- 1 file changed, 5 insertions(+), 40 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 5b374dbc03..1fc6b0f45b 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -16,7 +16,6 @@ package com.example.bigquerystorage; -import com.google.api.client.util.*; import com.google.api.core.ApiFuture; import com.google.cloud.bigquery.*; import com.google.cloud.bigquery.storage.v1beta2.*; @@ -28,20 +27,10 @@ public class WriteCommittedStream { - static Status.Code getStatusCode(StatusRuntimeException e) { + public static Status.Code getStatusCode(StatusRuntimeException e) { return e.getStatus().getCode(); } - // Returns true if the operation should be retried. 
- static Boolean isRetryable(ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof StatusRuntimeException) { - Status status = ((StatusRuntimeException)cause).getStatus(); - return (status == Status.ABORTED); - } - return false; - } - public static void runWriteCommittedStream() { // TODO(developer): Replace these variables before running the sample. String projectId = "MY_PROJECT_ID"; @@ -96,7 +85,7 @@ public static void writeCommittedStream(String projectId, String datasetName, St System.out.println("Appended records successfully."); } catch (Exception e) { - System.out.println("Failed to append records.\n" + e.toString()); + System.out.println("Failed to append records. \n" + e.toString()); } } @@ -111,41 +100,17 @@ public static void writeToDefaultStream(String projectId, String datasetName, St try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) { - ExponentialBackOff backoff = new ExponentialBackOff(); - for (int i = 0; i < 10; i++) { JSONObject record = new JSONObject(); record.put("col1", String.format("record %03d", i)); JSONArray jsonArr = new JSONArray(); jsonArr.put(record); - backoff.reset(); - Boolean retry = true; - while (retry) { - try { - - ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); - retry = false; - - } catch (ExecutionException ex) { - // If the error is retryable, retry the operation with exponential backoff. - // Don't retry past the maximum retry interval. - long backOffMillis = backoff.nextBackOffMillis(); - if (isRetryable(ex) && backOffMillis != BackOff.STOP) { - Thread.sleep(backOffMillis); - } - else { - throw ex; - } - } - } + ApiFuture future = writer.append(jsonArr); + AppendRowsResponse response = future.get(); } - - System.out.println("Appended records successfully."); - } catch (Exception e) { - System.out.println("Failed to append records.\n" + e.toString()); + System.out.println(e); } } } From 56ca8e5c5fe9dff65cb29d2b849e3dd758889e2e Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Fri, 8 Jan 2021 18:42:17 +0000 Subject: [PATCH 06/21] Addressed PR review feedback. - Simplify code, remove duplicate-record example. - Split all snippets and tests into separate classes. - Add comments and javadocs links. - Clean up imports. - Add region tags. - Catch only specific exceptions. - Run linter and fmt-maven-plugin. 
--- .../bigquerystorage/WriteCommittedStream.java | 82 +++++++------------ .../bigquerystorage/WritePendingStream.java | 47 ++++++++--- .../bigquerystorage/WriteToDefaultStream.java | 80 ++++++++++++++++++ ...pleIT.java => WriteCommittedStreamIT.java} | 16 +--- .../bigquerystorage/WritePendingStreamIT.java | 79 ++++++++++++++++++ .../WriteToDefaultStreamIT.java | 79 ++++++++++++++++++ 6 files changed, 307 insertions(+), 76 deletions(-) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java rename samples/snippets/src/test/java/com/example/bigquerystorage/{WriteAPISampleIT.java => WriteCommittedStreamIT.java} (85%) create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 1fc6b0f45b..ac49b0c316 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -16,22 +16,24 @@ package com.example.bigquerystorage; +// [START bigquerystorage_jsonstreamwriter_committed] import com.google.api.core.ApiFuture; -import com.google.cloud.bigquery.*; -import com.google.cloud.bigquery.storage.v1beta2.*; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; +import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1beta2.TableName; +import com.google.cloud.bigquery.storage.v1beta2.WriteStream; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; import java.util.concurrent.ExecutionException; import org.json.JSONArray; import org.json.JSONObject; public class WriteCommittedStream { - public static Status.Code getStatusCode(StatusRuntimeException e) { - return e.getStatus().getCode(); - } - - public static void runWriteCommittedStream() { + public static void runWriteCommittedStream() + throws DescriptorValidationException, InterruptedException, IOException { // TODO(developer): Replace these variables before running the sample. String projectId = "MY_PROJECT_ID"; String datasetName = "MY_DATASET_NAME"; @@ -39,78 +41,52 @@ public static void runWriteCommittedStream() { writeCommittedStream(projectId, datasetName, tableName); } - public static void writeCommittedStream(String projectId, String datasetName, String tableName) { + public static void writeCommittedStream(String projectId, String datasetName, String tableName) + throws DescriptorValidationException, InterruptedException, IOException { try (BigQueryWriteClient client = BigQueryWriteClient.create()) { // Initialize a write stream for the specified table. 
+ // For more information on WriteStream.Type, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build(); - - TableName parent = TableName.of(projectId, datasetName, tableName); - + TableName parentTable = TableName.of(projectId, datasetName, tableName); CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) + .setParent(parentTable.toString()) .setWriteStream(stream) .build(); WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); // Use the JSON stream writer to send records in JSON format. + // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) .build()) { - int offsets[] = {0, 1, 2, 3, 4, 5, 5}; - // The last offset is repeated. This will cause an ALREADY_EXISTS error. + // Append 10 JSON objects to the stream. + for (int i = 0; i < 10; i++) { - for (int i : offsets) { + // Create a JSON object that is compatible with the table schema. JSONObject record = new JSONObject(); record.put("col1", String.format("record %03d", i)); JSONArray jsonArr = new JSONArray(); jsonArr.put(record); + // To detect duplicate records, pass the index as the record offset. + // To disable deduplication, omit the offset or use WriteStream.Type.DEFAULT. ApiFuture future = writer.append(jsonArr, i); AppendRowsResponse response = future.get(); } - - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - if (cause instanceof StatusRuntimeException) { - System.out.println( - "Failed with status = " + getStatusCode((StatusRuntimeException) cause)); - } - throw ex; } - System.out.println("Appended records successfully."); - - } catch (Exception e) { + } catch (ExecutionException e) { + // If the wrapped exception is a StatusRuntimeException, check the state of the operation. + // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: + // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html System.out.println("Failed to append records. 
\n" + e.toString()); } } - - public static void writeToDefaultStream(String projectId, String datasetName, String tableName) { - - TableName parent = TableName.of(projectId, datasetName, tableName); - - BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); - Table table = bigquery.getTable(datasetName, tableName); - Schema schema = table.getDefinition().getSchema(); - - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) { - - for (int i = 0; i < 10; i++) { - JSONObject record = new JSONObject(); - record.put("col1", String.format("record %03d", i)); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(record); - - ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); - } - } catch (Exception e) { - System.out.println(e); - } - } } +// [END bigquerystorage_jsonstreamwriter_committed] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 5cc504962a..47bcb9269a 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -16,14 +16,28 @@ package com.example.bigquerystorage; +// [START bigquerystorage_jsonstreamwriter_pending] import com.google.api.core.ApiFuture; -import com.google.cloud.bigquery.storage.v1beta2.*; +import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1beta2.TableName; +import com.google.cloud.bigquery.storage.v1beta2.WriteStream; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.concurrent.ExecutionException; import org.json.JSONArray; import org.json.JSONObject; public class WritePendingStream { - public static void runWritePendingStream() { + public static void runWritePendingStream() + throws DescriptorValidationException, InterruptedException, IOException { + // TODO(developer): Replace these variables before running the sample. String projectId = "MY_PROJECT_ID"; String datasetName = "MY_DATASET_NAME"; @@ -32,26 +46,34 @@ public static void runWritePendingStream() { writePendingStream(projectId, datasetName, tableName); } - public static void writePendingStream(String projectId, String datasetName, String tableName) { + public static void writePendingStream(String projectId, String datasetName, String tableName) + throws DescriptorValidationException, InterruptedException, IOException { try (BigQueryWriteClient client = BigQueryWriteClient.create()) { + // Initialize a write stream for the specified table. 
+ // For more information on WriteStream.Type, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); - - TableName parent = TableName.of(projectId, datasetName, tableName); - + TableName parentTable = TableName.of(projectId, datasetName, tableName); CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) + .setParent(parentTable.toString()) .setWriteStream(stream) .build(); WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); + // Use the JSON stream writer to send records in JSON format. + // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) .build()) { + // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { + + // Create a JSON object that is compatible with the table schema. JSONObject record = new JSONObject(); record.put("col1", String.format("batch-record %03d", i)); JSONArray jsonArr = new JSONArray(); @@ -65,16 +87,21 @@ public static void writePendingStream(String projectId, String datasetName, Stri System.out.println("Rows written: " + finalizeResponse.getRowCount()); } - // Commit the streams + // Commit the streams. BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder() - .setParent(parent.toString()) + .setParent(parentTable.toString()) .addWriteStreams(writeStream.getName()) .build(); BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest); - } catch (Exception e) { + System.out.println("Appended and committed records successfully."); + } catch (ExecutionException e) { + // If the wrapped exception is a StatusRuntimeException, check the state of the operation. + // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: + // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html System.out.println(e); } } } +// [START bigquerystorage_jsonstreamwriter_pending] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java new file mode 100644 index 0000000000..1ee2459561 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_jsonstreamwriter_default] +import com.google.api.core.ApiFuture; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1beta2.TableName; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import org.json.JSONArray; +import org.json.JSONObject; + +public class WriteToDefaultStream { + + public static void runWriteToDefaultStream() + throws DescriptorValidationException, InterruptedException, IOException { + // TODO(developer): Replace these variables before running the sample. + String projectId = "MY_PROJECT_ID"; + String datasetName = "MY_DATASET_NAME"; + String tableName = "MY_TABLE_NAME"; + writeToDefaultStream(projectId, datasetName, tableName); + } + + public static void writeToDefaultStream(String projectId, String datasetName, String tableName) + throws DescriptorValidationException, InterruptedException, IOException { + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + Table table = bigquery.getTable(datasetName, tableName); + TableName parentTable = TableName.of(projectId, datasetName, tableName); + Schema schema = table.getDefinition().getSchema(); + + // Use the JSON stream writer to send records in JSON format. + // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JstreamWriter.html + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) { + + // Append 10 JSON objects to the stream. + for (int i = 0; i < 10; i++) { + + // Create a JSON object that is compatible with the table schema. + JSONObject record = new JSONObject(); + record.put("col1", String.format("record %03d", i)); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(record); + + ApiFuture future = writer.append(jsonArr); + AppendRowsResponse response = future.get(); + } + System.out.println("Appended records successfully."); + } catch (ExecutionException e) { + // If the wrapped exception is a StatusRuntimeException, check the state of the operation. + // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: + // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html + System.out.println("Failed to append records. 
\n" + e.toString()); + } + } +} +// [END bigquerystorage_jsonstreamwriter_default] diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java similarity index 85% rename from samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java rename to samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java index 34db9e3b8e..63d938733e 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java @@ -16,6 +16,7 @@ package com.example.bigquerystorage; +import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; import java.io.ByteArrayOutputStream; @@ -30,7 +31,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class WriteAPISampleIT { +public class WriteCommittedStreamIT { private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); @@ -73,17 +74,6 @@ public void tearDown() { public void testWriteCommittedStream() throws Exception { WriteCommittedStream.writeCommittedStream( GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); - } - - @Test - public void testWriteToDefaultStream() throws Exception { - WriteCommittedStream.writeToDefaultStream( - GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); - } - - @Test - public void testWritePendingStream() throws Exception { - WritePendingStream.writePendingStream( - GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + assertThat(bout.toString()).contains("Appended records successfully."); } } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java new file mode 100644 index 0000000000..ff9f2fbfca --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WritePendingStreamIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); + private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); + + private final Logger log = Logger.getLogger(this.getClass().getName()); + private ByteArrayOutputStream bout; + private PrintStream out; + private PrintStream originalPrintStream; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + requireEnvVar("BIGQUERY_DATASET_NAME"); + requireEnvVar("BIGQUERY_TABLE_NAME"); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + originalPrintStream = System.out; + System.setOut(out); + } + + @After + public void tearDown() { + System.out.flush(); + System.setOut(originalPrintStream); + log.log(Level.INFO, "\n" + bout.toString()); + } + + @Test + public void testWritePendingStream() throws Exception { + WritePendingStream.writePendingStream( + GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + assertThat(bout.toString()).contains("Appended and committed records successfully."); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java new file mode 100644 index 0000000000..4e617dc222 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WriteToDefaultStreamIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); + private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); + + private final Logger log = Logger.getLogger(this.getClass().getName()); + private ByteArrayOutputStream bout; + private PrintStream out; + private PrintStream originalPrintStream; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + requireEnvVar("BIGQUERY_DATASET_NAME"); + requireEnvVar("BIGQUERY_TABLE_NAME"); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + originalPrintStream = System.out; + System.setOut(out); + } + + @After + public void tearDown() { + System.out.flush(); + System.setOut(originalPrintStream); + log.log(Level.INFO, "\n" + bout.toString()); + } + + @Test + public void testWriteToDefaultStream() throws Exception { + WriteToDefaultStream.writeToDefaultStream( + GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + assertThat(bout.toString()).contains("Appended records successfully."); + } +} From 7135af1cd5ee053c100900785d6977bc68a0ea29 Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Fri, 8 Jan 2021 18:50:40 +0000 Subject: [PATCH 07/21] docs(samples): Fix mismatched region tag --- .../java/com/example/bigquerystorage/WritePendingStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 47bcb9269a..104577b0a0 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -104,4 +104,4 @@ public static void writePendingStream(String projectId, String datasetName, Stri } } } -// [START bigquerystorage_jsonstreamwriter_pending] +// [END bigquerystorage_jsonstreamwriter_pending] From 3e47e5baa866e0e90149bcfecf27c18287e0d4ff Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:29:44 -0500 Subject: [PATCH 08/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java --- .../java/com/example/bigquerystorage/WriteCommittedStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index ac49b0c316..e458bba35f 100644 --- 
a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -67,7 +67,6 @@ public static void writeCommittedStream(String projectId, String datasetName, St // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { - // Create a JSON object that is compatible with the table schema. JSONObject record = new JSONObject(); record.put("col1", String.format("record %03d", i)); From f495b7ab4c7e166f681f8c0a1d847c547ca730d9 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:29:50 -0500 Subject: [PATCH 09/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java --- .../java/com/example/bigquerystorage/WritePendingStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 104577b0a0..4e98d7ca96 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -37,7 +37,6 @@ public class WritePendingStream { public static void runWritePendingStream() throws DescriptorValidationException, InterruptedException, IOException { - // TODO(developer): Replace these variables before running the sample. String projectId = "MY_PROJECT_ID"; String datasetName = "MY_DATASET_NAME"; From eb2e3514a53485d094d552beda3cfedbc56516b0 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:29:57 -0500 Subject: [PATCH 10/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java --- .../java/com/example/bigquerystorage/WritePendingStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 4e98d7ca96..7b4631fe3e 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -49,7 +49,6 @@ public static void writePendingStream(String projectId, String datasetName, Stri throws DescriptorValidationException, InterruptedException, IOException { try (BigQueryWriteClient client = BigQueryWriteClient.create()) { - // Initialize a write stream for the specified table. 
// For more information on WriteStream.Type, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html From 4529da8d301ec6ee3d95b699ba7086caeb948535 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:06 -0500 Subject: [PATCH 11/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java --- .../java/com/example/bigquerystorage/WritePendingStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 7b4631fe3e..3cca45df3d 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -70,7 +70,6 @@ public static void writePendingStream(String projectId, String datasetName, Stri // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { - // Create a JSON object that is compatible with the table schema. JSONObject record = new JSONObject(); record.put("col1", String.format("batch-record %03d", i)); From 93a9da6c81e916adbc0f34350d25eecfc803c455 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:12 -0500 Subject: [PATCH 12/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java --- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 1ee2459561..d1b6a9fef6 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -39,6 +39,7 @@ public static void runWriteToDefaultStream() String projectId = "MY_PROJECT_ID"; String datasetName = "MY_DATASET_NAME"; String tableName = "MY_TABLE_NAME"; + writeToDefaultStream(projectId, datasetName, tableName); } From eca975023a70a3c6fb93d84b0c9dca4e27e30425 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:19 -0500 Subject: [PATCH 13/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java --- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index d1b6a9fef6..e7f9750cb1 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -56,7 +56,6 @@ public static void writeToDefaultStream(String projectId, String datasetName, St // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JstreamWriter.html try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) { - // Append 10 JSON objects to the stream. 
for (int i = 0; i < 10; i++) { From 0c411b29ec240f36f5ea165569ea7b577311e915 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:26 -0500 Subject: [PATCH 14/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java --- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index e7f9750cb1..3769d5b0e1 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -45,7 +45,6 @@ public static void runWriteToDefaultStream() public static void writeToDefaultStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { - BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); Table table = bigquery.getTable(datasetName, tableName); TableName parentTable = TableName.of(projectId, datasetName, tableName); From 10ee1ff5405bdd95c5413ffaf9b21ff7c5247ad1 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:34 -0500 Subject: [PATCH 15/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java --- .../java/com/example/bigquerystorage/WriteToDefaultStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 3769d5b0e1..182d4469a4 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -57,7 +57,6 @@ public static void writeToDefaultStream(String projectId, String datasetName, St JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) { // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { - // Create a JSON object that is compatible with the table schema. JSONObject record = new JSONObject(); record.put("col1", String.format("record %03d", i)); From 2cea8bc8de258880ff2e09243aa7e7b0b9f46a1e Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:41 -0500 Subject: [PATCH 16/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java --- .../java/com/example/bigquerystorage/WritePendingStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 3cca45df3d..55bef7382e 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -67,7 +67,6 @@ public static void writePendingStream(String projectId, String datasetName, Stri try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) .build()) { - // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { // Create a JSON object that is compatible with the table schema. 
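Note (sketch, not part of the patch series): the samples above catch ExecutionException and point readers at the StatusRuntimeException docs to decide whether a failed append can be retried. Assuming the gRPC status codes named in the sample comments (INTERNAL, CANCELLED, ABORTED) are the retryable ones, a small helper could look like the following; the class and method names are illustrative only.

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.ExecutionException;

public class AppendErrorSketch {

  // Returns true if the failure wrapped by the ExecutionException carries a
  // gRPC status that the sample comments describe as safe to retry.
  static boolean isRetryable(ExecutionException e) {
    Throwable cause = e.getCause();
    if (!(cause instanceof StatusRuntimeException)) {
      return false;
    }
    Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode();
    return code == Status.Code.INTERNAL
        || code == Status.Code.CANCELLED
        || code == Status.Code.ABORTED;
  }
}

A caller would invoke isRetryable inside the catch (ExecutionException e) blocks shown in the samples, typically combined with a bounded backoff such as the ExponentialBackOff that patch 04 added and patch 05 reverted for simplicity.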
From f0bbd91f0478f254d897beae897bafe4c5f9e569 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:30:50 -0500 Subject: [PATCH 17/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java --- .../java/com/example/bigquerystorage/WriteCommittedStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index e458bba35f..38f3d7d6c3 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -45,7 +45,6 @@ public static void writeCommittedStream(String projectId, String datasetName, St throws DescriptorValidationException, InterruptedException, IOException { try (BigQueryWriteClient client = BigQueryWriteClient.create()) { - // Initialize a write stream for the specified table. // For more information on WriteStream.Type, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html From 5c686656839883361d018f995c34425a7ea5e345 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:31:13 -0500 Subject: [PATCH 18/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java --- .../java/com/example/bigquerystorage/WriteCommittedStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 38f3d7d6c3..30fc81528f 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -38,6 +38,7 @@ public static void runWriteCommittedStream() String projectId = "MY_PROJECT_ID"; String datasetName = "MY_DATASET_NAME"; String tableName = "MY_TABLE_NAME"; + writeCommittedStream(projectId, datasetName, tableName); } From c044a42ce6687a0d3612b38429e8a272376430e6 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:31:41 -0500 Subject: [PATCH 19/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java --- .../java/com/example/bigquerystorage/WriteCommittedStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 30fc81528f..b3ad8bc0d8 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -64,7 +64,6 @@ public static void writeCommittedStream(String projectId, String datasetName, St try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) .build()) { - // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { // Create a JSON object that is compatible with the table schema. 
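Note (sketch, not part of the patch series): patch 06 removed the original duplicate-record example to keep the committed-stream sample short. The behaviour it illustrated — explicit offsets let the backend deduplicate retried appends, and re-sending an already written offset fails with ALREADY_EXISTS — could be sketched as follows; the class and method names are illustrative, and the writer is assumed to target a COMMITTED stream as in WriteCommittedStream.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class OffsetDedupSketch {

  // Appends the same single-row batch twice at offset 0. On a COMMITTED stream
  // the second append is expected to fail with ALREADY_EXISTS, which is how
  // explicit offsets protect against duplicates when an append is retried.
  static void appendTwiceAtOffsetZero(JsonStreamWriter writer) throws Exception {
    JSONObject record = new JSONObject();
    record.put("col1", "record 000");
    JSONArray jsonArr = new JSONArray();
    jsonArr.put(record);

    ApiFuture<AppendRowsResponse> first = writer.append(jsonArr, 0);
    first.get(); // the first write at offset 0 succeeds

    ApiFuture<AppendRowsResponse> retry = writer.append(jsonArr, 0);
    try {
      retry.get();
    } catch (ExecutionException e) {
      // Expected for the duplicate offset; the cause carries the ALREADY_EXISTS status.
      System.out.println("Duplicate offset rejected: " + e.getCause());
    }
  }
}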
From ec2ea6189b6b5bbdcdcfe5bd26869b41ffbaddc9 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 8 Jan 2021 15:32:06 -0500 Subject: [PATCH 20/21] Update samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java --- .../java/com/example/bigquerystorage/WritePendingStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 55bef7382e..813e59d957 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -47,7 +47,6 @@ public static void runWritePendingStream() public static void writePendingStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { - try (BigQueryWriteClient client = BigQueryWriteClient.create()) { // Initialize a write stream for the specified table. // For more information on WriteStream.Type, see: From edd650d24b5594eee948eda8beb9948f698193be Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Tue, 12 Jan 2021 01:27:47 +0000 Subject: [PATCH 21/21] docs(samples): Create test resouces Create temporary dataset and table for sample integration tests --- .../WriteCommittedStreamIT.java | 45 +++++++++++++------ .../bigquerystorage/WritePendingStreamIT.java | 45 +++++++++++++------ .../WriteToDefaultStreamIT.java | 45 +++++++++++++------ 3 files changed, 93 insertions(+), 42 deletions(-) diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java index 63d938733e..2dfde82a3f 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteCommittedStreamIT.java @@ -19,10 +19,20 @@ import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.UUID; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -34,13 +44,12 @@ public class WriteCommittedStreamIT { private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); - private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); - private final Logger log = Logger.getLogger(this.getClass().getName()); private ByteArrayOutputStream bout; private PrintStream out; - private PrintStream originalPrintStream; + private BigQuery bigquery; + private String 
datasetName; + private String tableName; private static void requireEnvVar(String varName) { assertNotNull( @@ -51,29 +60,37 @@ private static void requireEnvVar(String varName) { @BeforeClass public static void checkRequirements() { requireEnvVar("GOOGLE_CLOUD_PROJECT"); - requireEnvVar("BIGQUERY_DATASET_NAME"); - requireEnvVar("BIGQUERY_TABLE_NAME"); } @Before public void setUp() { bout = new ByteArrayOutputStream(); out = new PrintStream(bout); - originalPrintStream = System.out; System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "COMMITTED_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING)); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); } @After public void tearDown() { - System.out.flush(); - System.setOut(originalPrintStream); - log.log(Level.INFO, "\n" + bout.toString()); + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); } @Test public void testWriteCommittedStream() throws Exception { - WriteCommittedStream.writeCommittedStream( - GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + WriteCommittedStream.writeCommittedStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName); assertThat(bout.toString()).contains("Appended records successfully."); } } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java index ff9f2fbfca..339a82c9d2 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WritePendingStreamIT.java @@ -19,10 +19,20 @@ import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.UUID; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -34,13 +44,12 @@ public class WritePendingStreamIT { private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); - private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); - private final Logger log = Logger.getLogger(this.getClass().getName()); private ByteArrayOutputStream bout; private PrintStream out; - private PrintStream originalPrintStream; + private BigQuery 
bigquery; + private String datasetName; + private String tableName; private static void requireEnvVar(String varName) { assertNotNull( @@ -51,29 +60,37 @@ private static void requireEnvVar(String varName) { @BeforeClass public static void checkRequirements() { requireEnvVar("GOOGLE_CLOUD_PROJECT"); - requireEnvVar("BIGQUERY_DATASET_NAME"); - requireEnvVar("BIGQUERY_TABLE_NAME"); } @Before public void setUp() { bout = new ByteArrayOutputStream(); out = new PrintStream(bout); - originalPrintStream = System.out; System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "PENDING_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING)); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); } @After public void tearDown() { - System.out.flush(); - System.setOut(originalPrintStream); - log.log(Level.INFO, "\n" + bout.toString()); + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); } @Test public void testWritePendingStream() throws Exception { - WritePendingStream.writePendingStream( - GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + WritePendingStream.writePendingStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName); assertThat(bout.toString()).contains("Appended and committed records successfully."); } } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java index 4e617dc222..c1f79f1584 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java @@ -19,10 +19,20 @@ import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.UUID; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -34,13 +44,12 @@ public class WriteToDefaultStreamIT { private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); - private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); - private final Logger log = Logger.getLogger(this.getClass().getName()); private ByteArrayOutputStream bout; private PrintStream out; - private PrintStream 
originalPrintStream; + private BigQuery bigquery; + private String datasetName; + private String tableName; private static void requireEnvVar(String varName) { assertNotNull( @@ -51,29 +60,37 @@ private static void requireEnvVar(String varName) { @BeforeClass public static void checkRequirements() { requireEnvVar("GOOGLE_CLOUD_PROJECT"); - requireEnvVar("BIGQUERY_DATASET_NAME"); - requireEnvVar("BIGQUERY_TABLE_NAME"); } @Before public void setUp() { bout = new ByteArrayOutputStream(); out = new PrintStream(bout); - originalPrintStream = System.out; System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING)); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); } @After public void tearDown() { - System.out.flush(); - System.setOut(originalPrintStream); - log.log(Level.INFO, "\n" + bout.toString()); + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); } @Test public void testWriteToDefaultStream() throws Exception { - WriteToDefaultStream.writeToDefaultStream( - GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); + WriteToDefaultStream.writeToDefaultStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName); assertThat(bout.toString()).contains("Appended records successfully."); } }
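PATCH 21/21 replaces the BIGQUERY_DATASET_NAME / BIGQUERY_TABLE_NAME environment variables with per-test resources in all three integration tests. Below is a minimal, self-contained sketch of that lifecycle, condensed from the setUp() and tearDown() hunks above; the class name and the plain main() wrapper are invented here for illustration, whereas in the patch the same calls live in the JUnit @Before and @After methods of each *IT class.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import java.util.UUID;

public class PerTestResourcesSketch {  // hypothetical class name, for illustration only

  public static void main(String[] args) {
    String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // setUp(): create a uniquely named dataset and a single STRING-column table for this run.
    String datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
    String tableName = "COMMITTED_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
    Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING));
    bigquery.create(DatasetInfo.newBuilder(datasetName).build());
    bigquery.create(
        TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
            .build());

    // The test body then runs the sample against the freshly created table, e.g.
    // WriteCommittedStream.writeCommittedStream(projectId, datasetName, tableName);

    // tearDown(): drop the dataset and everything in it so nothing leaks between runs.
    bigquery.delete(
        DatasetId.of(projectId, datasetName), DatasetDeleteOption.deleteContents());
  }
}

Because tearDown() deletes the dataset with deleteContents(), the table created in setUp() needs no separate delete call, and each test remains isolated even if an earlier run failed before cleanup.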