Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce MultipartReceiver for custom, fail-fast multipart decoding #7411

Open
wants to merge 14 commits into
base: series/0.23
Choose a base branch
from

Conversation

dylemma
Copy link

@dylemma dylemma commented Mar 16, 2024

Fixes #7408

Introduces two new types; PartReceiver and MultipartReceiver. A PartReceiver is like an EntityDecoder but with its result represented as a Resource. A MultipartReceiver decides what PartReceiver to use for a part based on the part's headers. A MultipartReceiver doesn't do any explicit buffering; the part bodies are decoded as they are received. A MultipartReceiver can get wrapped up as an EntityDecoder, using the same supervisor-based semantics as mixedMultipartResource.

To solve the issues I brought up in #7408:

  • A MultipartReceiver is typically defined such that any unexpected parts will cause an immediate decoding failure, and will stop the pull. (You can call its .ignoreUnexpectedParts method to just skip past unexpected parts without consuming them or raising an error)
  • For cases where your application logic is explicitly looking for a file upload, you can use PartReceiver.toTempFile, which gets you a Path to the temp file (the underlying supervisor resource will delete the temp file when released)
  • PartReceiver has a .withSizeLimit(n) method which will cause it to raise a DecodeFailure and stop pulling after the limit is hit. This allows business logic to protect against arbitrarily-large (even infinite-sized) individual parts.
  • MultipartReceiver is applicative, so you can combine one that pulls a file Path from one part, and a String from another part.

@mergify mergify bot added series/0.23 PRs targeting 0.23.x module:core labels Mar 16, 2024
@dylemma dylemma changed the title Refactor/multipart Introduce MultipartReceiver for fail-fast multipart decoding Mar 16, 2024
Comment on lines -769 to +919
Pull.raiseError[F](MalformedMessageBodyFailure("Malformed Malformed match"))
Pull.raiseError[F](MalformedMessageBodyFailure("Malformed match"))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Malformed Malformed Malformed ... seemed like a typo to me

@dylemma dylemma changed the title Introduce MultipartReceiver for fail-fast multipart decoding Introduce MultipartReceiver for custom, fail-fast multipart decoding Mar 16, 2024
Not sure of the right syntax for linking methods in scaladoc, but just the object+name didn't cut it. Switching to `backticks` syntax to appease the build for now.
Copy link
Member

@danicheg danicheg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long feedback delay. This PR is a huge one! Due to its size, I believe many of us might feel intimidated. Therefore, please bear with us as it may take some time to review this by the team thoroughly. Likely, this is going to be an iterative process.

I've flipped through the PR, so I'll start with possible silly questions.

  1. Current MultipartDecoders are represented as functions operating with EntityDecoder, Resource and Multipart. Also, they're private implementations. In turn, PartReceiver and MultipartReceiver are defined as public. Do we need to keep them public? Introducing new entities complicates the comprehension of relatively simple concepts like Multipart.
  2. If PartReceiver is meant to be used only within the MultipartReceiver context, could we somehow combine them into one entity?

*/
private[this] def decodePartEventsSupervised[F[_], A](
supervisor: Supervisor[F],
partDecoder: Part[F] => DecodeResult[Resource[F, *], A],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DecodeResult[Resource[F, *], A] shouldn't this be inverted inside out in a sense of effect?

Copy link
Author

@dylemma dylemma Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll revisit this next week when I have some more time, and it's been a little while since my head was fully around all this, but I think it was important for Resource to be the outer type. That said, it might be possible to instead use Part[F] => Resource[F, EntityDecoder[F, A]] here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, if I change to partDecoder: Part[F] => Resource[DecodeResult[F, *], A] then things get complicated...


With DecodeResult[Resource[F, *], A] I can just unwrap things with .value and have a Resource[F, Either[DecodeFailure, A]] which keeps me in F for the supervisor and the side-effects.


With Resource[DecodeResult[F, *], A] the side-effects need to be lifted into EitherT and the useForever call needs to be somehow converted from a Fiber[DecodeResult, Nothing] back down to a Fiber[F, Nothing].

supervisor.supervise[Nothing] {
  partDecoder(Part(ps.value, channel.stream.unchunks)).value.attempt.evalTap { r =>
    resultPromise.complete(r.flatten) *> channel.close
  }.useForever.value.map {
    ??? // what to do with an Either[DecodeError, Nothing]
  }
}

With Resource[F, EntityDecoder[F, A]] it's a little better but the Part's body stream has to be constructed/referenced twice, which seems pitfall-y given that the body is coming from a Channel in this context:

supervisor.supervise[Nothing] {
  partDecoder(Part(ps.value, channel.stream.unchunks)).evalMap { decoder =>
   decoder.decode(Part(ps.value, channel.stream.unchunks), strict = false).value
  }.attempt.evalTap { r =>
    resultPromise.complete(r.flatten) *> channel.close
  }.useForever
}

This problem would be alleviated if I used Headers instead of Part as the argument to partDecoder, but the typical implementations want to inspect the name/filename values, which currently come from the Part class, so they'd have to copy Part's implementation logic in order to compensate.


I'm thinking the best option is to just leave this as-is.

@dylemma
Copy link
Author

dylemma commented Apr 6, 2024

I've flipped through the PR, so I'll start with possible silly questions.

  1. Current MultipartDecoders are represented as functions operating with EntityDecoder, Resource and Multipart. Also, they're private implementations. In turn, PartReceiver and MultipartReceiver are defined as public. Do we need to keep them public? Introducing new entities complicates the comprehension of relatively simple concepts like Multipart.

  2. If PartReceiver is meant to be used only within the MultipartReceiver context, could we somehow combine them into one entity?

PartReceiver and MultipartReceiver are public because my goal is to allow client code to use them to customize the decoding behavior. For example when implementing an endpoint to serve a specific form, I want to only accept file upload in a specific field, I want to decode some fields as strings, and others as numbers or lists or whatever.

I think it should be possible to replace PartReceiver[F, A] with Resource[F, EntityDecoder[F, A]] but at the time I hadn't figured out how to get that to work. I'll revisit that when I have some more time.

I think MultipartReceiver does need to be separate from EntityDecoder, in order for the Applicative to work (e.g. define receiver logic for one field, and receiver logic for another field, and combine them with e.g. .mapN)

Comment on lines +171 to +175
private def readToBuffer[F[_]: Files: MonadThrow](
input: Stream[F, Byte],
maxSizeBeforeFile: Int,
chunkSize: Int,
)(implicit c: Compiler[F, F]): Resource[F, Stream[F, Byte]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize if I am missing something, but I cannot figure this out: what it the point of creating a Resource of Stream here? The Stream type is already a scoped thing, it should not require additional scope managing. Moreover, I cannot figure out where the Resource created would be managing any resource per se: looks like it is either Resource.pure or Resource.suspend. Neither of them introduce any new scope for further management. Moreover, the latter Resource.suspend does not seem necessary at all in this context.

Copy link
Contributor

@satorg satorg Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok, I see now – it is used for Files[F].tempFile. Sorry, missed it out.
Anyway, it does not seem necessary to enclose Stream into Resource, something like this can be used instead:

Stream.resource(Files[F].tempFile(...)).flatMap { path =>
  Files[F].readAll(path)
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Responding from my phone; apologies for any errors...

readToBuffer is my attempt to represent the existing functionality of parseToPartsSupervisedFile in terms of the new PartReceiver trait. This method is a private implementation detail of toMixedBuffer, defined a few lines up from here.

The intention here is that allocation of the resource is what causes the part body to be collected to a buffer (which is either an in-memory chunk a temp file), and the returned stream acts as a facade for that buffer while it is still valid. Reading that steam multiple times should not have to re-create the temp file, so I don't think I can Construct it the way you suggest.

maxSizeBeforeFile: Int,
chunkSize: Int,
)(implicit c: Compiler[F, F]): Resource[F, Stream[F, Byte]] = {
final case class Acc(bytes: Stream[Pure, Byte], bytesSize: Int)
Copy link
Contributor

@satorg satorg Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the way it is used in the code, it seems that the entire Acc class can be replaced with just one Chunk, which already is aware of its size therefore no additional field is required to pass it around.

Copy link
Member

@rossabaker rossabaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very nice work, and I'm sorry it took me so long to get around to it.

Comment on lines +807 to +808
val keepPulling = F.pure(true)
val stopPulling = F.pure(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like these values could be cached, but it's not a big deal.

implicit def multipartReceiverApplicative[F[_]]: Applicative[MultipartReceiver[F, *]] =
new MultipartReceiverApplicative[F]

private class MultipartReceiverApplicative[F[_]] extends Applicative[MultipartReceiver[F, *]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An instance of an unsealed type makes me a little nervous. I'm getting too sleepy to think about whether the abstract methods could affect the legality of the instance.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what you're saying here. Is the issue with defining the private class that extends Applicative, as opposed to just returning an anonymous class from def multipartReceiverApplicative? Or is this more related to "cats laws" legality?

Assuming it's about cats laws, I initially inquired about this in the typelevel discord's #cats channel - here. I got the impression that the "order matters" nature of decide when combining MultipartReceivers was acceptable.

Comment on lines +79 to +82
private def partName(headers: Headers) =
headers.get[`Content-Disposition`].flatMap(_.parameters.get(ci"name"))
private def partFilename(headers: Headers) =
headers.get[`Content-Disposition`].flatMap(_.parameters.get(ci"filename"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are dragons in this header with internationalized names. See #7419, #7436. Does the receiver need some nicer way of interpreting these?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can just construct a Part(headers, Stream.empty) and delegate to its name and filename methods?


/** Creates a PartReceiver that ignores the part body. */
def ignore[F[_]]: PartReceiver[F, Unit] =
_ => Resource.pure(Right(()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is Resource.unit, or maybe Applicative[Resource[F, *]].unit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless that * is doing some heavy lifting, I think I'd still have to .map(Right(_)) it, since the abstract method being fulfilled here is expected to return Resource[F, Either[DecodeFailure, A]].

)(implicit
files: Files[F],
mt: MonadThrow[F],
c: Compiler[F, F],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been warned by the fs2 authors to not take Compiler as a constraint. Maybe it's just better to slap a Concurrent on F?

(filename, _) => Part.fileData(name, filename, bytes[F], headers: _*),
)
}
object PartValue {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems strange to me that there's no OfBytes. It has to go to a file to be a binary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. PartValue was intended as the representation of a decoded part from MultipartReceiver.auto, which picks between decoding the part to a String or dumping it to a File depending on the part headers. An OfBytes case would imply the auto would sometimes decide not to decode a regular part to text, and I don't know offhand what kind of edge cases might motivate that.

If there was a relevant "business logic" motiviation, then the user wouldn't use MultipartReceiver.auto; instead they'd set up a custom MultipartReceiver that decides to use PartReceiver.toMixedBuffer for arbitrary "bytes" receiving.

You bringing this up made me realize that the docs I wrote on trait PartValue are wrongfully referencing PartReceiver.toMixedBuffer. Maybe correcting that will be enough?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
module:core series/0.23 PRs targeting 0.23.x
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feedback on Multipart
4 participants