Add custom retry and timeout options for partition reads and queries #261

Closed
sebastian-montero opened this issue Mar 10, 2021 · 3 comments · Fixed by #278
Labels

  • api: spanner (Issues related to the googleapis/python-spanner API.)
  • priority: p2 (Moderately-important priority. Fix may not be included in next release.)
  • type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design.)

Comments

@sebastian-montero

I am building an application in Apache Beam and Python that runs in Google Dataflow. I am using the ReadFromSpanner transform in apache_beam.io.gcp.experimental.spannerio. This works for most of my Spanner tables, but the really large ones (>40m rows) tend to fail with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
    for row in read_action(element['partitions']):
  File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
    self._consume_next()
  File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
    response = six.next(self._response_iterator)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
    for item in iterator:
  File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']

From my understanding, this error comes from the ReadFromSpanner operation because its workers have timed out.

To solve this I have tried the following:

  • Changed the num_workers and disk_size_gb and added the --experiments=shuffle_mode=service flag as suggested in Google's Common Error Guidance
  • Changed the Machine Type from n1-standard-1 up to n1-standard-32
  • My latest code is below. The Transformation step does simple data wrangling on the rows.

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery
    from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
    from apache_beam.options.pipeline_options import PipelineOptions

    # Set pipeline arguments.
    options = PipelineOptions(
        region=RUNNER_REGION,
        project=RUNNER_PROJECT_ID,
        runner=RUNNER,
        temp_location=TEMP_LOCATION,
        job_name=JOB_NAME,
        service_account_email=SA_EMAIL,
        setup_file=SETUP_FILE_PATH,
        disk_size_gb=500,
        num_workers=10,
        machine_type="n1-standard-2",
        save_main_session=True)

    # Build and run the pipeline.
    with beam.Pipeline(options=options) as p:
        (p
         | "Read from Spanner" >> ReadFromSpanner(
             SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
         | "Transform elements into dictionary" >> beam.ParDo(Transformation)
         | "Write new records to BQ" >> WriteToBigQuery(
             BIGQUERY_TABLE,
             schema=SCHEMA,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

A potential solution is to edit the timeout control; I have seen this available in Java but not in Python. How can I edit the timeout control in Python, or is there another solution to this issue?

product-auto-label bot added the api: spanner label Mar 10, 2021
larkee added the priority: p2 and type: feature request labels Mar 11, 2021
@larkee
Contributor

larkee commented Mar 15, 2021

Thank you for filing this issue!

The Python client currently only supports custom timeouts for a couple of methods. However, we have been looking to add more support for this. I did some investigating to see how difficult it would be to add this support for partitioned reads, and it looks like it's much simpler than I initially thought. I can aim to add this support in the next week or so and hopefully include it in the next release.

However, the Apache Beam module would still need to be updated to use this support and to allow users to specify a timeout.
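As general background on what a configurable retry and timeout involves, here is a minimal standard-library sketch. It is not python-spanner's implementation: `call_with_retry`, `DeadlineExceededError`, and every parameter name are hypothetical, chosen only to show how an overall time budget and exponential backoff can combine.

```python
import time


class DeadlineExceededError(Exception):
    """Raised when the overall time budget is used up (hypothetical name)."""


def call_with_retry(func, *, timeout_s, initial_delay_s=0.1, multiplier=2.0,
                    max_delay_s=5.0, retry_on=(ConnectionError,),
                    clock=time.monotonic, sleep=time.sleep):
    """Call func() until it succeeds or the timeout_s budget runs out."""
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        try:
            return func()
        except retry_on as exc:
            remaining = deadline - clock()
            if remaining <= 0:
                # Budget exhausted: surface a deadline error, keep the cause.
                raise DeadlineExceededError(
                    f"gave up after {timeout_s}s") from exc
            # Back off, but never sleep past the deadline.
            sleep(min(delay, remaining))
            delay = min(delay * multiplier, max_delay_s)
```

A caller reading a very large partition would pass a larger `timeout_s` here, which is the kind of per-call knob this issue asks for.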

larkee changed the title from "504 Deadline Exceeded when using ReadFromSpanner in Apache Beam + Dataflow" to "Add custom retry and timeout options for partition reads and queries" Mar 17, 2021
@sebastian-montero
Author

Thank you so much @larkee. If this feature is included, I can potentially create my own Spanner read in Apache Beam. Do you know when the next Spanner release will be?
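A custom read along these lines might look like the sketch below. It assumes the batch snapshot object exposes `generate_query_batches` and `process_query_batch` and that both accept a `timeout` keyword in seconds; this thread does not confirm the final signatures, so treat that keyword as an assumption. `read_all_partitions` is a hypothetical helper name.

```python
def read_all_partitions(batch_snapshot, sql, timeout_s):
    """Yield every row returned by `sql`, one partition at a time.

    ASSUMPTION: `batch_snapshot` has generate_query_batches() and
    process_query_batch(), and both accept a `timeout` keyword (seconds).
    """
    # Ask the service to split the query into independently readable batches.
    for batch in batch_snapshot.generate_query_batches(sql, timeout=timeout_s):
        # Stream the rows of each batch using the same custom timeout.
        yield from batch_snapshot.process_query_batch(batch, timeout=timeout_s)
```

Inside a Beam pipeline, the body of this generator would sit in a DoFn's process method so that each worker reads its own partitions with the chosen timeout.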

@larkee
Contributor

larkee commented Mar 22, 2021

@sebastian-montero I intend to do a release later this week. At the latest, it will be early next week 👍
