Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add code samples for JsonStreamWriter
- Loading branch information
1 parent
f3c897f
commit 3138060
Showing
3 changed files
with
237 additions
and
0 deletions.
There are no files selected for viewing
100 changes: 100 additions & 0 deletions
100
samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package com.example.bigquerystorage; | ||
|
||
import com.google.api.core.ApiFuture; | ||
import com.google.cloud.bigquery.*; | ||
import com.google.cloud.bigquery.storage.v1beta2.*; | ||
import io.grpc.Status; | ||
import io.grpc.StatusRuntimeException; | ||
import java.util.concurrent.ExecutionException; | ||
import org.json.JSONArray; | ||
import org.json.JSONObject; | ||
|
||
public class WriteCommittedStream { | ||
|
||
public static Status.Code getStatusCode(StatusRuntimeException e) { | ||
return e.getStatus().getCode(); | ||
} | ||
|
||
public static void runWriteCommittedStream() { | ||
// TODO(developer): Replace these variables before running the sample. | ||
String projectId = "MY_PROJECT_ID"; | ||
String datasetName = "MY_DATASET_NAME"; | ||
String tableName = "MY_TABLE_NAME"; | ||
writeCommittedStream(projectId, datasetName, tableName); | ||
} | ||
|
||
public static void writeCommittedStream(String projectId, String datasetName, String tableName) { | ||
|
||
try (BigQueryWriteClient client = BigQueryWriteClient.create()) { | ||
|
||
// Initialize a write stream for the specified table. | ||
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build(); | ||
|
||
TableName parent = TableName.of(projectId, datasetName, tableName); | ||
|
||
CreateWriteStreamRequest createWriteStreamRequest = | ||
CreateWriteStreamRequest.newBuilder() | ||
.setParent(parent.toString()) | ||
.setWriteStream(stream) | ||
.build(); | ||
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); | ||
|
||
// Use the JSON stream writer to send records in JSON format. | ||
try (JsonStreamWriter writer = | ||
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) | ||
.build()) { | ||
|
||
int offsets[] = {0, 1, 2, 3, 4, 5, 5}; | ||
// The last offset is repeated. This will cause an ALREADY_EXISTS error. | ||
|
||
for (int i : offsets) { | ||
JSONObject record = new JSONObject(); | ||
record.put("col1", String.format("record %03d", i)); | ||
JSONArray jsonArr = new JSONArray(); | ||
jsonArr.put(record); | ||
|
||
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, i, false); | ||
AppendRowsResponse response = future.get(); | ||
} | ||
|
||
} catch (ExecutionException ex) { | ||
Throwable cause = ex.getCause(); | ||
if (cause instanceof StatusRuntimeException) { | ||
System.out.println( | ||
"Failed with status = " + getStatusCode((StatusRuntimeException) cause)); | ||
} | ||
throw ex; | ||
} | ||
|
||
System.out.println("Appended records successfully."); | ||
|
||
} catch (Exception e) { | ||
System.out.println("Failed to append records. \n" + e.toString()); | ||
} | ||
} | ||
|
||
public static void writeToDefaultStream(String projectId, String datasetName, String tableName) { | ||
|
||
TableName parent = TableName.of(projectId, datasetName, tableName); | ||
|
||
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); | ||
Table table = bigquery.getTable(datasetName, tableName); | ||
Schema schema = table.getDefinition().getSchema(); | ||
|
||
try (JsonStreamWriter writer = | ||
JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) { | ||
|
||
for (int i = 0; i < 10; i++) { | ||
JSONObject record = new JSONObject(); | ||
record.put("col1", String.format("record %03d", i)); | ||
JSONArray jsonArr = new JSONArray(); | ||
jsonArr.put(record); | ||
|
||
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, false); | ||
AppendRowsResponse response = future.get(); | ||
} | ||
} catch (Exception e) { | ||
System.out.println(e); | ||
} | ||
} | ||
} |
64 changes: 64 additions & 0 deletions
64
samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package com.example.bigquerystorage; | ||
|
||
import com.google.api.core.ApiFuture; | ||
import com.google.cloud.bigquery.storage.v1beta2.*; | ||
import org.json.JSONArray; | ||
import org.json.JSONObject; | ||
|
||
public class WritePendingStream { | ||
|
||
public static void runWritePendingStream() { | ||
// TODO(developer): Replace these variables before running the sample. | ||
String projectId = "MY_PROJECT_ID"; | ||
String datasetName = "MY_DATASET_NAME"; | ||
String tableName = "MY_TABLE_NAME"; | ||
|
||
writePendingStream(projectId, datasetName, tableName); | ||
} | ||
|
||
public static void writePendingStream(String projectId, String datasetName, String tableName) { | ||
|
||
try (BigQueryWriteClient client = BigQueryWriteClient.create()) { | ||
|
||
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); | ||
|
||
TableName parent = TableName.of(projectId, datasetName, tableName); | ||
|
||
CreateWriteStreamRequest createWriteStreamRequest = | ||
CreateWriteStreamRequest.newBuilder() | ||
.setParent(parent.toString()) | ||
.setWriteStream(stream) | ||
.build(); | ||
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); | ||
|
||
try (JsonStreamWriter writer = | ||
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) | ||
.build()) { | ||
|
||
for (int i = 0; i < 10; i++) { | ||
JSONObject record = new JSONObject(); | ||
record.put("col1", String.format("batch-record %03d", i)); | ||
JSONArray jsonArr = new JSONArray(); | ||
jsonArr.put(record); | ||
|
||
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, false); | ||
AppendRowsResponse response = future.get(); | ||
} | ||
FinalizeWriteStreamResponse finalizeResponse = | ||
client.finalizeWriteStream(writeStream.getName()); | ||
System.out.println("Rows written: " + finalizeResponse.getRowCount()); | ||
} | ||
|
||
// Commit the streams | ||
BatchCommitWriteStreamsRequest commitRequest = | ||
BatchCommitWriteStreamsRequest.newBuilder() | ||
.setParent(parent.toString()) | ||
.addWriteStreams(writeStream.getName()) | ||
.build(); | ||
BatchCommitWriteStreamsResponse commitResponse = | ||
client.batchCommitWriteStreams(commitRequest); | ||
} catch (Exception e) { | ||
System.out.println(e); | ||
} | ||
} | ||
} |
73 changes: 73 additions & 0 deletions
73
samples/snippets/src/test/java/com/example/bigquerystorage/WriteAPISampleIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package com.example.bigquerystorage; | ||
|
||
import static junit.framework.TestCase.assertNotNull; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.PrintStream; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.junit.runners.JUnit4; | ||
|
||
@RunWith(JUnit4.class) | ||
public class WriteAPISampleIT { | ||
|
||
private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); | ||
private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME"); | ||
private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME"); | ||
|
||
private final Logger log = Logger.getLogger(this.getClass().getName()); | ||
private ByteArrayOutputStream bout; | ||
private PrintStream out; | ||
private PrintStream originalPrintStream; | ||
|
||
private static void requireEnvVar(String varName) { | ||
assertNotNull( | ||
"Environment variable " + varName + " is required to perform these tests.", | ||
System.getenv(varName)); | ||
} | ||
|
||
@BeforeClass | ||
public static void checkRequirements() { | ||
requireEnvVar("GOOGLE_CLOUD_PROJECT"); | ||
requireEnvVar("BIGQUERY_DATASET_NAME"); | ||
requireEnvVar("BIGQUERY_TABLE_NAME"); | ||
} | ||
|
||
@Before | ||
public void setUp() { | ||
bout = new ByteArrayOutputStream(); | ||
out = new PrintStream(bout); | ||
originalPrintStream = System.out; | ||
System.setOut(out); | ||
} | ||
|
||
@After | ||
public void tearDown() { | ||
System.out.flush(); | ||
System.setOut(originalPrintStream); | ||
log.log(Level.INFO, "\n" + bout.toString()); | ||
} | ||
|
||
@Test | ||
public void testWriteCommittedStream() throws Exception { | ||
WriteCommittedStream.writeCommittedStream( | ||
GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); | ||
} | ||
|
||
@Test | ||
public void testWriteToDefaultStream() throws Exception { | ||
WriteCommittedStream.writeToDefaultStream( | ||
GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); | ||
} | ||
|
||
@Test | ||
public void testWritePendingStream() throws Exception { | ||
WritePendingStream.writePendingStream( | ||
GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME); | ||
} | ||
} |