feat: add `AppendRowsStream` helper to append rows with a `BigQueryWriteClient` (#284)
Conversation
…95450856-write-client
Here is the summary of changes. You are about to add 2 region tags.
This comment is generated by snippet-bot.
Uh oh. The BackgroundConsumer eats errors if I send an invalid stream name: https://github.com/googleapis/python-api-core/blob/dcb6ebd9994fddcb1729150df1675ebf8c503a73/google/api_core/bidi.py#L659-L667 It doesn't even send a "done" notification... Hmmm...
Samples tests are timing out, stuck waiting for the futures to be done. Possible I messed something up when making the
It's unnecessary and kept resulting in stuff getting stuck.
…to b195450856-write-client-BiDi
add `BigQueryWriteClient` where `append_rows` returns a helper for writing rows
def append_rows(
    self,
    initial_request: types.AppendRowsRequest,
    # TODO: add retry argument. Blocked by
if you want to keep arg order stable, should you stub something here?
Good point. I've added a `*` to force `timeout` and `metadata` to be keyword-only arguments. This is the approach the generated clients have been taking to avoid argument order being part of the contract.
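A minimal sketch of the pattern being discussed: a bare `*` in the parameter list makes everything after it keyword-only, so callers cannot depend on positional order. The signature below is illustrative, not the actual library code.

```python
# Hypothetical stand-in for the real method: the bare `*` forces `timeout`
# and `metadata` to be passed by keyword, so their relative order is not
# part of the public contract and new keyword args can be added safely.
def append_rows(initial_request, *, timeout=None, metadata=()):
    return {"timeout": timeout, "metadata": tuple(metadata)}

# Passing timeout positionally raises TypeError...
try:
    append_rows("req", 30.0)
    positional_rejected = False
except TypeError:
    positional_rejected = True

# ...while keyword use works in any order.
result = append_rows("req", metadata=(("k", "v"),), timeout=30.0)
```

This is why reordering keyword-only parameters later is a non-breaking change, unlike reordering positional parameters.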
    metadata.

Returns:
    A tuple containing a stream and a future. Use the stream to send
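To make the docstring's "stream and a future" contract concrete, here is a hedged toy model (all names hypothetical, no real gRPC involved): the stream carries subsequent requests, and the future resolves when the stream shuts down.

```python
import concurrent.futures

# Toy model of the documented contract: return a (stream, future) pair.
# The stream sends later requests; the future completes on shutdown.
def fake_append_rows(initial_request):
    sent = [initial_request]
    done_future = concurrent.futures.Future()

    class _Stream:
        def send(self, request):
            sent.append(request)

        def close(self):
            # Resolve the future once the stream has shut down.
            done_future.set_result(sent)

    return _Stream(), done_future

stream, done_future = fake_append_rows("initial request")
stream.send("more rows")
stream.close()
```

Note how the caller interacts with two objects for one logical operation, which is the asymmetry the review comment below is reacting to.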
This contract feels odd to me at first blush. You seem to have two different ways of appending, depending on whether it's the first append (client `append_rows`) or a subsequent append (send on the returned stream). I expect this to get odd with retries.
Alternatively, perhaps you just have an `open_stream` method or similar in the write client, which can be opened with the requisite data (stream id, schema, etc.), so the caller doesn't have to be responsible for the special logic for the first append?
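A hypothetical sketch of the alternative being proposed (all names invented for illustration): an `open_stream`-style entry point captures the first-request data once, so the helper, not the caller, handles the first-append special case and every append goes through the same `send` path.

```python
# Hypothetical design sketch: the helper remembers the stream name and
# schema, attaches them to the first request automatically, and gives the
# caller a single send() path for every append.
class AppendStream:
    def __init__(self, stream_name, schema):
        self._initial = {"stream": stream_name, "schema": schema}
        self._sent = []

    def send(self, rows):
        # Only the first request carries the stream name and schema;
        # the caller never has to know about that distinction.
        request = dict(self._initial) if not self._sent else {}
        request["rows"] = rows
        self._sent.append(request)
        return request


def open_stream(stream_name, schema):
    return AppendStream(stream_name, schema)


stream = open_stream("projects/p/datasets/d/tables/t/streams/_default", "schema")
first = stream.send([1, 2])
second = stream.send([3])
```

Under this shape, retry logic can also live inside the helper, since it knows which request was "first".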
self.__cancelled = False
self._is_done = False

def cancel(self):
cancel() on an individual future kills all the futures on the same manager? This feels like something that maybe should live on a connection abstraction. Or will it be impossible for python to end up in the state where you have one stream that's draining and one that's accepting new appends?
It does seem odd to cancel everything sent later, though that's the semantics I copied from Pub/Sub. From what I can tell, `_reopen` isn't called until the old RPC has completely shut down.
Now that I look more closely, stream resumption only allows one request for the "initial request". That'll probably have to be refactored if/when we do stream resumption, so that we can re-send any requests where we didn't get a response yet.
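The refactor hinted at above could look something like this (a sketch with invented names, not the library's code): track every request that hasn't received a response yet, so a reopened stream can replay all of them rather than only a single "initial request".

```python
from collections import deque

# Sketch of a resumption-friendly queue: requests stay tracked until their
# response arrives, so everything still in flight can be replayed on reopen.
class PendingRequests:
    def __init__(self):
        self._pending = deque()

    def track(self, request):
        self._pending.append(request)

    def ack(self):
        # The oldest pending request got its response; stop tracking it.
        self._pending.popleft()

    def to_replay(self):
        # On stream reopen, re-send everything still awaiting a response.
        return list(self._pending)


pending = PendingRequests()
pending.track("req-1")
pending.track("req-2")
pending.ack()  # response for req-1 arrived before the stream dropped
```

This relies on the server answering appends in order, which is what lets a single `ack()` pop the oldest entry.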
proto_data.rows = proto_rows

# Generate a protocol buffer representation of your message descriptor. You
# must inlcude this information in the first request of an append_rows
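For context on what "a protocol buffer representation of your message descriptor" means here: a minimal illustration, assuming the standard `protobuf` package, using the well-known `Timestamp` message as a stand-in for the user's own message type.

```python
from google.protobuf import descriptor_pb2, timestamp_pb2

# Copy a compiled message's descriptor into a serializable DescriptorProto.
# In the real sample this would be the user's row message, not Timestamp;
# the resulting proto is what gets attached to the first append_rows request.
descriptor_proto = descriptor_pb2.DescriptorProto()
timestamp_pb2.Timestamp.DESCRIPTOR.CopyToProto(descriptor_proto)
```

`CopyToProto` fills in the message name and fields, giving the server enough schema information to decode the serialized rows that follow.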
s/inlcude/include/
This seems much more aligned as a connection pattern, thanks for all the work getting it into shape!
add `BigQueryWriteClient` where `append_rows` returns a helper for writing rows → add `AppendRowsStream` helper to append rows with a `BigQueryWriteClient`
🤖 I have created a release \*beep\* \*boop\*

---

## [2.8.0](https://www.github.com/googleapis/python-bigquery-storage/compare/v2.7.0...v2.8.0) (2021-09-10)

### Features

* add `BigQueryWriteClient` where `append_rows` returns a helper for writing rows ([#284](https://www.github.com/googleapis/python-bigquery-storage/issues/284)) ([2461f63](https://www.github.com/googleapis/python-bigquery-storage/commit/2461f63d37f707c2d634a95d87b8ffc3e4af3686))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Follow-up to #278

TODO:
- open
- sleep calls to busy loop?

Closes #285