# A simple parallel wget: parallel_wget

## Sample code

tutorial-06-parallel_wget.cc

## About parallel_wget

This is our first example of parallel tasks.
The program reads multiple HTTP URLs (separated by spaces) from the command line, crawls these URLs in parallel, and prints the crawled results to standard output in the order of the input.

## Creating a parallel task

In the previous examples, you have already learned about the SeriesWork class.

- A SeriesWork consists of a series of tasks that are executed sequentially. The series finishes when all of its tasks finish.
- The ParallelWork class is the counterpart of SeriesWork: it consists of multiple series that are executed in parallel. The parallel work finishes when all of its series finish.
- A ParallelWork is itself a task.

According to the above definitions, you can generate any complex workflow, dynamically or statically.
The Workflow class has two interfaces for generating parallel tasks:

```cpp
class Workflow
{
    ...
public:
    static ParallelWork *
    create_parallel_work(parallel_callback_t callback);

    static ParallelWork *
    create_parallel_work(SeriesWork *const all_series[], size_t n,
                         parallel_callback_t callback);

    ...
};
```

The first interface creates an empty parallel task, and the second creates one from an array of series.
With either interface, before you start the parallel work, you can use the add_series() interface of ParallelWork to add more series to it, as shown in the sketch below.
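For illustration, here is a minimal sketch of the second interface, where task_a and task_b are hypothetical tasks standing in for any tasks you have created (they are not part of the sample code):

```cpp
// Build each series up front, then hand the whole array to the factory.
SeriesWork *all_series[2];
all_series[0] = Workflow::create_series_work(task_a, nullptr);
all_series[1] = Workflow::create_series_work(task_b, nullptr);

ParallelWork *pwork = Workflow::create_parallel_work(all_series, 2, callback);
```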
In the sample code, we create an empty parallel task and then add the series one by one.

```cpp
int main(int argc, char *argv[])
{
    ParallelWork *pwork = Workflow::create_parallel_work(callback);
    SeriesWork *series;
    WFHttpTask *task;
    HttpRequest *req;
    tutorial_series_context *ctx;
    int i;

    for (i = 1; i < argc; i++)
    {
        std::string url(argv[i]);
        ...
        task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
            [](WFHttpTask *task)
        {
            // store resp to ctx.
        });

        req = task->get_req();
        // add some headers.
        ...

        ctx = new tutorial_series_context;
        ctx->url = std::move(url);
        series = Workflow::create_series_work(task, nullptr);
        series->set_context(ctx);
        pwork->add_series(series);
    }
    ...
}
```

As you can see, we first create an HTTP task in the code. However, an HTTP task cannot be added to a parallel task directly, so we use it to create a series first.
Each series has its own context, which is used to save the URL and the crawled result. You can learn about the related methods in our previous examples.
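For reference, the context type looks roughly like the following; this is a sketch inferred from the fields accessed in the callbacks, and the authoritative definition is in the sample code:

```cpp
// Inferred from the fields used in the callbacks below; HttpResponse
// comes from the protocol namespace of the workflow library.
struct tutorial_series_context
{
    std::string url;               // the input URL, kept for ordered printing
    int state;                     // final task state, e.g. WFT_STATE_SUCCESS
    int error;                     // task error code
    protocol::HttpResponse resp;   // the response, moved out of the task
};
```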

## Saving and using the crawled results

The callback of the HTTP task is a simple lambda function that saves the crawled result in the context of its own series, so that the parallel task can retrieve it later.

```cpp
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
    [](WFHttpTask *task)
{
    tutorial_series_context *ctx =
        (tutorial_series_context *)series_of(task)->get_context();
    ctx->state = task->get_state();
    ctx->error = task->get_error();
    ctx->resp = std::move(*task->get_resp());
});
```

This is necessary because the HTTP task will be recycled after the callback, so we have to use std::move() to move the resp.
In the callback of the parallel task, we can easily collect all the results:

```cpp
void callback(const ParallelWork *pwork)
{
    tutorial_series_context *ctx;
    const void *body;
    size_t size;
    size_t i;

    for (i = 0; i < pwork->size(); i++)
    {
        ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
        printf("%s\n", ctx->url.c_str());
        if (ctx->state == WFT_STATE_SUCCESS)
        {
            ctx->resp.get_parsed_body(&body, &size);
            printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : "");
            fwrite(body, 1, size, stdout);
            printf("\n");
        }
        else
            printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);

        delete ctx;
    }
}
```

Here you can see two new interfaces of ParallelWork, size() and series_at(i), which obtain the number of series in the parallel work and its i-th series respectively.
You can use series->get_context() to get the context of each series and print out the results. The printing order is guaranteed to match the order in which the series were added to the work.
In this example, there is no further work after the parallel task finishes.
As we said above, a ParallelWork is a kind of task, so you can use series_of() to get its series and add a new task to it.
However, if the new task needs the crawled results, you have to use std::move() to move the data into the context of the series that the parallel task runs in.
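A minimal sketch of that pattern, assuming a hypothetical all_results context that was set on the parallel task's own series beforehand (none of this is in the sample code):

```cpp
#include <vector>

// Hypothetical context for the series that runs the parallel task itself.
struct all_results
{
    std::vector<protocol::HttpResponse> resps;
};

void callback(const ParallelWork *pwork)
{
    // series_of(pwork) is the series in which the parallel task is running.
    auto *all = (all_results *)series_of(pwork)->get_context();

    for (size_t i = 0; i < pwork->size(); i++)
    {
        auto *ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
        if (ctx->state == WFT_STATE_SUCCESS)
            all->resps.emplace_back(std::move(ctx->resp));  // move before ctx is freed

        delete ctx;
    }

    // Any task pushed back here runs after the parallel work finishes
    // and may read the moved responses from the all_results context.
    // series_of(pwork)->push_back(next_task);
}
```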

## Starting a parallel task

As a parallel task is a kind of task, there is nothing special about starting one: you can call its start() directly, or use it to create or start a series.
In this example, we start a series, wake up the main process in the callback of that series, and exit the program normally.
We could also wake up the main process in the callback of the parallel task, with little difference in the program's behavior. However, it is more formal to wake up the main process in the callback of the series.
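As a minimal sketch of that startup, using the WFFacilities::WaitGroup helper from the workflow library:

```cpp
#include <workflow/WFFacilities.h>

static WFFacilities::WaitGroup wait_group(1);

int main(int argc, char *argv[])
{
    ParallelWork *pwork = Workflow::create_parallel_work(callback);
    // ... create the HTTP tasks and add their series, as shown above ...

    // Run the parallel task as the first task of a new series, and wake
    // up the main thread in the series callback.
    Workflow::start_series_work(pwork, [](const SeriesWork *) {
        wait_group.done();
    });

    wait_group.wait();   // block until the series (and the parallel work) finishes
    return 0;
}
```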