403 forbidden at the very end of rechunking #86

Open

jsadler2 opened this issue Apr 26, 2021 · 17 comments

@jsadler2

I'm executing a rechunk plan with a dask kubernetes cluster. Everything goes swimmingly for the entire operation ... until it mysteriously stops.

I get 12,263 out of 12,279 (99.9%) of the _copy_chunk tasks done and then it just sits there:
[screenshot of the stalled Dask dashboard progress bars]

After ~30 minutes I get this error:

Exception ignored in: <finalize object at 0x7fbac8535da0; dead>
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/weakref.py", line 572, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/dask_kubernetes/core.py", line 707, in _cleanup_resources
    namespace, label_selector=format_labels(labels)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 16097, in list_namespaced_service
    return self.list_namespaced_service_with_http_info(namespace, **kwargs)  # noqa: E501
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 16222, in list_namespaced_service_with_http_info
    collection_formats=collection_formats)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
    headers=headers)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/rest.py", line 243, in GET
    query_params=query_params)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'fa6fe50f-4cd9-4ec8-adaf-8becf2533a6f', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 26 Apr 2021 21:21:50 GMT', 'Content-Length': '299'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:pangeo:daskkubernetes\" cannot list resource \"services\" in API group \"\" in the namespace \"pangeo\"","reason":"Forbidden","details":{"kind":"services"},"code":403}


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fbac5e3c3d0>
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fbac5e98c50>>, <Task finished coro=<SpecCluster._close() done, defined at /srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/deploy/spec.py:393> exception=RPCClosed("RPC Closed: while trying to call remote method 'close'")>)
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 805, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 752, in live_comm
    raise RPCClosed("RPC Closed")
distributed.core.RPCClosed: RPC Closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/deploy/spec.py", line 407, in _close
    await self.scheduler_comm.close(close_workers=True)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
    "%s: while trying to call remote method %r" % (e, key)
distributed.core.RPCClosed: RPC Closed: while trying to call remote method 'close'

So close! When I look at the bucket, a lot of the data has been written:

zarr store      size (GB)
source          682.3
intermediate    604.5
target          595.5

I can also load the target dataset and pull data out of it. Obviously not all of the data is there, though, and it's not obvious (to me anyway) which parts didn't make it.
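One rough way to check would be to count initialized chunks per variable in the target store — a sketch, assuming zarr-python 2.x and the same S3 profile / target path as in the workflow posted further down:

import fsspec
import zarr

# Assumes the same filesystem profile and target path as in the workflow below.
fs = fsspec.filesystem('s3', profile='ds-drb-creds', anon=False)
target_group = zarr.open_group(fs.get_mapper('ds-drb-data/nldas_timeseries_chunks'), mode='r')

for name, arr in target_group.arrays():
    written = arr.nchunks_initialized  # chunk keys actually present in the store
    expected = arr.nchunks             # chunk keys the array should have
    if written < expected:
        print(f"{name}: {written} of {expected} chunks written")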

@rabernat
Member

Frustrating!

Can you give more details? What sort of storage are you using? Just post your full workflow if you can.

@jsadler2
Author

Sure can:

import xarray as xr
import fsspec
from rechunker import rechunk

fs = fsspec.filesystem('s3', profile='ds-drb-creds', anon=False)
nldas_path = 'ds-drb-data/nldas'
nldas_full = fs.get_mapper(nldas_path)
ds_full = xr.open_zarr(nldas_full)

intermediate = fs.get_mapper('ds-drb-data/nldas_intermediate')
target = fs.get_mapper('ds-drb-data/nldas_timeseries_chunks')

target_chunksizes = {'time': 61376, "lat": 28, "lon": 29}

target_chunks = {}

# make the chunk sizes for all my variables
for v in ds_full.var():
    target_chunks[v] = target_chunksizes

rechunk_plan = rechunk(ds_full, target_chunks, max_mem='500MB', target_store=target, temp_store=intermediate)

result = rechunk_plan.execute()

Here is a gist of my notebook: https://gist.github.com/jsadler2/1a8faf1171f3164be2fafbe044d91c57

@rabernat
Member

rabernat commented Apr 27, 2021

Thanks for the very cool and clean example! We should put this on the rechunker website 🤩

To me, these errors are dask mumbo jumbo, not really telling us much about the underlying problem. We want to dig deeper into the root cause of the process hanging. This has the faint whiff of an fsspec async IO issue. For those things I often ping @martindurant and he responds 🦸‍♂️ from across the internet and immediately gets to the bottom of it.

Getting logs from the workers would be the next step, if you were willing to dig deeper.

If you were getting actual errors, rather than just a zombie process, I would recommend rechunk_plan.execute(retries=5). When working with lots of binary data in S3, you have to be resilient to the occasional bit getting corrupted in transit.

@rabernat
Member

It seems like a dask task timeout would help here. That idea was proposed a long time ago - dask/distributed#391 - but hasn't been implemented.

You could try executing your flow with Prefect (using its Dask executor) and setting a Prefect task timeout to interrupt / retry long-running tasks. But what qualifies as "long running"? How do we know a process has really hung, as opposed to just taking an unusually long time?
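For illustration, the timeout / retry mechanics in Prefect 1.x look roughly like this — the copy_chunk task is a hypothetical stand-in, not rechunker's actual task graph, and the scheduler address is a placeholder:

from datetime import timedelta

from prefect import Flow, task
from prefect.executors import DaskExecutor

# Hypothetical stand-in for a single chunk-copy step. `timeout` kills a task that
# hangs, and max_retries / retry_delay re-run it instead of letting it sit forever.
@task(timeout=600, max_retries=3, retry_delay=timedelta(seconds=30))
def copy_chunk(key):
    ...  # read the chunk from the source store and write it to the target store

with Flow("rechunk-with-timeouts") as flow:
    copy_chunk.map(list(range(12279)))

# Run the flow on an existing Dask cluster (the address here is a placeholder).
flow.run(executor=DaskExecutor(address="tcp://scheduler-address:8786"))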

@martindurant

I'm not getting much from that traceback either, except that dask seems to think that it's done and errors during shutdown of the cluster.

If it is s3fs, you could set S3FS_LOGGING_LEVEL=DEBUG to get a lot more output on what's going on.
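Note that the variable has to end up in the environment of the worker pods (and be set before s3fs is imported there), not just in the notebook shell, since that's where the copy tasks run. A minimal dask-kubernetes sketch, assuming the cluster is built with make_pod_spec — the image name is a placeholder, and a yaml-based pod template would take the same entry under the container's env:

from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec

# Placeholder image; the key point is that S3FS_LOGGING_LEVEL is present in the
# worker pods' environment before s3fs gets imported there.
pod_spec = make_pod_spec(
    image="pangeo/pangeo-notebook:latest",
    env={"S3FS_LOGGING_LEVEL": "DEBUG"},
)
cluster = KubeCluster(pod_spec)
cluster.scale(60)
client = Client(cluster)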

I also have experimental fsspec/filesystem_spec#617, which could be used to get debug info from stalled coroutines or to cancel them (which would trigger a retry or a real exception traceback).

Finally, I have the following change (see discussion in fsspec/filesystem_spec#560), which might help... or not. Bottom line: I still don't know what conditions lead to a deadlock. But it's obviously very important to get this right!

--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -381,9 +381,17 @@ class S3FileSystem(AsyncFileSystem):
     @staticmethod
     def close_session(loop, s3):
         if loop is not None and loop.is_running():
-            sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
-        else:
-            s3._endpoint.http_session._connector._close
+            try:
+                sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
+                return
+            except TimeoutError:
+                pass
+        try:
+            # close the actual socket
+            s3._client._endpoint.http_session._connector._close()
+        except AttributeError:
+            # but during shutdown, it may have gone
+            pass

     async def _get_delegated_s3pars(self, exp=3600):

@jsadler2
Author

Okay. So I tried again. I got a slightly different error this time:

tornado.application - ERROR - Uncaught exception in write_error
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/web.py", line 1681, in _execute
    result = self.prepare()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 502, in prepare
    raise web.HTTPError(403)
tornado.web.HTTPError: HTTP 403: Forbidden

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/web.py", line 1217, in send_error
    self.write_error(status_code, **kwargs)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 585, in write_error
    html = self.render_template('%s.html' % status_code, **ns)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 515, in render_template
    template = self.get_template(name)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 511, in get_template
    return self.settings['jinja2_env'].get_template(name)
KeyError: 'jinja2_env'

I also looked at the worker logs. I had 60 workers. 49 of the 60 had blank logs.

5 workers had logs showing a timeout error like this:

distributed.worker - ERROR - Worker stream died during communication: tcp://10.12.77.128:41871
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 319, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/srv/conda/envs/pangeo/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2033, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/utils_comm.py", line 389, in retry_operation
    operation=operation,
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 1030, in connect
    **self.connection_args,
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 326, in connect
    ) from exc
OSError: Timed out during handshake while connecting to tcp://10.12.77.128:41871 after 10 s

distributed.worker - ERROR - Worker stream died during communication: tcp://10.12.77.128:41871
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 320, in connect
    await asyncio.wait_for(comm.write(local_info), time_left())
  File "/srv/conda/envs/pangeo/lib/python3.7/asyncio/tasks.py", line 423, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  ... (remainder identical to the traceback above) ...
OSError: Timed out during handshake while connecting to tcp://10.12.77.128:41871 after 10 s

6 workers had logs showing a KeyError like this:

distributed.worker - ERROR - 'stage-f685cd4829bf2182a4dccc5128af3c17'
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2545, in execute
    data[k] = self.data[k]
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/zict/buffer.py", line 80, in __getitem__
    raise KeyError(key)
KeyError: 'stage-f685cd4829bf2182a4dccc5128af3c17'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2549, in execute
    data[k] = Actor(type(self.actors[k]), self.address, k, self)
KeyError: 'stage-f685cd4829bf2182a4dccc5128af3c17'

distributed.worker - ERROR - 'stage-f685cd4829bf2182a4dccc5128af3c17'
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2227, in update_who_has
    self.tasks[dep].who_has.update(workers)
KeyError: 'stage-f685cd4829bf2182a4dccc5128af3c17'

I did this !export S3FS_LOGGING_LEVEL=DEBUG as you suggested, @martindurant. I'm not sure it made a difference though.

@martindurant

No, nothing about s3fs there :|

I notice the reference to an Actor - are you using dask actors?

@jsadler2
Author

Hm. I don't know what dask actors are

@jsadler2
Author

I should also say that I ran this on a subset of the full dataset that was 1/5th of the full time dimension, and that worked without error. So it seems like a scaling issue.
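A sketch of what such a subset run looks like — the slice, chunk sizes, and store paths here are illustrative placeholders rather than the exact values used (it reuses ds_full, fs, and rechunk from the workflow above):

# Illustrative subset: the first fifth of the time dimension.
n_time = ds_full.dims["time"]
ds_subset = ds_full.isel(time=slice(0, n_time // 5))

# The time chunk size has to shrink along with the subset.
subset_chunks = {v: {"time": n_time // 5, "lat": 28, "lon": 29} for v in ds_subset.data_vars}

subset_plan = rechunk(
    ds_subset,
    subset_chunks,
    max_mem="500MB",
    target_store=fs.get_mapper("ds-drb-data/nldas_subset_chunks"),        # hypothetical path
    temp_store=fs.get_mapper("ds-drb-data/nldas_subset_intermediate"),    # hypothetical path
)
subset_plan.execute()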

@rabernat
Member

If we are stuck in terms of debugging, another option is to ping @jrbourbeau. Figuring out dask / rechunker issues is definitely in-scope for the Pangeo / Coiled collaboration.

@jrbourbeau
Member

@jsadler2 what versions of dask / distributed / dask-kubernetes are you using? From the notebook you posted it looks like you're using dask=2020.12.0. Could you try upgrading those packages to their latest releases and running again?

@jsadler2
Author

This was with:

  • dask=2020.12.0
  • distributed=2020.12.0
  • dask-kubernetes=0.11.0

I'll try again with the latest release version.

@jsadler2
Author

jsadler2 commented May 3, 2021

@jrbourbeau - I've updated to the following via conda update:

  • dask=2021.4.1
  • distributed=2021.4.1
  • dask-kubernetes=2021.3.1

But now when I try to start up my cluster, it just hangs here:
[screenshot: the cluster startup cell hanging in the notebook]

Do I need to change anything else along with those dask libraries, or do I have to do something different to start up the cluster after the upgrade? Is there some traceback or progress output I could look at to get an idea of why it's not working?

@jsadler2
Author

ping @jrbourbeau

@jrbourbeau
Member

Hmm it's not immediately clear what the issue is. Could you look at the kubernetes pod logs to see if there are any informative errors / tracebacks there?

@jrbourbeau
Member

Just checking in here @jsadler2. Were there any k8s logs that had useful information?

@jsadler2
Author

jsadler2 commented Jun 8, 2021

Hi, @jrbourbeau - thanks for checking back in on this. I actually don't know how to check the k8s logs. Is that something I can do from the Jupyter interface?

I realized that I didn't need the rechunked dataset after all, so this hasn't been a top priority for me (hence the slow response 😬). I can keep poking at it, though, for the sake of understanding what went wrong and whether there is a solution.
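For reference, worker logs can also be pulled from the notebook itself — a sketch assuming the client object from the session that launched the KubeCluster is still alive (from a terminal with cluster access, kubectl logs <pod-name> on the scheduler and worker pods shows roughly the same output):

# Fetch recent log lines from every worker through the Dask client.
# Each value is a sequence of (level, message) pairs, newest first.
worker_logs = client.get_worker_logs()
for addr, log in worker_logs.items():
    print(f"=== {addr} ===")
    for level, message in log:
        print(level, message)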

orianac added a commit to carbonplan/cmip6-downscaling that referenced this issue Nov 18, 2021