Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curl_httpclient: Future.cancel() closes connection #2728

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 11 additions & 3 deletions tornado/curl_httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import threading
import time
from io import BytesIO
import asyncio

from tornado import httputil
from tornado import ioloop
Expand Down Expand Up @@ -96,9 +97,9 @@ def close(self) -> None:
self._multi = None

def fetch_impl(
self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
self, request: HTTPRequest, callback: Callable[[HTTPResponse], None], future: asyncio.Future
) -> None:
self._requests.append((request, callback, self.io_loop.time()))
self._requests.append((request, callback, future, self.io_loop.time()))
self._process_queue()
self._set_timeout(0)

Expand Down Expand Up @@ -211,6 +212,12 @@ def _finish_pending_requests(self) -> None:
self._finish(curl, errnum, errmsg)
if num_q == 0:
break
for curl in self._curls:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looping over every pending request each time any request finishes makes the entire thing quadratic, which isn't good. In order to support cancellation properly we need to make it event-driven: add a callback to the future that fires when it is cancelled, instead of periodically calling cancelled() to check the status. (I think there might be more idiomatic ways to implement this for simple_httpclient which is already coroutine-based)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All great feedback, thanks! Was desperate for a solution and had just learned about co-routines hours before writing this. I noticed some checks failed. Any insight on that? I did a somewhat deep dive, but was having trouble understanding what the errors even meant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the main test failure is just:

ERROR: test_error_after_cancel (tornado.test.curl_httpclient_test.CurlHTTPClientCommonTestCase)
...
Exception: did not get expected log message

That test is pretty straightforward:

    def test_error_after_cancel(self):
        fut = self.http_client.fetch(self.get_url("/404"))
        self.assertTrue(fut.cancel())
        with ExpectLog(app_log, "Exception after Future was cancelled") as el:

The other failing bits are formatting checks from flake8 and black

if curl not in self._free_list:
future = curl.info['future']
if future and future.cancelled():
curl.info['future'] = None
curl.close()
self._process_queue()

def _process_queue(self) -> None:
Expand All @@ -219,13 +226,14 @@ def _process_queue(self) -> None:
while self._free_list and self._requests:
started += 1
curl = self._free_list.pop()
(request, callback, queue_start_time) = self._requests.popleft()
(request, callback, future, queue_start_time) = self._requests.popleft()
# TODO: Don't smuggle extra data on an attribute of the Curl object.
curl.info = { # type: ignore
"headers": httputil.HTTPHeaders(),
"buffer": BytesIO(),
"request": request,
"callback": callback,
"future": future,
"queue_start_time": queue_start_time,
"curl_start_time": time.time(),
"curl_start_ioloop_time": self.io_loop.current().time(),
Expand Down
5 changes: 3 additions & 2 deletions tornado/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import ssl
import time
import weakref
import asyncio

from tornado.concurrent import (
Future,
Expand Down Expand Up @@ -303,11 +304,11 @@ def handle_response(response: "HTTPResponse") -> None:
return
future_set_result_unless_cancelled(future, response)

self.fetch_impl(cast(HTTPRequest, request_proxy), handle_response)
self.fetch_impl(cast(HTTPRequest, request_proxy), handle_response, future)
return future

def fetch_impl(
self, request: "HTTPRequest", callback: Callable[["HTTPResponse"], None]
self, request: "HTTPRequest", callback: Callable[["HTTPResponse"], None], future: asyncio.Future
) -> None:
raise NotImplementedError()

Expand Down
3 changes: 2 additions & 1 deletion tornado/simple_httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import time
from io import BytesIO
import urllib.parse
import asyncio

from typing import Dict, Any, Callable, Optional, Type, Union
from types import TracebackType
Expand Down Expand Up @@ -163,7 +164,7 @@ def close(self) -> None:
self.tcp_client.close()

def fetch_impl(
self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
self, request: HTTPRequest, callback: Callable[[HTTPResponse], None], future: asyncio.Future
) -> None:
key = object()
self.queue.append((key, request, callback))
Expand Down