
Support multiple processors #39

Open
josevalim opened this issue Feb 13, 2019 · 28 comments

Comments

@josevalim
Member

josevalim commented Feb 13, 2019

Issue opened to collect feedback from the community.

@mgwidmann

Copied from #36
@mgwidmann said:

The Broadway SQS producer is very difficult to tune without this feature. If I cannot separate download concurrency from processing concurrency (by making a separate stage for downloaders and a separate stage for processors), then I cannot scale them independently.

The best workaround currently is to have the processor spawn a few tasks to do the download, which I'm currently testing with Flow, but that leaves the processor waiting instead of processing an already-ready message (say, from another downloader).
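The workaround described above can be sketched with Task.async_stream: a single processor fans its downloads out to short-lived tasks with bounded concurrency. This is a minimal illustration, not the actual SQS consumer code; `fetch` here is a hypothetical stand-in for an HTTP download:

```elixir
# Sketch only: `fetch` is a hypothetical stand-in for an HTTP download.
# Inside one Broadway processor, downloads can still overlap by fanning
# out to tasks with a bounded max_concurrency.
fetch = fn url -> "body:" <> url end

urls = ["a", "b", "c", "d"]

bodies =
  urls
  |> Task.async_stream(fetch, max_concurrency: 4, timeout: 30_000)
  |> Enum.map(fn {:ok, body} -> body end)
```

Task.async_stream preserves input order by default, so the results line up with the input URLs.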

@josevalim said:

Well, since we are talking about it here, here are my $0.02: I am a bit skeptical that this would make a difference. The VM is really good at multiplexing jobs, so having 2000 stages that both download and process should be totally fine. I think sending the work to another series of stages won't buy you anything, and may only make things slower.

@bdubaut

bdubaut commented Aug 30, 2019

I'm working on a CQRS/ES application that consumes events from a RabbitMQ message bus as its producer. What I'd like to do is define different processors so that I can handle the messages differently. I have a web app funneling webhook events through a RabbitMQ instance; that is my producer. I essentially would like to do something like this:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producers: [
        default: [
          module: {MyProducerModule, []},
          stages: 1
        ]
      ],
      processors: [
        webhook_source: [
          module: MyWebhookSourceModule,
          stages: 50
        ],
        other_webhook_source: [
          module: MyOtherWebhookSourceModule,
          stages: 50
        ]
      ]
    )
  end
end

This would allow me to route messages to the correct handlers and clean up my Broadway declaration file.

I'm happy to help with this, whether it's docs or implementation.

@josevalim
Member Author

@bdubaut in this case, you don’t need multiple processors. Processors shouldn’t be used to route business logic; for business logic, you can have code in handle_message that matches on the message and dispatches to the appropriate place.
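The routing José describes can be sketched with plain pattern matching; the module, event shapes, and handler names below are hypothetical, and in a real pipeline `dispatch/1` would be called from handle_message/3 with the message data:

```elixir
defmodule Dispatcher do
  # Hypothetical dispatch by event source. In a real Broadway pipeline
  # this would be invoked from handle_message/3 with message.data.
  def dispatch(%{source: :webhook} = event), do: {:webhook_handler, event}
  def dispatch(%{source: :other_webhook} = event), do: {:other_webhook_handler, event}
  def dispatch(event), do: {:default_handler, event}
end
```

This keeps the topology as one producer and one processor layer, with the "routing" living in ordinary code rather than in the pipeline shape.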

@whatyouhide
Collaborator

@josevalim I'm having trouble seeing how we would partition to the right set of processors. I think we can likely avoid multiple processors, at least as far as the use cases I've used Broadway for go :)

@josevalim
Member Author

josevalim commented Aug 31, 2019 via email

@bdubaut

bdubaut commented Sep 1, 2019

To me multiple processors would run as steps (one after the other).

@josevalim That's a cool idea, in that case I feel like the processors key is a bit misleading, because I totally understood it as "Here are my separate processors that handle the events in different ways, and you would just need to pattern match on the event to route it to the right module."

Unless I'm the only one who understood it this way, I wonder how to make this clearer, either in the docs or in the Broadway options themselves 🤔

@josevalim
Member Author

josevalim commented Sep 1, 2019 via email

@mcrumm
Collaborator

mcrumm commented Oct 25, 2019

@josevalim I'd like to make an attempt at this, if it's still viable.

@josevalim
Member Author

Hi @mcrumm, thanks for the ping but we are still waiting for valid use cases. Do you have any? :D

@van-mronov

Hi @josevalim.

I would say a use case could be the following:
the first-layer processor handler is a GenStage producer_consumer, so for each event from the producer it generates several events for the next processor layer, which could be a producer_consumer or a consumer.

We use this kind of GenStage pipeline to fetch data from the Snapchat Marketing API (https://developers.snapchat.com/api/docs/#introduction).
The producer fetches entities (e.g., campaigns) and pushes them as events to the next layer.
For each campaign, a producer_consumer in the second layer fetches the campaign stats and pushes them as events to the next layer.
A consumer in the last layer adds some additional info from our internal DB to each stat entry and pushes it to some Kafka topic or saves it to a DB, whatever.

Here (http://big-elephants.com/2019-01/facebook-genstage/) is a detailed description of this approach.
It is a little bit outdated, but the idea is the same.

@josevalim
Member Author

Thanks for sharing @van-mronov! As I mentioned in my replies above, I am still unconvinced the logic you described would really benefit from multiple steps.

For example, in what you described, I would have a single producer, N consumers, and have each of the consumers do the work from beginning to end.

So a consumer would do:

id
|> fetch_campaigns()
|> for_each_campaign_get_stats()
|> attach_db_info_to_stats()

Without any need to break those into a bunch of processors.
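The end-to-end pipe above can be sketched as a single function that each consumer runs from beginning to end; all three step functions here are hypothetical stand-ins for the real API and DB calls:

```elixir
defmodule Stats do
  # Stand-ins for the real Marketing API / DB calls described above.
  def fetch_campaigns(id), do: [{id, :campaign_a}, {id, :campaign_b}]
  def for_each_campaign_get_stats(campaigns), do: Enum.map(campaigns, &{&1, :stats})
  def attach_db_info_to_stats(stats), do: Enum.map(stats, &{&1, :db_info})

  # Each consumer runs the whole pipeline, start to finish,
  # instead of handing off between processor layers.
  def process(id) do
    id
    |> fetch_campaigns()
    |> for_each_campaign_get_stats()
    |> attach_db_info_to_stats()
  end
end
```

Scaling then means starting more consumers running `process/1`, not adding layers.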

The reason is that our CPU concurrency is always bounded by the machine's cores, so as long as you have one stage with N processors, where N is the number of cores, you will maximize CPU usage.

But then you may say "but José, they have to do I/O too". In that case you can easily start 100 consumers instead, and everything will still work fine because the VM is really good at resource allocation and also performs work stealing.

Even if somehow you are already not happy with that, note that you have been doing 1 producer - N consumers so far, but you can also flip it to M producers - N consumers, and that already adds a lot of flexibility to split the load.

So in my mind, the goal of having multiple processors is to effectively maximize the machine resources and I haven't yet heard of a problem where you can't do that with Broadway topologies as is. In fact, I am worried adding this feature will only push people towards less optimal designs, because they will use multiple processors as logical steps, while they are everything BUT that.

@van-mronov

Thank you @josevalim, I see what you mean. I believe we started to use several processor layers because for_each_campaign_get_stats could cause several independent HTTP requests, so the idea was to run each of them in a different process. But perhaps we can do this with Task.Supervisor, or with Flow and Broadway. I need to think about it.

@mcrumm
Collaborator

mcrumm commented Oct 28, 2019

Thanks for the detailed explanation -- that cleared up a number of things for me.

A use case, as it was described to me, wouldn't actually be served by simply adding more processor stages, either. However, I thought it was interesting, so to close the loop on why I resurrected this issue I'll share it here.

So right now, we receive a message in the processor and we can fan in to a batcher. What if we could fan out of a batch into a subsequent processor?

So, for instance, I receive messages one at a time in handle_message/3, and then I want to batch them for some bulk operation. But after the bulk operation in handle_batch/4, I need to perform some intensive operation on each message individually before acknowledging it.

Processor --> Batcher --> Processor --> Batcher

@josevalim
Member Author

@mcrumm yes, I can see this case being a necessity. I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?

@mcrumm
Collaborator

mcrumm commented Oct 29, 2019

I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?

I think that could be a really elegant solution!

Currently we landed on using two Broadway pipelines, which is perfectly fine, but if we could essentially compose them together without the need to publish back out to a remote queue, that would be pretty fantastic.

@zachdaniel

Yes please! :)

@matreyes

matreyes commented Mar 2, 2020

We have all seen similar pipelines in streaming frameworks like Flink, where you have multiple stages partitioned by key for aggregating or joining messages on a stream. In that case it has value because the processes are distributed and handle their own big state.
For the single-node case, I agree with @josevalim that it could be managed with one stage of processors.

@flupke

flupke commented Sep 23, 2021

Hello José, thanks for your wonderful work :)

Here is a use case I came across that might benefit from multiple processors:

  • [producer] get youtube URL from SQS
  • [processor] find interesting moments (highlights) in it by analyzing the audio track (ML python script)
  • [processor] use ffmpeg to create small clips around these highlights (another python script running ffmpeg)
  • [batcher] upload clips to S3

The analysis part is mostly IO and brief single core bursts so we want to have e.g. num_analysis = num_cores parallel processes.

The ffmpeg part reencodes clips to 720p, retrieving relevant parts directly from the source HTTP video; ffmpeg uses all available cores when doing this so we must limit the number of ffmpeg processes to something reasonable (e.g. num_ffmpeg = num_cores / 4).

We did not want to limit ffmpeg max threads, because we want ffmpeg to use all cores when there's only 1 encoding process in flight.

So combining analysis + ffmpeg in a single processor did not work well: we either had too many ffmpeg processes or not enough parallel analysis processes.

Note 1: the Erlang VM's scheduling wonders don't apply here, because the CPU-intensive tasks happen in external OS processes.

Note 2: the processors' one_for_all strategy was also an issue; we don't want to kill all these long-running processes when one of them fails.

So I started rewriting the pipeline with GenStage, and got a good measure of how Broadway facilitated my life before. I think multiple processors (and customisable supervisor strategies) would be a really great addition :)

@josevalim
Member Author

Hi @flupke!

Note 2: processors' one_for_all strategy was also an issue, we don't want to kill all these long running processes when one of them fails.

This should not be an issue in practice because we rescue any failure during processing. The one_for_all is really there to handle bugs in Broadway, which should not happen.

Other than that, maybe a pool design with a single layer of processors may be good enough for you. The pool will guarantee that only a certain number of ffmpeg processes run at once and, while it will block, I think that is OK because ffmpeg will be the bottleneck (and it will use the CPU anyway).
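The pool design José suggests can be sketched as a small counting semaphore in front of the ffmpeg calls. This is a minimal illustration under assumptions (the module name and API are invented here, and a real pool library such as NimblePool would be more robust):

```elixir
defmodule FfmpegPool do
  # Hypothetical counting semaphore: at most `max` jobs run concurrently;
  # extra callers block in a queue until a slot frees up.
  use GenServer

  def start_link(max), do: GenServer.start_link(__MODULE__, max, name: __MODULE__)

  # Blocks until a slot is free, runs `fun`, then releases the slot.
  def run(fun) do
    :ok = GenServer.call(__MODULE__, :acquire, :infinity)

    try do
      fun.()
    after
      GenServer.cast(__MODULE__, :release)
    end
  end

  @impl true
  def init(max), do: {:ok, %{free: max, waiting: :queue.new()}}

  @impl true
  def handle_call(:acquire, _from, %{free: n} = state) when n > 0 do
    {:reply, :ok, %{state | free: n - 1}}
  end

  def handle_call(:acquire, from, state) do
    {:noreply, %{state | waiting: :queue.in(from, state.waiting)}}
  end

  @impl true
  def handle_cast(:release, state) do
    case :queue.out(state.waiting) do
      {{:value, from}, rest} ->
        # Hand the freed slot directly to the next waiter.
        GenServer.reply(from, :ok)
        {:noreply, %{state | waiting: rest}}

      {:empty, _} ->
        {:noreply, %{state | free: state.free + 1}}
    end
  end
end
```

Each processor would then wrap its shell-out in something like `FfmpegPool.run(fn -> System.cmd("ffmpeg", args) end)`, so at most `max` encodes run at once while the analysis work stays fully parallel in a single processor layer.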

@flupke

flupke commented Sep 24, 2021

This should not be an issue in practice because we rescue any failure during processing. The one_for_all is really there to handle bugs in Broadway, which should not happen.

I see, thank you for the clarification!

Other than that, maybe a pool design with a single layer of processors may be good enough for you. The pool will guarantee that only a certain number of ffmpeg processes run at once and, while it will block, I think that is OK because ffmpeg will be the bottleneck (and it will use the CPU anyway).

I couldn't tell how that would work, so I did an experiment. The pool implementation seems to be as performant as the two-stage version (maybe even better on latency, but I have doubts about my code). So this is one more point against multiple processors, and very good news for me, since I can stick to Broadway.

Thank you very much!!!

@filipmnowak

filipmnowak commented Jan 23, 2022

Hello!

(...) I wonder if we could handle it by simply sending it to another Broadway pipeline, that has producer-based processes?

@josevalim, my question is not directly related to mcrumm's case; I am just wondering: when we connect multiple pipelines, how do the acknowledgement semantics work?

Would it be necessary to do something like the following, for example?

RabbitMQ -> Producer -> Processor -> Batcher -> | pipeline boundary | -> RabbitMQ -> Producer -> Processor -> Batcher

Or do you see a sound way of ACKing from the next pipeline back to the previous pipeline's Batcher? 🤔

@josevalim
Member Author

josevalim commented Jan 23, 2022

You would disable acking on the first and then include the relevant metadata to ack on the second. If I remember correctly you can update the ack configuration on the fly.
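A sketch of that hand-off, assuming a recent Broadway where a message's acknowledger is a `{module, ack_ref, data}` tuple and Broadway.NoopAcknowledger is available; `SecondPipeline.Producer.push/1` is a hypothetical helper, not an official API:

```elixir
# In the first pipeline's processor: capture the metadata the second
# pipeline will need in order to ack, then neutralize acking here.
def handle_message(_processor, message, _context) do
  ack_info = message.acknowledger

  # Hypothetical hand-off: forward the data plus the ack metadata to the
  # second pipeline's producer (e.g. via send/2 or a custom producer API).
  SecondPipeline.Producer.push(%{data: message.data, ack: ack_info})

  # Make the first pipeline's ack a no-op; the second pipeline performs
  # the real ack using the forwarded metadata.
  %{message | acknowledger: Broadway.NoopAcknowledger.init()}
end
```

The second pipeline would then restore `ack_info` onto its own messages (or call the acknowledger module directly) once processing succeeds.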

But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.

@filipmnowak

filipmnowak commented Jan 23, 2022

(...) But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.

Thank you for the answer! By no means am I trying to start a debate here; I'm just sharing my gut feeling that separation of concerns, rather than squeezing too much into a single Processor type, would be a sounder approach :)
Regarding business logic, I have similar thoughts: Processors that do different classes of transformation, each dependent on the result of the previous, make sense to me. It's a different story, but we can have a similar situation in distributed systems based on microservices (or services in general), where every service does something different and a request travels between them until completion.
Please take what I write with a bucket of salt, as I don't understand Broadway well enough :)

@josevalim
Member Author

just sharing my gut feeling that separation of concerns, rather than squeezing too much into a single Processor type, would be a sounder approach

Separation of concerns is modeled by modules, not processors. Processors are about runtime properties. Using processors for design purposes will lead to inefficient pipelines.

@filipmnowak

just sharing my gut feeling that separation of concerns, rather than squeezing too much into a single Processor type, would be a sounder approach

Separation of concerns is modeled by modules, not processors. Processors are about runtime properties. Using processors for design purposes will lead to inefficient pipelines.

Thank you for your time, appreciated! I will try to continue with the path/approach you outlined.

@NicolayD

NicolayD commented Mar 2, 2023

I first want to say thank you to everyone involved in Broadway, it's an amazing library! I was considering it and GenStage for a use case where I want batching, rate-limiting, and eventually back-pressure, so it seemed like a better tool than plain GenStage. But I also wanted to implement a topology where one event is processed in parallel (or sequentially) by several different processors that don't have anything to do with each other. Something like:

[A - producer] Produce event -> [B - consumer] Send some requests [C - consumer] Log event [D - consumer] Push some notifications

Up until today I didn't find any mention in the documentation that Broadway can have only one processor, and the documentation begins with:

Broadway is a concurrent, multi-stage tool for building data ingestion and data processing pipelines.

But then below in the docs it says:

Note that Broadway does not accept multiple producers neither multiple processors. But we choose to keep in a list for simplicity and future proof.

I imagine I can send the events to several different pipelines, which would also allow for more fine-grained producer concurrency control. I also wonder if there's a way to subscribe custom GenStage consumers. Or perhaps use different batchers with duplicated data and in a way let the different batchers be steps B, C, and D? But I also wanted to ask: if that's the case, what is meant by "multi-stage" in the first sentence of the docs?

@josevalim
Member Author

@NicolayD per the discussion above, the general observation is that breaking it into steps will often make things slower rather than faster. Your code should rather be:

Producer: A
Processor: B -> C -> D

Or perhaps D could happen in a batch processor.

This is a common misconception: folks want to turn logical design steps into pipeline steps, but moving data around is expensive and you should only do it when necessary.

But I also wanted to ask - if that's the case, what is meant by multi-stage in the first sentence from the docs?

Each layer (producer, processor, batcher) has multiple internal stages. But even if you discount that, the skeleton of producer -> processor -> batcher -> batch processor is already multi-stage.

@NicolayD

NicolayD commented Mar 2, 2023

I see, thank you very much! It does make sense to be in one step.
