docs(samples): add samples #309

Merged
12 changes: 11 additions & 1 deletion pom.xml
@@ -256,10 +256,20 @@
<link>https://developers.google.com/protocol-buffers/docs/reference/java/</link>
<link>https://googleapis.dev/java/google-auth-library/latest/</link>
<link>https://googleapis.dev/java/gax/latest/</link>
<link>https://googleapis.github.io/api-common-java/${google.api-common.version}/apidocs/</link>
<link>https://googleapis.github.io/api-common-java/</link>
</links>
</configuration>
</plugin>
</plugins>
</reporting>

<profiles>
<profile>
<id>include-samples</id>
<modules>
<module>samples</module>
</modules>
</profile>
</profiles>

</project>
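
The new include-samples profile keeps the samples module out of the default build: the module only joins the reactor when the profile is activated explicitly via Maven's standard -P switch, for example:

    mvn verify -Pinclude-samples

This way, a plain library build or release is unaffected by the samples and their extra dependencies.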
12 changes: 12 additions & 0 deletions samples/install-without-bom/pom.xml
@@ -33,6 +33,18 @@
</dependency>
<!-- [END bigquerystorage_install_without_bom] -->

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>0.17.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
12 changes: 12 additions & 0 deletions samples/snapshot/pom.xml
@@ -32,6 +32,18 @@
</dependency>
<!-- {x-version-update-end} -->

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>0.17.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
13 changes: 13 additions & 0 deletions samples/snippets/pom.xml
@@ -44,6 +44,19 @@
</dependency>
<!-- [END bigquerystorage_install_with_bom] -->

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>0.17.1</version>
</dependency>

<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
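All three sample poms (install-without-bom, snapshot, snippets) add the same Apache Avro 1.9.2 and Arrow 0.17.1 dependencies with explicit versions. The Storage Read API can serialize rows as either Avro or Arrow, and the sample code decodes those payloads client-side, so both decoder libraries are needed on the samples' classpath.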
187 changes: 187 additions & 0 deletions StorageArrowSample.java
@@ -0,0 +1,187 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

// [START bigquerystorage_arrow_quickstart]

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class StorageArrowSample {

/*
* SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches
* transmitted from the storage API.
*/
private static class SimpleRowReader implements AutoCloseable {

private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

// The VectorSchemaRoot and VectorLoader are reused across batches to avoid
// re-allocation and excessive garbage collection.
private final VectorSchemaRoot root;
private final VectorLoader loader;

public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
Schema schema =
MessageSerializer.deserializeSchema(
new ReadChannel(
new ByteArrayReadableSeekableByteChannel(
arrowSchema.getSerializedSchema().toByteArray())));
Preconditions.checkNotNull(schema);
List<FieldVector> vectors = new ArrayList<>();
for (Field field : schema.getFields()) {
vectors.add(field.createVector(allocator));
}
root = new VectorSchemaRoot(vectors);
loader = new VectorLoader(root);
}

/**
* Sample method for processing Arrow data that only validates decoding.
*
* @param batch a serialized Arrow record batch from a ReadRowsResponse
*/
public void processRows(ArrowRecordBatch batch) throws IOException {
org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
MessageSerializer.deserializeRecordBatch(
new ReadChannel(
new ByteArrayReadableSeekableByteChannel(
batch.getSerializedRecordBatch().toByteArray())),
allocator);

loader.load(deserializedBatch);
// Release buffers from batch (they are still held in the vectors in root).
deserializedBatch.close();
System.out.println(root.contentToTSVString());
// Release buffers from vectors in root.
root.clear();
}

@Override
public void close() {
root.close();
allocator.close();
}
}

public static void main(String... args) throws Exception {
// Sets your Google Cloud Platform project ID.
// String projectId = "YOUR_PROJECT_ID";
String projectId = args[0];
// Optional snapshot time, in milliseconds since the Unix epoch. Epoch
// milliseconds overflow an int, so a long is used here.
Long snapshotMillis = null;
if (args.length > 1) {
snapshotMillis = Long.parseLong(args[1]);
}

try (BigQueryReadClient client = BigQueryReadClient.create()) {
String parent = String.format("projects/%s", projectId);

// This example uses baby name data from the public datasets.
String srcTable =
String.format(
"projects/%s/datasets/%s/tables/%s",
"bigquery-public-data", "usa_names", "usa_1910_current");

// We specify the columns to be projected by adding them to the selected fields,
// and set a simple filter to restrict which rows are transmitted.
TableReadOptions options =
TableReadOptions.newBuilder()
.addSelectedFields("name")
.addSelectedFields("number")
.addSelectedFields("state")
.setRowRestriction("state = \"WA\"")
.build();

// Start specifying the read session we want created.
ReadSession.Builder sessionBuilder =
ReadSession.newBuilder()
.setTable(srcTable)
// This API can also deliver data serialized in Apache Avro format.
// This example leverages Apache Arrow.
.setDataFormat(DataFormat.ARROW)
.setReadOptions(options);

// Optionally specify the snapshot time. When unspecified, snapshot time is "now".
if (snapshotMillis != null) {
Timestamp t =
Timestamp.newBuilder()
.setSeconds(snapshotMillis / 1000)
.setNanos((int) ((snapshotMillis % 1000) * 1000000))
.build();
TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
sessionBuilder.setTableModifiers(modifiers);
}

// Begin building the session creation request.
CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parent)
.setReadSession(sessionBuilder)
.setMaxStreamCount(1);

ReadSession session = client.createReadSession(builder.build());
// Set up a simple reader to decode the Arrow record batches.
try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {

// Assert that there are streams available in the session. An empty table may not have
// data available. If no streams are available for an anonymous (cached) table, consider
// writing the results of a query to a named table rather than consuming cached results
// directly.
Preconditions.checkState(session.getStreamsCount() > 0);

// Use the first stream to perform reading.
String streamName = session.getStreams(0).getName();

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(streamName).build();

// Process each block of rows as they arrive and decode using our simple row reader.
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
Preconditions.checkState(response.hasArrowRecordBatch());
reader.processRows(response.getArrowRecordBatch());
}
}
}
}
}

// [END bigquerystorage_arrow_quickstart]
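
The sample only dumps each decoded batch with contentToTSVString(). As a minimal sketch of typed column access, the loaded VectorSchemaRoot can also be read through Arrow's typed vectors. This sketch is not part of the PR: it assumes the projected columns keep their BigQuery types (name and state are STRING, read through VarCharVector; number is INT64, read through BigIntVector), and the printRows helper name is hypothetical. It would be called from processRows after loader.load(deserializedBatch) and before root.clear().

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;

public class RowPrinter {

  // Hypothetical helper: prints name/number pairs from a loaded VectorSchemaRoot.
  static void printRows(VectorSchemaRoot root) {
    // getVector(String) looks a column up by field name in the loaded schema.
    VarCharVector names = (VarCharVector) root.getVector("name");
    BigIntVector numbers = (BigIntVector) root.getVector("number");
    for (int i = 0; i < root.getRowCount(); i++) {
      // Guard with isNull before the primitive get(); getObject returns an
      // org.apache.arrow.vector.util.Text for VarCharVector.
      String name = names.isNull(i) ? null : names.getObject(i).toString();
      Long number = numbers.isNull(i) ? null : numbers.get(i);
      System.out.println(name + ": " + number);
    }
  }
}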