Auto-resumed failed downloads #2006

Merged: 2 commits merged into main from the 1994-autoresume branch on Jun 4, 2024
Conversation

@micahflee (Contributor) commented May 14, 2024

Status

Ready for review

Description

Will fix #1994.

I've primarily edited API.download_submission. There's a big code block that only runs if self.proxy is False; if the proxy is in use, it doesn't do anything yet. I'll still need to make this work with proxy v2.

The change works like this: The output file is opened for writing at the beginning, and then there's a loop that continues until the download is complete, or it has been retried 3 times. (I have a comment to make RETRY_LIMIT configurable, but it's hard-coded right now.)

Inside this loop, it tries to download the submission by sending the appropriate request to self._send_json_request. I've made it so stream=True is passed to requests.request on download requests, which means the requests.request call returns immediately and the response content is streamed later via data.iter_content. After _send_json_request returns, it streams one chunk at a time, writing each chunk to disk and keeping track of how many bytes have been written in total.

If there's a ReadTimeout, ConnectTimeout, ConnectionError, or TooManyRedirects exception during the data.iter_content call (basically, if the download starts but then times out during one of the chunks), then it retries and the loop starts over. This time, though, when it calls self._send_json_request it sends the appropriate Range header to fetch only the data from where it left off.
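For illustration, here is a minimal sketch of that retry-and-resume loop as a standalone function; the names download_with_resume, RETRY_LIMIT, and CHUNK_SIZE are hypothetical stand-ins, and the real SDK method is shaped differently:

import requests

RETRY_LIMIT = 3  # TODO: make configurable
CHUNK_SIZE = 1024


def download_with_resume(url: str, dest_path: str) -> int:
    """Sketch: retry on network errors, resuming from the last byte
    written to disk via a Range header."""
    bytes_written = 0
    with open(dest_path, "wb") as f:
        for _attempt in range(RETRY_LIMIT):
            headers = {}
            if bytes_written > 0:
                # Resume from where the previous attempt left off
                headers["Range"] = f"bytes={bytes_written}-"
            try:
                # stream=True returns immediately; the body is read lazily
                resp = requests.get(url, headers=headers, stream=True, timeout=30)
                for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):
                    f.write(chunk)
                    bytes_written += len(chunk)
                return bytes_written  # download complete
            except (
                requests.exceptions.ReadTimeout,
                requests.exceptions.ConnectTimeout,
                requests.exceptions.ConnectionError,
                requests.exceptions.TooManyRedirects,
            ):
                continue  # retry, resuming with a Range request
    raise RuntimeError("download failed after retries")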

I've written a new test for this, TestAPI.test_download_submission_autoresume. It works by stubbing requests.request with a fake version that, the first time it runs, makes the request time out after serving 200 bytes, and on subsequent runs serves the request normally. So the test should serve 200 bytes of the download, time out, and then resume with a range request and finish.
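A rough sketch of that stubbing idea (the flaky_request helper is illustrative, not the actual test code):

import requests

real_request = requests.request
call_count = 0


def flaky_request(method, url, **kwargs):
    """Stub for requests.request: the first call serves 200 bytes and then
    times out; subsequent calls pass through unchanged."""
    global call_count
    call_count += 1
    resp = real_request(method, url, **kwargs)
    if call_count == 1:
        body = resp.content  # grab the real payload for the fake stream

        def truncated_iter_content(chunk_size=1):
            yield body[:200]
            raise requests.exceptions.ReadTimeout("simulated mid-download timeout")

        resp.iter_content = truncated_iter_content
    return resp


# The test would patch requests.request with flaky_request
# (e.g. via unittest.mock.patch) before calling download_submission.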

The test passes, and I've confirmed that the code actually works like this. But the problem is that the test files are too small. In API.download_submission, the chunk size is 1024 bytes, but the test file I'm downloading is 651 bytes, which means that currently the test actually serves the entire 651 bytes in a single chunk and then raises a ReadTimeout. It still works, but it would be more realistic if we had a larger file to test with.

I confirmed that it works as it's supposed to by changing the chunk size in API.download_submission from 1024 to 64 bytes, and it basically sends 4 chunks (256 bytes) and then raises a ReadTimeout on the subsequent chunk. Then, on retry, it makes a range request and successfully finishes the download.

Can we add test data to securedrop to include a bigger file? And also a binary file, like maybe a 200 KB image, since all the file uploads right now are text?

What's left:

  • Make RETRY_LIMIT configurable
  • Support proxy v2

@legoktm (Member) commented May 15, 2024

I'll still need to make this work with proxy v2.

In proxy v2, all requests go through the proxy, even in development mode, so there's only one code path to implement. When downloading files, we set stream: True in the request to the proxy, so it streams back the data on stdout, and if it errors, that'll go to stderr. Right now if we see an error in stderr, we bubble it up, so we'll want roughly the same loop you implemented, checking how many bytes we got and then retrying the request. See

def _send_json_request(
    self,
    method: str,
    path_query: str,
    stream: bool = False,
    body: str | None = None,
    headers: dict[str, str] | None = None,
    timeout: int | None = None,
) -> StreamedResponse | JSONResponse:
    """Build a JSON-serialized request to pass to the proxy.
    Handle the JSON or streamed response back, plus translate HTTP error statuses
    to our exceptions."""
    data: dict[str, Any] = {"method": method, "path_query": path_query, "stream": stream}
    if method == "POST" and body:
        data["body"] = body
    if headers:
        data["headers"] = headers
    if timeout:
        data["timeout"] = timeout
    data_str = json.dumps(data).encode()
    try:
        env = {}
        if self.development_mode:
            env["SD_PROXY_ORIGIN"] = self.server
        response = subprocess.run(
            self._rpc_target(),
            capture_output=True,
            timeout=timeout,
            input=data_str,
            env=env,
            check=False,
        )
    except subprocess.TimeoutExpired as err:
        raise RequestTimeoutError from err
    # error handling
    if response.returncode != 0:
        try:
            error = json.loads(response.stderr.decode())
        except json.decoder.JSONDecodeError as err:
            raise BaseError("Unable to parse stderr JSON") from err
        raise BaseError("Internal proxy error: " + error.get("error", "unknown error"))
    # We need to peek at the content to see if we got a streaming response,
    # which only happens for stream=True and non-error response
    if stream and (not response.stdout or response.stdout[0] != b"{"):
        try:
            stderr = json.loads(response.stderr.decode())
            sha256sum = stderr["headers"]["etag"]
            filename = stderr["headers"]["content-disposition"]
        except (json.decoder.JSONDecodeError, KeyError) as err:
            raise BaseError("Unable to parse header metadata from response") from err
        return StreamedResponse(
            contents=response.stdout, sha256sum=sha256sum, filename=filename
        )
for the SDK side and
/// Given a `Response` that does require stream processing, forward it to stdout as we receive it, and then write the headers to stderr when we're done.
async fn handle_stream_response(resp: Response) -> Result<()> {
    // Get the headers, will be output later but we want to fail early if it's missing/invalid
    let headers = headers_to_map(&resp)?;
    let mut stdout = io::stdout().lock();
    let mut stream = resp.bytes_stream();
    // Stream the response, printing bytes as we receive them
    while let Some(item) = stream.next().await {
        stdout.write_all(&item?)?;
        // TODO: can we flush at smarter intervals?
        stdout.flush()?;
    }
    // Emit the headers as stderr
    eprintln!(
        "{}",
        serde_json::to_string(&StreamMetadataResponse { headers })?
    );
    Ok(())
}
for the proxy side.

Can we add test data to securedrop to include a bigger file? And also a binary file, like maybe a 200 KB image, since all the file uploads right now are text?

Yep, that seems like a good idea; test data is currently generated by the loaddata.py script, so additions would go there.

@micahflee (Contributor, Author) commented:

I've rebased from the proxy-rusting branch (#1718), so the commits are starting to look messy. I've started to refactor the API code to support streaming, but I'm hitting an issue with using the new VCRAPI.

Specifically, in order for streaming responses to actually work, I've changed the type of StreamedResponse.contents from bytes to io.BufferedReader:

@dataclass(frozen=True)
class StreamedResponse:
    """Container for streamed data along with the filename and ETag checksum sent by the server."""

    contents: BufferedReader
    sha256sum: str
    filename: str
    content_length: int

I also made it so that when _send_json_request runs the subprocess, it uses subprocess.Popen if streaming and subprocess.run if not. This way the function can return stdout without reading all of the data, allowing it to be streamed.
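Roughly, that branch could look like the following sketch; run_proxy and its signature are invented for illustration, and the real method does its own serialization and error handling:

import subprocess


def run_proxy(cmd: list[str], request_bytes: bytes, stream: bool, timeout: int | None):
    """Sketch: use Popen for streaming so stdout can be read incrementally,
    and subprocess.run when the whole JSON response fits in one shot."""
    if stream:
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        assert proc.stdin is not None
        proc.stdin.write(request_bytes)
        proc.stdin.close()
        return proc  # caller reads proc.stdout a chunk at a time
    return subprocess.run(
        cmd,
        capture_output=True,
        input=request_bytes,
        timeout=timeout,
        check=False,
    )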

The problem I'm hitting with VCRAPI is ultimately:

TypeError: cannot pickle '_io.BufferedReader' object

The VCR is trying to pickle a StreamedResponse, but since it contains the stdout of a running process, that just can't be done. I'm not sure of the best way to proceed.

Since I'll be reviewing #1718 soon, and that's where VCRAPI was implemented, I think I may point this out there instead and possibly request changes.

@legoktm (Member) commented May 17, 2024

The idea is that both StreamedResponse and JSONResponse are serializable, so we can save them in the YAML files for VCR.

Do we need to stream the bytes through _send_json_request? The way I was thinking was that _send_json_request would do all the requests, including multiple retries, buffering the received bytes and then return a single StreamedResponse with all the combined bytes at the end.

@micahflee (Contributor, Author) commented:

Do we need to stream the bytes through _send_json_request? The way I was thinking was that _send_json_request would do all the requests, including multiple retries, buffering the received bytes and then return a single StreamedResponse with all the combined bytes at the end.

Ahh okay, this makes a lot of sense. I'll try this out.

@micahflee (Contributor, Author) commented:

Actually, there's an issue with _send_json_request making all of the requests and returning a StreamedResponse with bytes.

download_submission is where the file is actually streamed to disk. If we do all of the requests and retries in _send_json_request and just return the bytes, the download must stream into memory first and can't be streamed directly to disk. For example, a 500 MB submission would take 500 MB of RAM to download, when it would only need one chunk's worth of RAM if streamed directly to disk.

Does the client ever download multiple submissions simultaneously? I could see multiple 500 MB submissions causing a DoS on SDW by exhausting memory.

It might be a good idea to talk this through and figure out exactly how it should work.

@legoktm (Member) commented May 17, 2024

Does the client ever download multiple submissions simultaneously?

I think downloads are in series, but I agree with your general point that we shouldn't buffer whole downloads in memory. And we have two use cases: we want to serialize the raw bytes, but also keep them out of memory and on disk.

Maybe we could do something like:

  • have subprocess stream the bytes to a temporary file (like you proposed)
  • return a file handle in StreamedResponse that callers can do .read() on (ideally this would be some kind of TemporaryFile class that automatically deletes itself from disk once the variable is dropped)
  • when StreamedResponse is serialized for VCRs, it reads the file from disk, converts it to io.BytesIO, and serializes that? It would deserialize to io.BytesIO as well.

What do you think?
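As a loose sketch of that idea (every name here is hypothetical; this was only a design suggestion, not the eventual implementation):

import io
from dataclasses import dataclass
from typing import IO


@dataclass
class TempFileStreamedResponse:
    """Hypothetical container: streamed bytes live in a temporary file on
    disk and are only pulled into memory when somebody asks for them."""

    tmp: IO[bytes]  # e.g. tempfile.NamedTemporaryFile(), deleted from disk on close
    sha256sum: str
    filename: str

    def read(self) -> bytes:
        self.tmp.seek(0)
        return self.tmp.read()

    def to_bytesio(self) -> io.BytesIO:
        # For VCR serialization: materialize the contents in memory
        return io.BytesIO(self.read())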

@micahflee force-pushed the 1994-autoresume branch 2 times, most recently from c1ce17f to d6f2fd1, on May 20, 2024 20:00
@micahflee (Contributor, Author) commented May 20, 2024

My earlier rebase was extremely messy, so since proxy v2 was merged into main, I cleaned up all of my commits (including writing detailed commit messages) and force pushed. This PR is now ready for review. Here are some things to consider:

I get DOWNLOAD_RETRY_LIMIT from QubesDB by subprocessing out to qubesdb-read, and it defaults to 3.
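A hedged sketch of what that lookup might look like; the helper name and the QubesDB key path are illustrative, and only the qubesdb-read call and the default of 3 come from the comment above:

import subprocess


def get_download_retry_limit(default: int = 3) -> int:
    """Read the download retry limit from QubesDB via qubesdb-read,
    falling back to a default if the key is missing or unparseable."""
    try:
        output = subprocess.check_output(
            ["qubesdb-read", "/vm-config/SD_DOWNLOAD_RETRY_LIMIT"],  # illustrative key path
            text=True,
        )
        return int(output.strip())
    except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
        return default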

As we discussed above, the streaming and retrying now happens directly in _send_json_request. It streams to a temporary file on disk until the download is complete. However, after it's complete, it still loads the whole thing into memory in order to store it in a StreamedResponse object:

# FIXME: For now, store the contents as bytes
fobj.seek(0)
contents = fobj.read()
fobj.close()

I didn't mess with the VCR cassette code to try to serialize file objects with .read() or anything; it's still serializing bytes directly, so downloading a 500 MB file will still take 500 MB of RAM. However, if we want to change this approach in the future, it should be simple.

Do you think we should change it in this PR, or punt that for the future?

This also includes two new tests:

  • test_download_submission_autoresume: This downloads a submission, but it stubs subprocess.Popen to replace stdout with a fake stdout that serves up to 200 bytes and then raises a subprocess.TimeoutExpired exception the first time. Subsequent subprocess.Popen calls no longer use the stubbed version. This simulates a proxy subprocess that times out the first time and then finishes the download with a subsequent range request that doesn't time out (see the sketch after this list).
  • test_download_submission_autoresume_fail: This stubs subprocess.Popen so that it will always time out, just to make sure that the timeout exception is still thrown when the number of retries runs out.
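A rough sketch of the fake-stdout idea from the first test; the class name and details are illustrative rather than the actual test code:

import io
import subprocess


class FakeStdout(io.RawIOBase):
    """Serve at most 200 bytes of the real payload, then behave like a proxy
    that has stopped responding by raising TimeoutExpired."""

    def __init__(self, payload: bytes):
        self._buffer = io.BytesIO(payload[:200])

    def read(self, size: int = -1) -> bytes:
        data = self._buffer.read(size)
        if data:
            return data
        # Out of bytes: pretend the proxy timed out mid-download
        raise subprocess.TimeoutExpired(cmd="securedrop-proxy", timeout=120)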

@micahflee marked this pull request as ready for review on May 20, 2024 20:21
@micahflee requested a review from a team as a code owner on May 20, 2024 20:21
@legoktm (Member) left a review:

Yay! Left comments - let me know if it would be helpful to do a pairing session (sync or async) on any of the feedback.

I think it would also be good to split the retry loop into its own function so we don't end up with super indented code and a 150-line function :)

(Several inline review comments on client/securedrop_client/sdk/__init__.py and client/tests/sdk/test_api.py, now outdated and resolved.)

# Check for errors
if returncode != 0:
    try:
Review comment (Member):

I don't think these errors should raise right away, it should get retried too.

Review comment (Member):

I think this got missed - these errors should retry as well. Let's do this in a follow-up.

Review comment (Member):

Hm, no wait, you added c3074c6. Let me debug why I saw an error a bit more...

@micahflee (Contributor, Author) commented:

@legoktm this should address everything. Here are the main changes:

  • SD_DOWNLOAD_RETRY_LIMIT is loaded directly from QubesDB instead of by calling out to qubesdb-read

  • The chunk size has been increased to 1024 bytes.

    But note that, in order for the tests to work, you now need to start the server with NUM_SOURCES=5 LOADDATA_ARGS="--random-file-size 3" make dev, since the tests require uploaded files larger than 1 KB so they can be split into chunks. I've updated the readme at client/tests/sdk/README.md to include the new command for starting the server, but I also noticed that the readme is out of date -- it still has separate instructions for generating cassettes for API calls over HTTP and over qrexec, but everything goes over subprocess to the proxy now. I haven't updated that, but maybe I should.

  • The tests use mocker instead of monkeypatching

  • I've split the streaming code out of _send_json_request into _streaming_download. I've also split the JSONResponse handling code into _handle_json_response, since _streaming_download might need to return a JSONResponse too.

  • I made a new test to ensure that streaming downloads will return a JSONResponse on errors.

@legoktm (Member) left a review:

A few comments. In dev testing it looks good, but in Qubes/SDW I'm hitting errors with the initial sync failing; let me debug that further.

(Two inline review comments on client/securedrop_client/sdk/__init__.py, now outdated and resolved.)
@micahflee (Contributor, Author) commented:

Alright, I think this is ready! I got the SDK tests to pass with this: 1a103a7

@legoktm (Member) commented May 31, 2024

Yay, it looks good to me as well. I'll rebase and then do one more round of testing on Qubes before merging.

@legoktm (Member) commented May 31, 2024

I set up a server with 50 MB files, set 10 retries via qubesdb, started a large download, and then ran kill -9 on the securedrop-proxy process in sd-proxy; it immediately fails instead of retrying. I'm trying to debug what's going wrong...

@micahflee (Contributor, Author) commented:

I figured out what was going on, and 1ee3538 fixes it now.

I added a bunch of debug logs, ran it in sd-app, tried downloading a large file over Tor, and killed securedrop-proxy in the middle of the download. I discovered it was actually failing at this exception:

# We will never reach this code because we'll have already returned or raised
# an exception by now, but it's required to make the linter happy
raise RuntimeError(
    "This should be unreachable, we should've already returned or raised a different exception"  # noqa: E501
)

During the loop that reads chunks from proc.stdout, it determines that the download is finished if the chunk comes back empty:

# Write the contents to disk
chunk_size = 1024
while not download_finished:
    if proc.stdout is not None:
        chunk = proc.stdout.read(chunk_size)
        if not chunk:
            download_finished = True
            break

It turns out that when the securedrop-proxy process is killed, proc.stdout.read() also returns an empty chunk, even though the download isn't finished. But since that sets download_finished to True, the retry loop doesn't retry; execution reaches the end of the function and throws the exception above.

I fixed it by checking the process return code once it's done executing. If it's non-zero, I set download_finished back to False, which allows the retry loop to continue until the download actually finishes.
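Roughly, the fix looks like this (a simplified sketch; the variable names follow the snippets above):

# After the read loop exits, check whether the proxy process actually
# succeeded; a killed proxy closes stdout, which looks like a normal EOF.
returncode = proc.wait()
if download_finished and returncode != 0:
    # The proxy died mid-stream: treat this attempt as failed so the
    # retry loop issues a Range request and resumes the download.
    download_finished = False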

Also, I moved the block of code that reads the contents of the streaming file from disk into memory so that it runs after this return code check. Here's that code:

# FIXME: For now, store the contents as bytes
fobj.seek(0)
contents = fobj.read()
fobj.close()

Since fobj is a tempfile, calling fobj.close() deletes the file from disk, and that would break the retry if the download had actually failed, so the file can't be deleted until we're sure the download has succeeded.

This commit adds several new debug logs, but they only get logged if you run the client like this: LOGLEVEL=DEBUG securedrop-client

Also when testing this, I actually killed the securedrop-proxy process twice for the same download, and watched it retry twice and still finish successfully.

@legoktm (Member) commented Jun 4, 2024

Rebased to clear the safety alert...

(Three review comments on client/securedrop_client/config.py, resolved.)
@legoktm force-pushed the 1994-autoresume branch 2 times, most recently from b1f8956 to 10a0951, on June 4, 2024 16:39
@legoktm (Member) commented Jun 4, 2024

The PR looks good! I successfully downloaded a ~50 MB file over 3 requests. On the server side, I saw it responding with HTTP 206 Partial Content.

I pushed one commit to make the value in QubesDB optional, thanks to Cory for reviewing. I'm going to do a partial squash and then merge this (once tests pass).

@legoktm force-pushed the 1994-autoresume branch 2 times, most recently from 88acf0a to 8252312, on June 4, 2024 16:52
A new _streaming_download function runs a loop that retries failed downloads,
using a configurable value from QubesDB/environment (default 3). The files
are streamed to disk in a temporary file and then loaded in memory (a FIXME
exists for that) to return a StreamedResponse with the contents as bytes.

The test works by mocking subprocess.Popen. The first time it uses Popen to
download a file, it returns 200 bytes and then raises
subprocess.TimeoutExpired, and then subsequent times it uses the real,
non-stubbed Popen with range requests.

test_download_submission_autoresume_fail makes sure that in the event of a
timeout that doesn't auto-resume, a RequestTimeoutError is ultimately thrown.

We also generate bigger random files for tests using LOADDATA_ARGS to
effectively test this.

`__reduce__` methods were added to the custom exceptions because otherwise an
exception was thrown when VCR tried to deepcopy them.
@legoktm force-pushed the 1994-autoresume branch 5 times, most recently from ec045cc to 6d56025, on June 4, 2024 19:50
Currently QubesDB lookups are required, while environment lookups are
optional. For fields with a default value, the QubesDB lookup is no
longer required.

Ideally we'd also make the environment lookups mandatory if the field is
required (`journalist_key_fingerprint`), but this causes a number of
issues with tests and should be done separately.

We were relying on `gpg_domain` being optional in env but not in
QubesDB; it's now explicitly optional at the type level.

Fixes #2041.
@legoktm (Member) left a review:

Okay, finally got the tests to pass. LGTM, thanks @micahflee!

@cfm if you have time to look at 0b40907 again since I changed it since the first time, then I think we can land this.

@legoktm merged commit c3af813 into main on Jun 4, 2024
58 checks passed
@legoktm deleted the 1994-autoresume branch on June 4, 2024 21:44
Successfully merging this pull request may close these issues:

  • Transparently resume and complete stalled or failed submission downloads.