[BUG] Mars client can't fetch large data #3122

Open
chaokunyang opened this issue Jun 7, 2022 · 0 comments

Describe the bug
The Mars client can't fetch large data: when the fetched result is too large, tornado rejects the HTTP message with a "Content-Length too long" error and the connection is closed.

To Reproduce
To help us reproduce this bug, please provide the information below:

  1. Your Python version: 3.7.9
  2. The version of Mars you use: 0.9.0
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
2022-06-07 20:39:59,643 - tornado.general - INFO - Malformed HTTP message from None: Content-Length too long
INFO:tornado.general:Malformed HTTP message from None: Content-Length too long
---------------------------------------------------------------------------
HTTPStreamClosedError                     Traceback (most recent call last)
/tmp/ipykernel_1930/3440895109.py in <module>
      2 r=mars.core.ExecutableTuple([
      3     mt.random.RandomState(0).rand(1000_0000, 5, chunk_size=100_0000),
----> 4     mt.random.RandomState(0).rand(1000_0000, 5, chunk_size=100_0000),
      5 ]).execute()._fetch()

~/miniconda3/lib/python3.7/site-packages/mars/core/entity/executable.py in _fetch(self, session, **kw)
    278         session = _get_session(self, session)
    279         self._check_session(session, "fetch")
--> 280         return fetch(*self, session=session, **kw)
    281 
    282     def _fetch_infos(self, fields=None, session=None, **kw):

~/miniconda3/lib/python3.7/site-packages/mars/deploy/oscar/session.py in fetch(tileable, session, *tileables, **kwargs)
   1907 
   1908     session = _ensure_sync(session)
-> 1909     return session.fetch(tileable, *tileables, **kwargs)
   1910 
   1911 

~/miniconda3/lib/python3.7/site-packages/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
   1686     def fetch(self, *tileables, **kwargs) -> list:
   1687         coro = _fetch(*tileables, session=self._isolated_session, **kwargs)
-> 1688         return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
   1689 
   1690     @implements(AbstractSyncSession.fetch_infos)

~/miniconda3/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~/miniconda3/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/miniconda3/lib/python3.7/site-packages/mars/deploy/oscar/session.py in _fetch(tileable, session, *tileables, **kwargs)
   1875         tileable, tileables = tileable[0], tileable[1:]
   1876     session = _get_isolated_session(session)
-> 1877     data = await session.fetch(tileable, *tileables, **kwargs)
   1878     return data[0] if len(tileables) == 0 else data
   1879 

~/miniconda3/lib/python3.7/site-packages/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
   1132             ):
   1133                 await fetcher.append(chunk.key, meta, fetch_info.indexes)
-> 1134             fetched_data = await fetcher.get()
   1135             for fetch_info, data in zip(
   1136                 itertools.chain(*fetch_infos_list), fetched_data

~/miniconda3/lib/python3.7/site-packages/mars/services/task/execution/mars/fetcher.py in get(self)
     51             gets = self._storage_api_to_gets[storage_api]
     52             fetched_data = await storage_api.get.batch(
---> 53                 *map(operator.itemgetter(0), gets)
     54             )
     55             for get, data in zip(gets, fetched_data):

~/miniconda3/lib/python3.7/site-packages/mars/oscar/batch.py in _async_batch(self, *delays)
    151         elif self.batch_func:
    152             args_list, kwargs_list = self._gen_args_kwargs_list(delays)
--> 153             return await self.batch_func(args_list, kwargs_list)
    154         else:
    155             # this function has no batch implementation

~/miniconda3/lib/python3.7/site-packages/mars/services/storage/api/web.py in get_batch(self, args_list, kwargs_list)
    141             path=path,
    142             method="POST",
--> 143             data=serialize_serializable(get_chunks),
    144         )
    145         return deserialize_serializable(res.body)

~/miniconda3/lib/python3.7/site-packages/mars/services/web/core.py in _request_url(self, method, path, wrap_timeout_exception, **kwargs)
    233             if self.request_rewriter:
    234                 request = self.request_rewriter(request)
--> 235             res = await self._client.fetch(request, raise_error=False)
    236         except HTTPTimeoutError as ex:
    237             if wrap_timeout_exception:

~/miniconda3/lib/python3.7/site-packages/tornado/simple_httpclient.py in on_connection_close(self)
    582                 raise self.stream.error
    583             try:
--> 584                 raise HTTPStreamClosedError(message)
    585             except HTTPStreamClosedError:
    586                 self._handle_exception(*sys.exc_info())

HTTPStreamClosedError: Connection closed
  5. Minimized code to reproduce the error.
mt.random.RandomState(0).rand(2000_0000, 5, chunk_size=100_0000).execute().fetch()
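
A rough size estimate helps show why this call trips a body-size limit. The sketch below assumes the array holds float64 values (8 bytes each, as with NumPy's rand), ignores serialization overhead, and assumes the limit involved is tornado's default max_body_size of 100 MiB for its simple HTTP client. The helper name raw_nbytes and the constant name are made up for illustration.

```python
# Assumption: tornado's simple HTTP client caps bodies at 100 MiB by default.
ASSUMED_TORNADO_MAX_BODY_SIZE = 100 * 1024 * 1024

# Assumption: rand() produces float64 values, 8 bytes per element.
FLOAT64_ITEMSIZE = 8


def raw_nbytes(shape, itemsize=FLOAT64_ITEMSIZE):
    """Return the raw in-memory size in bytes of a dense array of this shape."""
    total = itemsize
    for dim in shape:
        total *= dim
    return total


# Whole tensor from the reproduction: 2000_0000 rows x 5 columns.
full_bytes = raw_nbytes((2000_0000, 5))
# A single chunk: chunk_size=100_0000 rows x 5 columns.
chunk_bytes = raw_nbytes((100_0000, 5))

print("full tensor bytes:", full_bytes)
print("single chunk bytes:", chunk_bytes)
print("full tensor exceeds assumed limit:", full_bytes > ASSUMED_TORNADO_MAX_BODY_SIZE)
print("single chunk exceeds assumed limit:", chunk_bytes > ASSUMED_TORNADO_MAX_BODY_SIZE)
```

Under these assumptions the whole tensor is about 800 MB while one chunk is about 40 MB, so a batched fetch that returns many chunks in one HTTP response would exceed the limit even though each chunk alone would not.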

Expected behavior
A clear and concise description of what you expected to happen.

Additional context
Add any other context about the problem here.
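
One possible workaround, offered only as a sketch: tornado lets callers raise the body-size limit through AsyncHTTPClient.configure. This assumes the error comes from the client-side max_body_size limit and that the Mars web client creates its tornado client after this configuration runs; neither is confirmed by the traceback above. The 2 GiB value is an arbitrary illustration, not a recommended setting.

```python
from tornado.httpclient import AsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

# Hypothetical limit chosen for illustration only (2 GiB).
LARGER_MAX_BODY_SIZE = 2 * 1024 ** 3

# Configure tornado so that AsyncHTTPClient instances created afterwards
# accept bodies up to LARGER_MAX_BODY_SIZE bytes.
# Assumption: this must run before the Mars session creates its HTTP client,
# because tornado caches one client instance per IOLoop.
AsyncHTTPClient.configure(SimpleAsyncHTTPClient, max_body_size=LARGER_MAX_BODY_SIZE)
```

Raising the limit only moves the threshold; fetching results in smaller batches would avoid building one very large HTTP response in the first place.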
