diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java
new file mode 100644
index 0000000000..025d5ad5e0
--- /dev/null
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java
@@ -0,0 +1,110 @@
+package com.example.bigquerystorage;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1beta2.TableName;
+import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.ExecutionException;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public class WriteCommittedStream {
+
+  /** Returns the gRPC status code carried by a StatusRuntimeException. */
+  public static Status.Code getStatusCode(StatusRuntimeException e) {
+    return e.getStatus().getCode();
+  }
+
+  public static void runWriteCommittedStream() {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "MY_PROJECT_ID";
+    String datasetName = "MY_DATASET_NAME";
+    String tableName = "MY_TABLE_NAME";
+    writeCommittedStream(projectId, datasetName, tableName);
+  }
+
+  /** Appends JSON records to the table over an explicitly created COMMITTED write stream. */
+  public static void writeCommittedStream(String projectId, String datasetName, String tableName) {
+
+    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
+
+      // Initialize a write stream for the specified table.
+      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
+
+      TableName parent = TableName.of(projectId, datasetName, tableName);
+
+      CreateWriteStreamRequest createWriteStreamRequest =
+          CreateWriteStreamRequest.newBuilder()
+              .setParent(parent.toString())
+              .setWriteStream(stream)
+              .build();
+      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);
+
+      // Use the JSON stream writer to send records in JSON format.
+      try (JsonStreamWriter writer =
+          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
+              .build()) {
+
+        int[] offsets = {0, 1, 2, 3, 4, 5, 5};
+        // The last offset is repeated. This will cause an ALREADY_EXISTS error.
+
+        for (int i : offsets) {
+          JSONObject record = new JSONObject();
+          record.put("col1", String.format("record %03d", i));
+          JSONArray jsonArr = new JSONArray();
+          jsonArr.put(record);
+
+          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, i, false);
+          AppendRowsResponse response = future.get();
+        }
+
+      } catch (ExecutionException ex) {
+        Throwable cause = ex.getCause();
+        if (cause instanceof StatusRuntimeException) {
+          System.out.println(
+              "Failed with status = " + getStatusCode((StatusRuntimeException) cause));
+        }
+        throw ex;
+      }
+
+      System.out.println("Appended records successfully.");
+
+    } catch (Exception e) {
+      System.out.println("Failed to append records. \n" + e.toString());
+    }
+  }
+
+  public static void writeToDefaultStream(String projectId, String datasetName, String tableName) {
+
+    TableName parent = TableName.of(projectId, datasetName, tableName);
+
+    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+    Table table = bigquery.getTable(datasetName, tableName);
+    Schema schema = table.getDefinition().getSchema();
+
+    try (JsonStreamWriter writer =
+        JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) {
+
+      for (int i = 0; i < 10; i++) {
+        JSONObject record = new JSONObject();
+        record.put("col1", String.format("record %03d", i));
+        JSONArray jsonArr = new JSONArray();
+        jsonArr.put(record);
+
+        ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, false);
+        AppendRowsResponse response = future.get();
+      }
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+  }
+}
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java
new file mode 100644
index 0000000000..9fe5cc96e3
--- /dev/null
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java
@@ -0,0 +1,73 @@
+package com.example.bigquerystorage;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest;
+import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
+import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1beta2.TableName;
+import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public class WritePendingStream {
+
+  public static void runWritePendingStream() {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "MY_PROJECT_ID";
+    String datasetName = "MY_DATASET_NAME";
+    String tableName = "MY_TABLE_NAME";
+
+    writePendingStream(projectId, datasetName, tableName);
+  }
+
+  /** Buffers rows in a PENDING stream, then finalizes and batch-commits them atomically. */
+  public static void writePendingStream(String projectId, String datasetName, String tableName) {
+
+    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
+
+      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
+
+      TableName parent = TableName.of(projectId, datasetName, tableName);
+
+      CreateWriteStreamRequest createWriteStreamRequest =
+          CreateWriteStreamRequest.newBuilder()
+              .setParent(parent.toString())
+              .setWriteStream(stream)
+              .build();
+      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);
+
+      try (JsonStreamWriter writer =
+          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
+              .build()) {
+
+        for (int i = 0; i < 10; i++) {
+          JSONObject record = new JSONObject();
+          record.put("col1", String.format("batch-record %03d", i));
+          JSONArray jsonArr = new JSONArray();
+          jsonArr.put(record);
+
+          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, false);
+          AppendRowsResponse response = future.get();
+        }
+        FinalizeWriteStreamResponse finalizeResponse =
+            client.finalizeWriteStream(writeStream.getName());
+        System.out.println("Rows written: " + finalizeResponse.getRowCount());
+      }
+
+      // Commit the streams
+      BatchCommitWriteStreamsRequest commitRequest =
+          BatchCommitWriteStreamsRequest.newBuilder()
+              .setParent(parent.toString())
+              .addWriteStreams(writeStream.getName())
+              .build();
+      BatchCommitWriteStreamsResponse commitResponse =
+          client.batchCommitWriteStreams(commitRequest);
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+  }
+}
diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java
new file mode 100644
index 0000000000..ef871110dd
--- /dev/null
+++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java
@@ -0,0 +1,74 @@
+package com.example.bigquerystorage;
+
+import static junit.framework.TestCase.assertNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WriteAPISampleIT {
+
+  private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME");
+  private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME");
+
+  private final Logger log = Logger.getLogger(this.getClass().getName());
+  private ByteArrayOutputStream bout;
+  private PrintStream out;
+  private PrintStream originalPrintStream;
+
+  private static void requireEnvVar(String varName) {
+    assertNotNull(
+        "Environment variable " + varName + " is required to perform these tests.",
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnvVar("GOOGLE_CLOUD_PROJECT");
+    requireEnvVar("BIGQUERY_DATASET_NAME");
+    requireEnvVar("BIGQUERY_TABLE_NAME");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new
+        ByteArrayOutputStream();
+    out = new PrintStream(bout);
+    originalPrintStream = System.out;
+    System.setOut(out);
+  }
+
+  @After
+  public void tearDown() {
+    System.out.flush();
+    System.setOut(originalPrintStream);
+    log.log(Level.INFO, "\n" + bout.toString());
+  }
+
+  @Test
+  public void testWriteCommittedStream() throws Exception {
+    WriteCommittedStream.writeCommittedStream(
+        GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
+  }
+
+  @Test
+  public void testWriteToDefaultStream() throws Exception {
+    WriteCommittedStream.writeToDefaultStream(
+        GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
+  }
+
+  @Test
+  public void testWritePendingStream() throws Exception {
+    WritePendingStream.writePendingStream(
+        GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
+  }
+}