Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OSError: [Errno 22] Invalid argument #222

Open
FlorisCalkoen opened this issue Nov 5, 2022 · 0 comments
Open

OSError: [Errno 22] Invalid argument #222

FlorisCalkoen opened this issue Nov 5, 2022 · 0 comments

Comments

@FlorisCalkoen
Copy link

FlorisCalkoen commented Nov 5, 2022

Lately I've been running into this error a few times: OSError: [Errno 22] Invalid argument.

It happens when processing a dask_geopandas.GeoDataFrame after a dask.distributed.Client has been created. It doesn't happen when no client is defined, i.e., when calling dask_geopandas.read_file(fp, npartitions=30) straight away without creating a client. The error also does not seem to occur when the client is created with Client(processes=False).

So the problem seems to arise from the combination of a (multi-process) dask.distributed.Client and dask_geopandas computations on dataframes that hold geometry data.

It's a bit tricky to provide a reproducible example, as this typically happens with larger datasets. But I hope that by describing the problem someone will be able to point me in the right direction. If you want to reproduce this error, the data can be downloaded with the commands below.

Commands to download the data (~2 GB):

# Download the USGS/Esri Global Coastal Segments map package (~2 GB) into ~/tmp,
# extract it, and convert the contained .gdb dataset to a GeoPackage that the
# Python reproduction script reads from ~/tmp/sayre_coastal_segments.gpkg.
mkdir -p ~/tmp
cd ~/tmp
wget -O sayre_coastal_segments.mpk https://rmgsc.cr.usgs.gov/outgoing/ecosystems/Global/USGSEsriGlobalCoastalSegmentsv1.mpk
# Extract the .mpk archive; the next step assumes it yields a
# sayre_coastal_segments/ directory (NOTE(review): confirm on your unar version).
unar sayre_coastal_segments.mpk
# Convert the extracted geodatabase layer(s) to GeoPackage format.
ogr2ogr -f gpkg sayre_coastal_segments.gpkg sayre_coastal_segments/v108/ecujillgeographic.gdb
# Minimal reproduction: with a dask.distributed Client active, computing a
# dask_geopandas GeoDataFrame raises "OSError: [Errno 22] Invalid argument".
# Per the report, the error does not occur without a Client, or when the
# Client is created with processes=False.
import pathlib
import dask_geopandas
from dask.distributed import Client

# Local cluster with one thread per worker; workers use /tmp as their local directory.
client = Client(threads_per_worker=1, local_directory="/tmp")
print(client)  # <Client: 'tcp://127.0.0.1:54111' processes=10 threads=10, memory=64.00 GiB>

# GeoPackage created by the ogr2ogr download step (~/tmp/sayre_coastal_segments.gpkg).
fp = pathlib.Path.home().joinpath("tmp", "sayre_coastal_segments.gpkg")
# Lazily define the read as 30 partitions; nothing is loaded until compute().
ddf = dask_geopandas.read_file(fp, npartitions=30)
# Materializing the full result fails while gathering it back to the client
# (traceback path: Client.gather -> scheduler.gather -> socket.recv_into).
df = ddf.compute()  # OSError: [Errno 22] Invalid argument 
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
Cell In [5], line 1
----> 1 df = ddf.compute()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
    291 def compute(self, **kwargs):
    292     """Compute this dask collection
    293 
    294     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    313     dask.base.compute
    314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
    316     return result

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:3052, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3050         should_rejoin = False
   3051 try:
-> 3052     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3053 finally:
   3054     for f in futures.values():

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:2226, in Client.gather(self, futures, errors, direct, asynchronous)
   2224 else:
   2225     local_worker = None
-> 2226 return self.sync(
   2227     self._gather,
   2228     futures,
   2229     errors=errors,
   2230     direct=direct,
   2231     local_worker=local_worker,
   2232     asynchronous=asynchronous,
   2233 )

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336     return future
    337 else:
--> 338     return sync(
    339         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340     )

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils.py:405, in sync(loop, func, callback_timeout, *args, **kwargs)
    403 if error:
    404     typ, exc, tb = error
--> 405     raise exc.with_traceback(tb)
    406 else:
    407     return result

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils.py:378, in sync.<locals>.f()
    376         future = asyncio.wait_for(future, callback_timeout)
    377     future = asyncio.ensure_future(future)
--> 378     result = yield future
    379 except Exception:
    380     error = sys.exc_info()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:2118, in Client._gather(self, futures, errors, direct, local_worker)
   2116     else:
   2117         self._gather_future = future
-> 2118     response = await future
   2120 if response["status"] == "error":
   2121     log = logger.warning if errors == "raise" else logger.debug

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:2169, in Client._gather_remote(self, direct, local_worker)
   2166                 response["data"].update(data2)
   2168     else:  # ask scheduler to gather data for us
-> 2169         response = await retry_operation(self.scheduler.gather, keys=keys)
   2171 return response

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils_comm.py:383, in retry_operation(coro, operation, *args, **kwargs)
    377 retry_delay_min = parse_timedelta(
    378     dask.config.get("distributed.comm.retry.delay.min"), default="s"
    379 )
    380 retry_delay_max = parse_timedelta(
    381     dask.config.get("distributed.comm.retry.delay.max"), default="s"
    382 )
--> 383 return await retry(
    384     partial(coro, *args, **kwargs),
    385     count=retry_count,
    386     delay_min=retry_delay_min,
    387     delay_max=retry_delay_max,
    388     operation=operation,
    389 )

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils_comm.py:368, in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    366             delay *= 1 + random.random() * jitter_fraction
    367         await asyncio.sleep(delay)
--> 368 return await coro()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/core.py:1154, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
   1152 prev_name, comm.name = comm.name, "ConnectionPool." + key
   1153 try:
-> 1154     return await send_recv(comm=comm, op=key, **kwargs)
   1155 finally:
   1156     self.pool.reuse(self.addr, comm)

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/core.py:919, in send_recv(comm, reply, serializers, deserializers, **kwargs)
    917 await comm.write(msg, serializers=serializers, on_error="raise")
    918 if reply:
--> 919     response = await comm.read(deserializers=deserializers)
    920 else:
    921     response = None

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/comm/tcp.py:235, in TCP.read(self, deserializers)
    233         chunk = frames[i:j]
    234         chunk_nbytes = chunk.nbytes
--> 235         n = await stream.read_into(chunk)
    236         assert n == chunk_nbytes, (n, chunk_nbytes)
    237 except StreamClosedError as e:

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:475, in BaseIOStream.read_into(self, buf, partial)
    472 self._read_partial = partial
    474 try:
--> 475     self._try_inline_read()
    476 except:
    477     future.add_done_callback(lambda f: f.exception())

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:842, in BaseIOStream._try_inline_read(self)
    840     return
    841 self._check_closed()
--> 842 pos = self._read_to_buffer_loop()
    843 if pos is not None:
    844     self._read_from_buffer(pos)

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:755, in BaseIOStream._read_to_buffer_loop(self)
    748 next_find_pos = 0
    749 while not self.closed():
    750     # Read from the socket until we get EWOULDBLOCK or equivalent.
    751     # SSL sockets do some internal buffering, and if the data is
    752     # sitting in the SSL object's buffer select() and friends
    753     # can't see it; the only way to find out if it's there is to
    754     # try to read it.
--> 755     if self._read_to_buffer() == 0:
    756         break
    758     # If we've read all the bytes we can use, break out of
    759     # this loop.
    760 
    761     # If we've reached target_bytes, we know we're done.

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:867, in BaseIOStream._read_to_buffer(self)
    865     else:
    866         buf = bytearray(self.read_chunk_size)
--> 867     bytes_read = self.read_from_fd(buf)
    868 except (socket.error, IOError, OSError) as e:
    869     # ssl.SSLError is a subclass of socket.error
    870     if self._is_connreset(e):
    871         # Treat ECONNRESET as a connection close rather than
    872         # an error to minimize log spam  (the exception will
    873         # be available on self.error for apps that care).

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:1140, in IOStream.read_from_fd(***failed resolving arguments***)
   1138 def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
   1139     try:
-> 1140         return self.socket.recv_into(buf, len(buf))
   1141     except BlockingIOError:
   1142         return None

OSError: [Errno 22] Invalid argument
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant