Add StreamingMultipartFormDataParser. #3228

Open · wants to merge 8 commits into master
tornado/httputil.py (279 additions, 0 deletions)
@@ -849,6 +849,285 @@ def parse_multipart_form_data(
arguments.setdefault(name, []).append(value)


_BOUNDARY_REGEX = re.compile(r'boundary="?(?P<boundary>[^"]+)"?')
"""Regex to match the boundary option."""

_1MB = 1048576
"""Number of bytes in 1 Megabyte."""


class AbstractFileDelegate:
    """Delegate interface that receives the parsed files.

    Subclasses should override these hooks; each hook may return an
    Awaitable (which the parser will await) or None.
    """

    def start_file(self, name: str, headers: HTTPHeaders) -> Optional[Awaitable[None]]:
        """Called when a new file/field with the given name is started."""
        pass

    def file_data_received(self, name: str, data: bytes) -> Optional[Awaitable[None]]:
        """Called with each chunk of body data for the current file."""
        pass

    def finish_file(self, name: str) -> Optional[Awaitable[None]]:
        """Called when the current file is complete."""
        pass
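

# Illustrative sketch (an assumption, not part of this patch): a minimal
# delegate that buffers each uploaded file in memory. Real delegates would
# more likely stream data to disk or to object storage.
class InMemoryFileDelegate(AbstractFileDelegate):
    def __init__(self) -> None:
        self.file_headers: Dict[str, HTTPHeaders] = {}
        self.file_data: Dict[str, bytearray] = {}

    def start_file(self, name: str, headers: HTTPHeaders) -> Optional[Awaitable[None]]:
        # Record the headers and prepare a buffer for this file's data.
        self.file_headers[name] = headers
        self.file_data[name] = bytearray()
        return None

    def file_data_received(self, name: str, data: bytes) -> Optional[Awaitable[None]]:
        # Append each chunk as the parser emits it.
        self.file_data[name].extend(data)
        return None

    def finish_file(self, name: str) -> Optional[Awaitable[None]]:
        # Nothing to flush for an in-memory buffer.
        return None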


class ParserState:
    """Enumeration of the parser's internal states."""

PARSE_BOUNDARY_LINE = 1
"""State that parses the initial boundary."""

PARSE_FILE_HEADERS = 2
"""State that parses the 'headers' for the next file/object."""

PARSE_BODY = 3
"""State that parses some body text."""

PARSING_DONE = 4
"""State that denotes the parser is finished."""


class StreamingMultipartFormDataParser(object):
"""Basic parser that accepts data and parses it into distinct files.

This parser handles 'multipart/form-data' Content-Type uploads, which
permits multiple file uploads in a single request.
"""

    @classmethod
    def from_content_type_header(
        cls, delegate: AbstractFileDelegate, header: Union[str, bytes]
    ) -> "StreamingMultipartFormDataParser":
        """Create a parser from the value of a Content-Type header.

        Raises a ValueError if the header does not describe
        'multipart/form-data' or is missing the required 'boundary' option.
        """
        if isinstance(header, bytes):
            header = header.decode("utf-8")
        # Make sure the header is multipart/form-data.
        parts = [part.strip() for part in header.split(";")]
        if parts[0].lower() != "multipart/form-data":
            raise ValueError("Invalid Content-Type: {}".format(parts[0]))

        # Search for the 'boundary=' option.
        for part in parts:
            m = _BOUNDARY_REGEX.match(part)
            if m:
                return cls(delegate, m.group("boundary"))
        raise ValueError("Required 'boundary' option not found in header!")

    def __init__(
        self,
        delegate: AbstractFileDelegate,
        boundary: Union[str, bytes],
        max_buffer_size: int = _1MB,
    ):
"""Create a StreamingMultipartFormDataParser.

This parser (asynchronously) receives data, parses it, and invokes the
given delegate appropriately.
"""
# Be nice and decode the boundary if it is a bytes object.
if isinstance(boundary, bytes):
boundary = boundary.decode("utf-8")
# Store the delegate to write out the data.
self._delegate = delegate
self._boundary = boundary
self._max_buffer_size = max_buffer_size
self._name = None

# Variables to store the current state of the parser.
self._state = ParserState.PARSE_BOUNDARY_LINE
self._buffer = bytearray()

# Variables to hold the boundary matches.
self._boundary_next = "--{}\r\n".format(self._boundary).encode()
self._boundary_end = "--{}--\r\n".format(self._boundary).encode()
self._boundary_base = self._boundary_next[:-2]
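        # For example, with boundary "abc123" (illustrative value):
        #   _boundary_next == b"--abc123\r\n"
        #   _boundary_end  == b"--abc123--\r\n"
        #   _boundary_base == b"--abc123"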

# Variables for caching boundary matching.
self._last_idx = 0
self._boundary_idx = 0

@property
def boundary(self) -> str:
"""Return the boundary text that denotes the end of a file."""
return self._boundary

    def _change_state(self, state: ParserState, name: Optional[str] = None) -> None:
"""Helper to change the state of the parser.

This also clears some variables used in different states.
"""
self._state = state
self._last_idx = 0
self._boundary_idx = 0
self._name = name

async def data_received(self, chunk: bytes) -> None:
# Process the data received, based on the current state.
#
        # It is possible for 'chunk' here to be larger than the maximum buffer
        # size. Initially, this is okay because we still need to process the
        # chunk. However, if the buffer _remains_ this size after going
        # through the rest of this call, the input is bad, since each state
        # should incrementally consume data from the buffer and contain its
        # size.
        if len(self._buffer) > self._max_buffer_size:
            raise ValueError(
                "Buffer has grown larger than {} bytes!".format(self._max_buffer_size)
            )
# Ignore incrementing the buffer when in the DONE state altogether.
if self._state != ParserState.PARSING_DONE:
self._buffer.extend(chunk)

# Iterate over and over while there is sufficient data in the buffer.
# Each loop should either consume data, or move to a state where not
# enough data is available, in which case this should exit to await
# more data.
while True:
# PARSE_BODY state --> Expecting to parse the file contents.
if self._state == ParserState.PARSE_BODY:
# Search for the boundary characters.
idx = self._buffer.find(b"-")
if idx < 0:
# No match against any boundary character. Write out the
# whole buffer.
data = self._buffer
self._buffer = bytearray()
fut = self._delegate.file_data_received(self._name, data)
if fut is not None:
await fut

# Return because the whole buffer was written out.
return

# If 'idx > 0', write the data _up to_ this boundary point,
# then proceed in the same manner as 'idx == 0'.
                if idx > 0:
                    # Write out all of the data _up to_ this boundary point,
                    # then cycle around to check whether we are at the boundary
                    # or not. This simplifies the logic for checking against
                    # the boundary cases.
data = self._buffer[:idx]
self._buffer = self._buffer[idx:]
fut = self._delegate.file_data_received(self._name, data)
if fut is not None:
await fut

# Not enough data (technically) to check against. Wait for
# more data to be certain whether the boundary was parsed.
if len(self._buffer) < len(self._boundary_next):
return

                # If the buffer starts with 'self._boundary_next', the current
                # file is finished; notify the delegate, then switch states to
                # consume the boundary line cleanly.
if self._buffer.startswith(self._boundary_next):
# Mark the current file as finished.
fut = self._delegate.finish_file(self._name)
if fut is not None:
await fut
self._change_state(ParserState.PARSE_BOUNDARY_LINE)
continue

# Check the end boundary as well. The end boundary _might_
# match if the 'self._boundary_base' matches, but the
# 'self._boundary_next' does not. Wait for more data if the
# buffer does not have enough data to be sure.
if len(self._buffer) < len(self._boundary_end):
return

if self._buffer.startswith(self._boundary_end):
fut = self._delegate.finish_file(self._name)
if fut is not None:
await fut
self._change_state(ParserState.PARSE_BOUNDARY_LINE)
continue

                # No match so far, so write out the data up to the next
                # possible boundary character ('-').
                next_idx = self._buffer.find(b"-", 1)
if next_idx < 0:
data = self._buffer
self._buffer = bytearray()
else:
data = self._buffer[:next_idx]
self._buffer = self._buffer[next_idx:]
fut = self._delegate.file_data_received(self._name, data)
if fut is not None:
await fut

# Continue and run the check after this update.
continue

# PARSE_BOUNDARY_LINE state --> Expecting to parse either:
# - self._boundary_next (for the next file)
# - self._boundary_end (for the end of the request)
if self._state == ParserState.PARSE_BOUNDARY_LINE:
# Parse the first boundary chunk.
if len(self._buffer) < len(self._boundary_next):
# Not enough data, so exit.
return
# This implies we are parsing another file, so transition to
# the 'PARSE_HEADER' state. Also, continue to run through the
# loop again with the new state.
if self._buffer.startswith(self._boundary_next):
self._buffer = self._buffer[len(self._boundary_next) :]
self._change_state(ParserState.PARSE_FILE_HEADERS)
continue
# Check against 'self._boundary_end' as well. There is a slim
# chance that we are at the self._boundary_end case, but still
# do not have enough data, so handle that here.
if len(self._buffer) < len(self._boundary_end):
# Hope we get more data to confirm the boundary end case.
return
elif self._buffer.startswith(self._boundary_end):
# Done parsing. We should probably sanity-check that all
# data was consumed.
self._buffer = self._buffer[len(self._boundary_end) :]
self._change_state(ParserState.PARSING_DONE)
continue
                else:
                    # The buffer matches neither boundary, so the input is
                    # malformed; raise instead of spinning forever on the
                    # same buffer contents.
                    gen_log.warning("Invalid boundary parsed!")
                    raise ValueError("Invalid boundary in multipart body!")

            # PARSE_FILE_HEADERS state --> Expecting to parse headers ending
            # in a blank line (CRLF CRLF).
if self._state == ParserState.PARSE_FILE_HEADERS:
idx = self._buffer.find(b"\r\n\r\n", self._last_idx)
# Implies no match. Update the next index to search to be:
# max(0, len(buffer) - 3)
# as an optimization to speed up future comparisons. This
# should work; if there is no match, then the buffer could
# (in the worst case) have '\r\n\r', but not the final '\n'
# so we might need to rescan the previous 3 characters, but
# not 4. (Cap at 0 in case the buffer is too small for some
# reason.)
#
# In any case, there is not enough data, so just exit.
if idx < 0:
self._last_idx = max(0, len(self._buffer) - 3)
return
# Otherwise, we have a match. Parse this into a dictionary of
# headers and pass the result to create a new file.
data = self._buffer[: idx + 4].decode("utf-8")
self._buffer = self._buffer[idx + 4 :]
headers = HTTPHeaders.parse(data)
_, plist = _parse_header(headers.get("Content-Disposition", ""))
name = plist.get("name")
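                # For example (illustrative), the header
                #   Content-Disposition: form-data; name="upload"; filename="a.txt"
                # yields name == "upload" here.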

# Call the delegate with the new file.
fut = self._delegate.start_file(name, headers=headers)
if fut is not None:
await fut

# Update the buffer and the state.
self._change_state(ParserState.PARSE_BODY, name=name)
continue

            # PARSING_DONE state --> Expect no more data, but break the loop.
if self._state == ParserState.PARSING_DONE:
if len(self._buffer) > 0:
# WARNING: Data is left in the buffer when we should be
# finished...
gen_log.warning(
"Finished with non-empty buffer (%s bytes remaining).",
len(self._buffer),
)
self._buffer.clear()

# Even if there is data remaining, we should exit the loop.
return
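

# Illustrative usage sketch (an assumption, not part of this patch): wiring
# the parser into a handler decorated with ``tornado.web.stream_request_body``.
# Shown as a comment because httputil cannot import tornado.web;
# ``UploadHandler`` and ``InMemoryFileDelegate`` are hypothetical names.
#
#     @tornado.web.stream_request_body
#     class UploadHandler(tornado.web.RequestHandler):
#         def prepare(self):
#             self._delegate = InMemoryFileDelegate()
#             self._parser = StreamingMultipartFormDataParser.from_content_type_header(
#                 self._delegate, self.request.headers["Content-Type"]
#             )
#
#         async def data_received(self, chunk: bytes) -> None:
#             # Feed each chunk of the request body to the parser as it
#             # arrives, awaiting any backpressure from the delegate.
#             await self._parser.data_received(chunk)
#
#         def post(self):
#             self.set_status(204)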


def format_timestamp(
ts: Union[int, float, tuple, time.struct_time, datetime.datetime]
) -> str: