Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stage to check if URLS are alive? #719

Open
jbusecke opened this issue Mar 29, 2024 · 0 comments
Open

Stage to check if URLS are alive? #719

jbusecke opened this issue Mar 29, 2024 · 0 comments

Comments

@jbusecke
Copy link
Contributor

jbusecke commented Mar 29, 2024

In my CMIP6 feedstock I am often battling issues with flaky servers. It is not uncommon that the urls I am getting by querying the API will not work even a few minutes after (or at least some of them). This is a reality of the use case for as much as I am concerned but I would like a way to handle this better with PGF:

An example can be found here(private dataflow logs - mostly for my own reference, happy to share to anyone interested).

This job takes almost 1 hour (EDIT: Its actually closer to 2hours 🤯) to fail! I think we should be able to surface this sort of failure quicker!

I am getting these sorts of errors:

fsspec.exceptions.FSTimeoutError [while running 'Creating CMIP6.CMIP.CAS.FGOALS-g3.historical.r1i1p1f1.6hrLev.hus.gn.v20190826|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr|ConsolidateDimensionCoordinates|ConsolidateMetadata|Copy|Logging to bigquery (non-QC)|TestDataset|Logging to bigquery (QC)/OpenURLWithFSSpec/MapWithConcurrencyLimit/open_url-ptransform-86']
Traceback (most recent call last):
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/implementations/http.py", line 646, in async_fetch_range
    r = await self.session.get(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/aiohttp/client.py", line 605, in _request
    await resp.start(conn)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 961, in start
    with self._timer:
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/aiohttp/helpers.py", line 735, in __exit__
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/tmp/e6f5b8aeaffc1a6f96042a294c19d19882039440233eca4e471a58b60176d08fm5qx9kuc/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
  File "/tmp/e6f5b8aeaffc1a6f96042a294c19d19882039440233eca4e471a58b60176d08fm5qx9kuc/lib/python3.10/site-packages/pangeo_forge_recipes/transforms.py", line 121, in <lambda>
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/openers.py", line 32, in open_url
    cache.cache_file(url, secrets, **kw)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py", line 209, in cache_file
    _copy_btw_filesystems(input_opener, target_opener)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py", line 38, in _copy_btw_filesystems
    data = source.read(BLOCK_SIZE)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/implementations/http.py", line 598, in read
    return super().read(length)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/spec.py", line 1846, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/caching.py", line 421, in _fetch
    self.cache = self.fetcher(start, bend)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 101, in sync
    raise FSTimeoutError from return_result
fsspec.exceptions.FSTimeoutError

which I interpret as "Fsspec was not able to cache at least one of the files and timed out"

I think that the vanilla PGF case just is not expecting urls to be flaky? And so I am wondering if there is a way to efficiently 'ping' each url on a small resource (e.g. one worker) before even starting the caching (which will often scale up workers and use considerable resources just to find out that e.g. one file was never available, #713 seems relevant here too).

I wonder if something naive like this would be possible (total pseudo code):

urls = ['http://file_a.nc', 'http://file_b.nc']

pattern = pattern_from_list(urls)

beam.Create(pattern.items())
    |ReduceAllUrlsToOneElement()
    |CheckListOfUrlsOnSingleWorker() # if one of the urls does not exist, fail here and give a nice log message!
    |MapOutUrlsAsElements() # this should recreate the exact same output as beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray()
    ...

This is not the highest priority but something that would help me a lot to keep the CMIP6 resource useage and maintenance time lower. Curious what y'all think.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant