
Google Big Query Write append_rows does not respect retry configuration #593

Open
kamilglod opened this issue Apr 26, 2023 · 14 comments
Labels
api: bigquerystorage · Issues related to the googleapis/python-bigquery-storage API.
priority: p3 · Desirable enhancement or fix. May not be included in next release.
type: bug · Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@kamilglod

The default configuration for the append_rows() call (here and here) says that, by default, the request should be retried on google.api_core.exceptions.ServiceUnavailable with a timeout of one day. However, I observed that when the server responds with this error, the exception is raised without the call being retried.
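
For reference, the default retry attached to append_rows() in the generated client looks roughly like this (reconstructed from the linked code; check your installed version for the exact values):

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries

# Retry ServiceUnavailable with exponential backoff, for up to one day.
DEFAULT_APPEND_ROWS_RETRY = retries.Retry(
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    timeout=86400.0,
    predicate=retries.if_exception_type(
        core_exceptions.ServiceUnavailable,
    ),
)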

Environment details

  • OS type and version: Debian 11
  • Python version: 3.11.2
  • pip version: 22.3.1
  • google-cloud-bigquery-storage version: 2.19.0

Steps to reproduce

  1. Create an async iterator that yields new rows with a random delay
  2. Start a new stream and pass the iterator from step 1 to append_rows()
  3. Observe that the process occasionally fails without retrying, even on errors that are covered by the default retry predicate (a minimal sketch of these steps follows below)
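
A minimal sketch of these steps (placeholder table/stream path; the proto_rows/writer_schema setup is omitted, and the _default stream is used for brevity, whereas the full snippet later in the thread creates a COMMITTED stream):

import asyncio
import random
from typing import AsyncIterator

from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import (
    BigQueryWriteAsyncClient,
)
from google.cloud.bigquery_storage_v1.types import AppendRowsRequest


async def _requests(stream_name: str) -> AsyncIterator[AppendRowsRequest]:
    # Step 1: yield append requests with a random delay between them.
    while True:
        await asyncio.sleep(random.uniform(1, 30))
        yield AppendRowsRequest(write_stream=stream_name)


async def main() -> None:
    client = BigQueryWriteAsyncClient()
    # Step 2: open the stream with the iterator from step 1 (the default retry applies).
    stream = await client.append_rows(
        _requests("projects/<project>/datasets/<dataset>/tables/<table>/streams/_default")
    )
    # Step 3: consume the responses; occasionally a ServiceUnavailable escapes
    # here even though it is listed in the default retry predicate.
    async for response in stream:
        print(response)


asyncio.run(main())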

Stack trace

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 73, in wait_for_connection
    await self._call.wait_for_connection()
  File "/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py", line 483, in wait_for_connection
    await self._raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
    raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "recvmsg:Connection reset by peer"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"recvmsg:Connection reset by peer", grpc_status:14, created_time:"2023-04-26T09:58:29.10477569+00:00"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/src/reporter/backends/bigquery.py", line 144, in _stream_exceptions_task
    await self.exceptions_bq_stream.stream(self.exceptions_iterator)
  File "/usr/local/lib/python3.11/site-packages/prisjakt_ingestion_utils/clients/gcp_big_query/gcp_big_query_stream.py", line 130, in stream
    stream = await self.client.append_rows(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 179, in error_remapped_callable
    await call.wait_for_connection()
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 75, in wait_for_connection
    raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.ServiceUnavailable: 503 recvmsg:Connection reset by peer
product-auto-label bot added the api: bigquerystorage label on Apr 26, 2023
@abannura

I'm seeing a similar issue with google-cloud-bigquery-storage version: 2.22.0.

In my case I'm not using an async iterator though. Instead, I'm calling the send method of an AppendRowsStream instance.

More context:

  • I have a singleton instance of AppendRowsStream: I instantiate it inside a Python module and then import that instance wherever I need it (roughly as in the sketch below).
  • I'm running a single process, with a gevent loop, and multiple greenlets (each should see the same AppendRowsStream instance).
  • As kamilglod mentioned, this happens sporadically. My guess is that for some reason the server closes the connection "unexpectedly" (maybe it is restarting, or maybe it drops connections after longer periods of inactivity, I don't know). Then, when the client tries to send data over the channel, it gets back a 503 recvmsg:Connection reset by peer, which for some reason is not handled correctly.
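
Roughly, the setup looks like this (project/dataset/table names are placeholders and the writer schema setup is omitted):

# bq_stream.py: module-level singleton, imported by every greenlet.
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer

client = bigquery_storage_v1.BigQueryWriteClient()

# Request template carrying the target stream (the real code also sets the
# writer schema on this template).
request_template = types.AppendRowsRequest(
    write_stream="projects/<project>/datasets/<dataset>/tables/<table>/streams/_default",
)

append_rows_stream = writer.AppendRowsStream(client, request_template)


# elsewhere, inside a greenlet:
def append(request: types.AppendRowsRequest) -> None:
    future = append_rows_stream.send(request)
    future.result()  # the unhandled 503 surfaces from here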

Let me know if I can be of any help (e.g. by providing additional details).

@Linchin
Contributor

Linchin commented Nov 1, 2023

Thank you @kamilglod and @abannura. Could you provide a minimum code snippet that can reproduce the error?

Linchin added the priority: p3 and type: bug labels on Nov 1, 2023
@kamilglod
Author

@Linchin it's not easy to reproduce because it happens only occasionally. The easiest way is probably to set up the simplest streaming example, run it for longer than a day, and wait for ServiceUnavailable 503 recvmsg:Connection reset by peer or some other server-related error. Based on the code it should be retried, but what actually happens is that the error is raised immediately, without any retry.

@Linchin
Contributor

Linchin commented Nov 6, 2023

@kamilglod Could you tell me which exact class you are using? Still, a code snippet would help a lot; it removes a lot of ambiguity.

@Linchin
Contributor

Linchin commented Nov 6, 2023

@abannura For the AppendRowsStream class, I think retry and timeout are not supported at the moment; see here.

@kamilglod
Author

@Linchin sure, I can simplify my code to something like:

import logging
from typing import AsyncIterator, Sequence

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import (
    BigQueryWriteAsyncClient,
)
from google.cloud.bigquery_storage_v1.types import (
    AppendRowsRequest,
    CreateWriteStreamRequest,
    WriteStream,
)
from google.protobuf.message import Message
from google.rpc.code_pb2 import Code

logger = logging.getLogger(__name__)


# same as the default, but with more exceptions in predicate
DEFAULT_RETRY = retries.Retry(
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    timeout=86400.0,
    predicate=retries.if_exception_type(
        core_exceptions.ServiceUnavailable,
        core_exceptions.Aborted,
        core_exceptions.InternalServerError,
        core_exceptions.BadGateway,
        core_exceptions.GatewayTimeout,
    ),
    on_error=lambda exc: logger.warning("BQ stream retriable error.", exc_info=exc),
)


async def stream(table_path: str, messages: AsyncIterator[Sequence[Message]]):
    client = BigQueryWriteAsyncClient()
    write_stream = await client.create_write_stream(
        CreateWriteStreamRequest(
            parent=table_path,
            write_stream=WriteStream(type_=WriteStream.Type.COMMITTED),
        )
    )
    stream = await client.append_rows(
        _inner_stream(messages, write_stream.name), retry=DEFAULT_RETRY
    )
    async for result in stream:
        if result.error.code != Code.OK:
            raise Exception(
                f"Unexpected result {result.error.code} {result.error.message}"
            )


async def _inner_stream(
    messages: AsyncIterator[Sequence[Message]], stream_name: str
) -> AsyncIterator[AppendRowsRequest]:
    # Builds AppendRowsRequest messages from `messages` for the given stream;
    # body omitted here.
    ...

and when the messages stream stays open for a long time, we get errors that are not retried. A good example is the Aborted exception after 10 minutes of inactivity: it should be retried based on the retry configuration, but it is not. It's most likely connected to the issue you linked.

@Linchin
Contributor

Linchin commented Nov 16, 2023

I am able to reproduce the issue by intentionally making the append row request invalid:

Traceback (most recent call last):
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
    async for response in self._call:  # pragma: no branch
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
    await self._raise_for_status()
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
    raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.INVALID_ARGUMENT
	details = "Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:108.177.98.95:443 {grpc_message:"Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa", grpc_status:3, created_time:"2023-11-16T19:24:38.990469091+00:00"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/google/home/xxxxxxxxxxxx/github.com/googleapis/python-bigquery-storage/samples/snippets/append_rows_retry_committed.py", line 222, in <module>
    asyncio.run(main())
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/usr/local/google/home/xxxxxxxxxxxx/github.com/googleapis/python-bigquery-storage/samples/snippets/append_rows_retry_committed.py", line 218, in main
    await stream(write_stream_value)
  File "/usr/local/google/home/xxxxxxxxxxxx/github.com/googleapis/python-bigquery-storage/samples/snippets/append_rows_retry_committed.py", line 117, in stream
    async for item in result:
  File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
    raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.InvalidArgument: 400 Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa [detail: "[ORIGINAL ERROR] generic::invalid_argument: Invalid stream name.; Cannot parse write_stream `projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa` from request while initializing GWS logs. [google.rpc.error_details_ext] { message: \"Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa\" }"
]

@kamilglod
Author

kamilglod commented Nov 20, 2023

@Linchin please try to reproduce it by

  1. opening a new stream
  2. sending a couple of rows (or just one) through that stream
  3. sleeping for at least 10 minutes
  4. after 10 minutes, you should get an exception like:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
    async for response in self._call:  # pragma: no branch
  File "/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
    await self._raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
    raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.ABORTED
	details = "Closing the stream because it has been inactive for 600 seconds. Entity: projects/XXX/streams/Cic2ZTU2MjJhZi0wMDAwLTI2ODMtOTc0MS01ODI0MjlhNjE5MTQ6czk"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:64.233.164.95:443 {created_time:"2023-11-20T03:35:12.366249827+00:00", grpc_status:10, grpc_message:"Closing the stream because it has been inactive for 600 seconds. Entity: projects/XXX/streams/Cic2ZTU2MjJhZi0wMDAwLTI2ODMtOTc0MS01ODI0MjlhNjE5MTQ6czk"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/src/repositories/exceptions/bigquery.py", line 122, in _stream_exceptions_task
    async for attempt in tenacity.AsyncRetrying(
  File "/usr/local/lib/python3.11/site-packages/tenacity/_asyncio.py", line 71, in __anext__
    do = self.iter(retry_state=self._retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/app/src/repositories/exceptions/bigquery.py", line 148, in _stream_exceptions_task
    await exceptions_bq_stream.stream(self.exceptions_iterator)
  File "/usr/local/lib/python3.11/site-packages/XXX/clients/gcp_big_query/gcp_big_query_stream.py", line 137, in stream
    await self._validate_stream_result(stream)
  File "/usr/local/lib/python3.11/site-packages/XXX/clients/gcp_big_query/gcp_big_query_stream.py", line 186, in _validate_stream_result
    async for result in stream:
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
    raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.Aborted: 409 Closing the stream because it has been inactive for 600 seconds. Entity: projects/XXX/streams/Cic2ZTU2MjJhZi0wMDAwLTI2ODMtOTc0MS01ODI0MjlhNjE5MTQ6czk

Here are some other errors that I logged over the last couple of days that should be retried based on the stream's DEFAULT_RETRY:

ServiceUnavailable 503 Socket closed
ServiceUnavailable 503 recvmsg:Connection reset by peer
InternalServerError 500 Received RST_STREAM with error code 2
Closing the stream because server is restarted. This is expected and client is advised to reconnect.

@Linchin
Contributor

Linchin commented Nov 20, 2023

After some learning, I think the retry is actually working as intended here. The retry we set up only applies to establishing the connection, not to sending the data. Right now we don't support retries at that level, so if you really need them, you will have to implement them yourself.
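
A rough sketch of what implementing it yourself could look like, reopening the whole append_rows call when one of the retriable errors escapes (this reuses the _inner_stream helper from the snippet above and mirrors its retry predicate; it does not address rows that the failed call had already consumed from the request iterator, see the discussion below):

import logging

from google.api_core import exceptions as core_exceptions
from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import (
    BigQueryWriteAsyncClient,
)
from google.cloud.bigquery_storage_v1.types import (
    CreateWriteStreamRequest,
    WriteStream,
)
from google.rpc.code_pb2 import Code

logger = logging.getLogger(__name__)

RETRIABLE = (
    core_exceptions.ServiceUnavailable,
    core_exceptions.Aborted,
    core_exceptions.InternalServerError,
    core_exceptions.BadGateway,
    core_exceptions.GatewayTimeout,
)


async def stream_with_restart(table_path, messages):
    client = BigQueryWriteAsyncClient()
    while True:
        # Open a fresh COMMITTED stream for every attempt.
        write_stream = await client.create_write_stream(
            CreateWriteStreamRequest(
                parent=table_path,
                write_stream=WriteStream(type_=WriteStream.Type.COMMITTED),
            )
        )
        try:
            stream = await client.append_rows(
                _inner_stream(messages, write_stream.name)  # from the snippet above
            )
            async for result in stream:
                if result.error.code != Code.OK:
                    raise Exception(
                        f"Unexpected result {result.error.code} {result.error.message}"
                    )
            return  # the request iterator finished and all responses were consumed
        except RETRIABLE as exc:
            logger.warning("Stream broke, reopening.", exc_info=exc)
            # NOTE: a row already pulled from `messages` by the failed call may
            # be lost or duplicated at this point.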

There is some work going on in the core client library to support retries in streaming: googleapis/python-api-core#495. However, this will only support server streaming, not client streaming or bidirectional streaming.

Thanks to @daniel-sanche @leahecole @shollyman for helping me understand the situation.

@kamilglod
Author

@Linchin what do you mean by server streaming, client streaming and bidirectional streaming?

@Linchin
Contributor

Linchin commented Nov 21, 2023

Here are the official definitions in gRPC: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc

tl;dr: server streaming is when data is streamed from the server to the client (like YouTube), and client streaming is the other way around. In your case we are streaming data to the server, so it's client streaming.

@kamilglod
Author

@Linchin thanks, now it's clear.

Do you have any best practice for handling this server error? I handled it by simply restarting the stream (I'm using tenacity to retry on error), but the problem is that I'm probably losing one row that should have been inserted (it may already have been consumed from the iterator by the stream), and I'm not sure how to check whether it was inserted into the table or not. The server might fail before or after handling a row, and sending the same message on a different stream is not idempotent.
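
For context, the restart I have now looks roughly like this (simplified; stream() is the function from my earlier snippet):

import tenacity
from google.api_core import exceptions as core_exceptions

RETRIABLE = (
    core_exceptions.ServiceUnavailable,
    core_exceptions.Aborted,
    core_exceptions.InternalServerError,
)


async def stream_forever(table_path, messages):
    # Reopen the whole stream whenever the server drops it. The row that the
    # failed attempt had already consumed from `messages` may be lost, and
    # re-sending it on a new stream is not idempotent.
    async for attempt in tenacity.AsyncRetrying(
        retry=tenacity.retry_if_exception_type(RETRIABLE),
        wait=tenacity.wait_exponential(multiplier=0.1, max=60),
    ):
        with attempt:
            await stream(table_path, messages)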

@tswast
Contributor

tswast commented Nov 28, 2023

The read client handles reconnection to the server in the following way:

except _STREAM_RESUMPTION_EXCEPTIONS:

I don't think we implemented anything similar for the write API yet.

Edit: Potentially, we might want to add "reopen" or something like that to our send method, since we can't do a plain loop in the way that we did for reads.

The complication with the write API is that we might have a bunch of requests that we're waiting for a response on. So we might need to resend that backlog queue of requests.
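
As an illustration only, here is a hypothetical shape of such a reopen (every name below is invented for the sketch, not existing library API): keep a backlog of requests that were sent but not yet acknowledged, and replay it after reconnecting.

from typing import Callable, List


class ReopenableWriter:
    # Hypothetical sketch: track unacknowledged requests so they can be
    # replayed when the bidirectional connection is reopened.

    def __init__(self, open_rpc: Callable[[], object]):
        self._open_rpc = open_rpc          # hypothetical factory that opens the RPC
        self._rpc = open_rpc()
        self._inflight: List[object] = []  # requests sent but not yet acknowledged

    def send(self, request) -> None:
        self._inflight.append(request)
        self._rpc.send(request)

    def ack(self, request) -> None:
        # Called when the matching AppendRowsResponse arrives.
        self._inflight.remove(request)

    def reopen(self) -> None:
        # Re-establish the RPC and replay the backlog of in-flight requests.
        self._rpc = self._open_rpc()
        for request in list(self._inflight):
            self._rpc.send(request)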

@tswast
Contributor

tswast commented Nov 28, 2023

Server might fail before or after handling row and it's not idempotent to send the same message to the different stream.

Perhaps @yirutang can comment on this? I see that https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java#L707 always resends any in-flight requests.
