docs(samples): Add minimal sample to show Write API in pending mode
VeronicaWasson committed Oct 5, 2021
1 parent b0465d1 commit a3b027f
Showing 5 changed files with 326 additions and 0 deletions.
133 changes: 133 additions & 0 deletions samples/snippets/append_rows_pending.py
@@ -0,0 +1,133 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START bigquerystorage_append_rows_pending]
"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

# If you update the customer_record.proto protocol buffer definition, run:
#
# protoc --python_out=. customer_record.proto
#
# from the samples/snippets directory to generate the customer_record_pb2.py module.
from . import customer_record_pb2


def create_row_data(row_num: int, name: str):
    row = customer_record_pb2.CustomerRecord()
    row.row_num = row_num
    row.customer_name = name
    return row.SerializeToString()


def append_rows_pending(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # When creating the stream, choose the type. With the PENDING type, rows
    # are buffered and become visible only after the stream is committed. See:
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
    proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)

    # Send another batch.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(3, "Charles"))

    # Since this is the second request, you only need to include the row data.
    # The stream name and the protocol buffer DESCRIPTOR are only needed in
    # the first request.
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # The offset must equal the number of rows that were previously sent.
    request.offset = 2

    response_future_2 = append_rows_stream.send(request)

    print(response_future_1.result())
    print(response_future_2.result())

    # Shut down background threads and close the streaming connection.
    append_rows_stream.close()

    # A PENDING type stream must be "finalized" before being committed. No new
    # records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")

# [END bigquerystorage_append_rows_pending]
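
The offset-based resume logic described in the sample's comments is not exercised by the code itself. Below is a minimal sketch of what resuming after a dropped connection might look like, assuming the server surfaces the ALREADY_EXISTS status as google.api_core.exceptions.AlreadyExists and that the caller tracks acknowledged rows in next_offset (both are assumptions, not part of this commit):

from google.api_core import exceptions

def send_resuming_at_offset(append_rows_stream, request, next_offset):
    """Re-send a batch at a known offset, skipping already-processed rows."""
    request.offset = next_offset
    try:
        return append_rows_stream.send(request).result()
    except exceptions.AlreadyExists:
        # The server already processed rows at this offset; safe to ignore.
        return None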
78 changes: 78 additions & 0 deletions samples/snippets/append_rows_pending_test.py
@@ -0,0 +1,78 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pathlib
import random

from google.cloud import bigquery
import pytest

from . import append_rows_pending


DIR = pathlib.Path(__file__).parent


regions = ["US", "non-US"]


@pytest.fixture(params=regions)
def sample_data_table(
    request: pytest.FixtureRequest,
    bigquery_client: bigquery.Client,
    project_id: str,
    dataset_id: str,
    dataset_id_non_us: str,
) -> str:
    dataset = dataset_id
    if request.param != "US":
        dataset = dataset_id_non_us
    schema = bigquery_client.schema_from_json(str(DIR / "customer_record_schema.json"))
    table_id = f"append_rows_proto2_{random.randrange(10000)}"
    full_table_id = f"{project_id}.{dataset}.{table_id}"
    table = bigquery.Table(full_table_id, schema=schema)
    table = bigquery_client.create_table(table, exists_ok=True)
    yield full_table_id
    bigquery_client.delete_table(table, not_found_ok=True)


def test_append_rows_pending(
    capsys: pytest.CaptureFixture,
    bigquery_client: bigquery.Client,
    sample_data_table: str,
):
    project_id, dataset_id, table_id = sample_data_table.split(".")
    append_rows_pending.append_rows_pending(
        project_id=project_id, dataset_id=dataset_id, table_id=table_id
    )
    out, _ = capsys.readouterr()
    assert "have been committed" in out

    rows = bigquery_client.query(
        f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"
    ).result()
    row_items = [
        # Convert each row to a sorted tuple of items, omitting NULL values,
        # to make searching for expected rows easier.
        tuple(
            sorted(
                item for item in row.items() if item[1] is not None and item[1] != []
            )
        )
        for row in rows
    ]

    assert (("customer_name", "Alice"), ("row_num", 1)) in row_items
    assert (("customer_name", "Bob"), ("row_num", 2)) in row_items
    assert (("customer_name", "Charles"), ("row_num", 3)) in row_items
28 changes: 28 additions & 0 deletions samples/snippets/customer_record.proto
@@ -0,0 +1,28 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The BigQuery Storage API expects protocol buffer data to be encoded in the
// proto2 wire format. This allows it to disambiguate missing optional fields
// from default values without the need for wrapper types.
syntax = "proto2";

// Define a message type representing the rows in your table. The message
// cannot contain fields which are not present in the table.
message CustomerRecord {

  optional string customer_name = 1;

  // Use the required keyword for client-side validation of required fields.
  required int64 row_num = 2;
}
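
The required keyword gives client-side validation at serialization time: SerializeToString raises if row_num is unset. A quick illustration, assuming customer_record_pb2 has been regenerated with protoc as described in the sample:

from google.protobuf.message import EncodeError

import customer_record_pb2

record = customer_record_pb2.CustomerRecord()
record.customer_name = "Alice"
try:
    record.SerializeToString()  # Raises: required field row_num is unset.
except EncodeError:
    record.row_num = 1
payload = record.SerializeToString()  # Valid proto2 wire-format bytes.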
76 changes: 76 additions & 0 deletions samples/snippets/customer_record_pb2.py

This generated module is not rendered in the diff.

11 changes: 11 additions & 0 deletions samples/snippets/customer_record_schema.json
@@ -0,0 +1,11 @@
[
  {
    "name": "customer_name",
    "type": "STRING"
  },
  {
    "name": "row_num",
    "type": "INTEGER",
    "mode": "REQUIRED"
  }
]
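
This JSON schema mirrors customer_record.proto, with row_num marked REQUIRED to match the proto2 required field. Outside the test fixture, a table matching this schema could be created with something like the following sketch (the project, dataset, and table IDs are placeholders):

from google.cloud import bigquery

client = bigquery.Client()
schema = client.schema_from_json("customer_record_schema.json")
table = bigquery.Table("my-project.my_dataset.my_table", schema=schema)
client.create_table(table, exists_ok=True)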
