feat: add AppendRowsStream helper to append rows with a BigQueryWriteClient (#284)
Changes from 43 commits
@@ -19,12 +19,18 @@
 This is the base from which all interactions with the API occur.
 """

 from __future__ import absolute_import
+from typing import Optional, Sequence, Tuple

 import google.api_core.gapic_v1.method
 import google.api_core.retry

 from google.cloud.bigquery_storage_v1 import reader
-from google.cloud.bigquery_storage_v1beta2.services import big_query_read
+from google.cloud.bigquery_storage_v1beta2.services import (
+    big_query_read,
+    big_query_write,
+)
+from google.cloud.bigquery_storage_v1beta2 import types
+from google.cloud.bigquery_storage_v1beta2 import writer


 _SCOPES = (
@@ -135,3 +141,58 @@ def read_rows(
             offset,
             {"retry": retry, "timeout": timeout, "metadata": metadata},
         )
+
+
+class BigQueryWriteClient(big_query_write.BigQueryWriteClient):
+    """BigQuery Write API.
+
+    The Write API can be used to write data to BigQuery.
+    """
+
+    def append_rows(
+        self,
+        initial_request: types.AppendRowsRequest,
+        # TODO: add retry argument. Blocked by
+        # https://github.com/googleapis/python-api-core/issues/262
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> Tuple[writer.AppendRowsStream, writer.AppendRowsFuture]:
+        """Append data to a given stream.
+
+        If ``offset`` is specified, the ``offset`` is checked against
+        the end of the stream. The server returns ``OUT_OF_RANGE`` in
+        ``AppendRowsResponse`` if an attempt is made to append to an
+        offset beyond the current end of the stream, or
+        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
+        already been written to. The user can retry with an adjusted
+        offset within the same RPC stream. If ``offset`` is not
+        specified, the append happens at the end of the stream.
+
+        The response contains the offset at which the append happened.
+        Responses are received in the same order in which requests are
+        sent. There will be one response for each successful request.
+        If the ``offset`` is not set in the response, the append did
+        not happen due to some error. If one request fails, all
+        subsequent requests will also fail until a successful request
+        is made again.
+
+        If the stream is of ``PENDING`` type, data will only be
+        available for read operations after the stream is committed.
+
+        Args:
+            initial_request:
+                The initial request message for ``AppendRows``. Must
+                contain the stream name and data descriptor.
+            metadata (Sequence[Tuple[str, str]]):
+                Strings which should be sent along with the request as
+                metadata.
+
+        Returns:
+            A tuple containing a stream and a future. Use the stream
+            to send additional requests. Close it when finished. Use
+            the future to wait for the initial request to complete.
+        """
+        gapic_client = super(BigQueryWriteClient, self)
+        stream = writer.AppendRowsStream(gapic_client, metadata)
+        initial_response_future = stream.open(initial_request, timeout=timeout)
+        return stream, initial_response_future

Review thread (on the ``offset`` paragraph of the docstring):

> Offset is Int64Value in the response, and not present for the default stream. Should amend this to cover that case.

> This was copy-pasted from the protos. We'll need to update the comment there: https://github.com/googleapis/googleapis/blob/ccb73479aebb76f13698907884269bb7a38b4207/google/cloud/bigquery/storage/v1beta2/storage.proto#L147

> Filed b/197957975 to do an edit pass through the upstream protos

Review thread (on the Returns section):

> This contract feels odd to me at first blush. You seem to have two different ways of appending, depending on whether it's the first append (client `append_rows`) or a subsequent append (send on the returned stream). I expect this to get odd with retries. Alternatively, perhaps you just have an `open_stream` method or similar in the write client, which can be opened with the necessary requisite data (stream id, schema, etc.), and the caller doesn't have to be responsible for dealing with the special logic for the first append?
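To make the (stream, future) contract above concrete, here is a minimal sketch of how a caller might use the helper. `FakeStream` and `FakeFuture` are hypothetical stand-ins for `writer.AppendRowsStream` / `writer.AppendRowsFuture` so the flow can run without the BigQuery service; only the open/send/close shape mirrors the PR.

```python
# Sketch of the (stream, future) contract returned by append_rows.
# FakeStream / FakeFuture are stand-ins; no network calls are made.
from concurrent import futures


class FakeFuture(futures.Future):
    """Stand-in for writer.AppendRowsFuture."""


class FakeStream:
    """Stand-in for writer.AppendRowsStream: open once, then send/close."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def open(self, initial_request, timeout=None):
        # The first request carries the stream name and data descriptor;
        # its future resolves when the server acknowledges the append.
        self.sent.append(initial_request)
        future = FakeFuture()
        future.set_result({"offset": 0})  # pretend the server replied
        return future

    def send(self, request):
        self.sent.append(request)

    def close(self):
        self.closed = True


def append_rows(initial_request):
    """Mirrors the helper's shape: open a stream, return (stream, future)."""
    stream = FakeStream()
    initial_response_future = stream.open(initial_request)
    return stream, initial_response_future


stream, future = append_rows({"rows": ["row-0"]})
future.result()                    # wait for the initial append to complete
stream.send({"rows": ["row-1"]})   # subsequent appends go over the stream
stream.close()                     # close it when finished
```

This is also the asymmetry the review thread above points at: the first append goes through the client method, while later appends go through the returned stream object.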
@@ -0,0 +1,17 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class StreamClosedError(Exception):
+    """Operation not supported while stream is closed."""
Review thread (on the `append_rows` signature):

> if you want to keep arg order stable, should you stub something here?

> Good point. I've added a `*` to force `timeout` and `metadata` to be keyword-only arguments. This is the approach the generated clients have been taking to avoid argument order being a part of the contract.
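The `*` pattern described in that reply can be sketched as follows. The function below is illustrative, not the actual client method: a bare `*` in the parameter list forces `timeout` and `metadata` to be passed by name, so parameters can later be added or reordered after the `*` without breaking positional callers.

```python
# Sketch of the keyword-only pattern: everything after the bare `*`
# must be passed by keyword. Illustrative signature, not the real client.


def append_rows(initial_request, *, timeout=None, metadata=()):
    return {"request": initial_request, "timeout": timeout, "metadata": metadata}


# Keyword calls work:
append_rows("req", timeout=5.0)

# Passing timeout positionally raises TypeError:
try:
    append_rows("req", 5.0)
except TypeError:
    pass  # expected: timeout must be given as a keyword argument
```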