
Stage and Source Scheduler and Grouped Execution

Wenlei Xie edited this page Jul 18, 2020 · 9 revisions

Background

When Presto executes a query, it breaks the execution up into a hierarchy of stages. A stage models a particular part of a distributed query plan. Each stage reads from data sources and writes to an output buffer. There are two types of sources:

  • Table scan source (sometimes referred to as partitioned source in code)
  • Remote source (sometimes referred to as unpartitioned source in code)

The following figure shows part of a query plan with two stages:

Two Stages

See more details about stage at https://prestodb.github.io/docs/current/overview/concepts.html#stage

Stage Scheduler

StageScheduler is responsible for the following jobs:

  • Create tasks for this stage
  • Schedule table splits to tasks

This section discusses stage schedulers as they worked before grouped execution was introduced. ScaledWriterScheduler is not discussed here.

The StageScheduler interface is quite concise (https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/StageScheduler.java):

public interface StageScheduler
        extends Closeable
{
    /**
     * Schedules as much work as possible without blocking.
     * The schedule results is a hint to the query scheduler if and
     * when the stage scheduler should be invoked again.  It is
     * important to note that this is only a hint and the query
     * scheduler may call the schedule method at any time.
     */
    ScheduleResult schedule();

    @Override
    default void close() {}
}

ScheduleResult contains the following information (see https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ScheduleResult.java#L36-L40):

Set<RemoteTask> newTasks;
ListenableFuture<?> blocked;
Optional<BlockedReason> blockedReason;
boolean finished;
int splitsScheduled;
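
To make the contract concrete, the following is a minimal sketch of how a caller might drive schedule() using the finished and blocked fields. It is not Presto's query scheduler: ToyScheduleResult, ToyStageScheduler, batching and drive are hypothetical names, and java.util.concurrent.CompletableFuture stands in for ListenableFuture so the example has no dependencies.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for ScheduleResult (only three of its fields).
final class ToyScheduleResult {
    final boolean finished;
    final CompletableFuture<?> blocked;
    final int splitsScheduled;

    ToyScheduleResult(boolean finished, CompletableFuture<?> blocked, int splitsScheduled) {
        this.finished = finished;
        this.blocked = blocked;
        this.splitsScheduled = splitsScheduled;
    }
}

// Hypothetical stand-in for StageScheduler.
interface ToyStageScheduler {
    ToyScheduleResult schedule();
}

final class ScheduleLoopSketch {
    // Toy scheduler that schedules at most batchSize splits per schedule() call.
    static ToyStageScheduler batching(int totalSplits, int batchSize) {
        int[] remaining = {totalSplits};
        return () -> {
            int scheduled = Math.min(batchSize, remaining[0]);
            remaining[0] -= scheduled;
            // Assumption: this toy is never blocked, so it returns a completed future.
            return new ToyScheduleResult(remaining[0] == 0, CompletableFuture.completedFuture(null), scheduled);
        };
    }

    // Calls schedule() until the stage reports finished; returns the total number of splits scheduled.
    static int drive(ToyStageScheduler scheduler) {
        int total = 0;
        while (true) {
            ToyScheduleResult result = scheduler.schedule();
            total += result.splitsScheduled;
            if (result.finished) {
                return total;
            }
            // Treat "blocked" as a hint for when it is worth calling schedule() again.
            result.blocked.join();
        }
    }

    public static void main(String[] args) {
        System.out.println(drive(batching(10, 4)));
    }
}
```

In this toy, waiting on blocked is only one possible policy; as the Javadoc above says, the result is a hint and the caller may invoke schedule() at any time.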

Prior to grouped execution (introduced in https://github.com/prestodb/presto/pull/8951), there are two block reasons:

In this section, we are going to introduce the three main stage schedulers prior to grouped execution (https://github.com/prestodb/presto/pull/8951).

FixedCountScheduler

This is used when the stage doesn't have node partitioning and all the split sources are remote. A typical case is performing a join or aggregation over unbucketed tables. This is the simplest stage scheduler: all tasks are created at once and there is no further split scheduling (since all splits are RemoteSplit): https://github.com/prestodb/presto/blob/b430a562679aab0a04df37b7a1e77cfd5d941c81/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedCountScheduler.java#L55-L65
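
As a rough illustration of "all tasks are created at once", the sketch below models a single schedule() call that creates one task per partition and immediately reports that it is finished. The class, the Result type and the string task names are hypothetical and are not the linked Presto code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model; not Presto's FixedCountScheduler.
final class FixedCountSketch {
    static final class Result {
        final List<String> newTasks;
        final boolean finished;
        final int splitsScheduled;

        Result(List<String> newTasks, boolean finished, int splitsScheduled) {
            this.newTasks = newTasks;
            this.finished = finished;
            this.splitsScheduled = splitsScheduled;
        }
    }

    // Creates one task per partition in a single call and reports "finished" right away.
    static Result schedule(List<String> partitionToNode) {
        List<String> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionToNode.size(); partition++) {
            tasks.add("task-" + partition + "@" + partitionToNode.get(partition));
        }
        // No table splits are scheduled: every input arrives through remote sources.
        return new Result(tasks, true, 0);
    }

    public static void main(String[] args) {
        Result result = schedule(Arrays.asList("worker-a", "worker-b", "worker-c"));
        System.out.println(result.newTasks + " finished=" + result.finished);
    }
}
```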

FixedSourcePartitionedScheduler

This is used when the stage has node partitioning and at least one split source is a table scan. This usually happens for stages reading from bucketed tables.

For each table scan split source, a SourcePartitionedScheduler is created to help schedule the splits. We denote these SourcePartitionedSchedulers as s_1, s_2, …, s_k and the FixedSourcePartitionedScheduler as f. Note that these s_i share the same SqlStageExecution with f, so each s_i is not an “independent” stage scheduler, but is instead used as a “source scheduler” for f.

s_i.schedule() is called in order (from build side to probe side). s_{i+1}.schedule() is called only after s_i has finished scheduling (but not execution!), as demonstrated by the following simplified code (the actual code can be found at https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L90-L105):

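// Simplified: sourceSchedulers is a queue ordered from build side to probe side
while (!sourceSchedulers.isEmpty()) {
    ScheduleResult schedule = sourceSchedulers.peek().schedule();
    if (schedule.isFinished()) {
        // this source has scheduled all of its splits; move on to the next one
        sourceSchedulers.remove();
    }
    else {
        // not done scheduling yet; try again on the next schedule() call
        break;
    }
}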
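
The ordering guarantee can be checked with a small self-contained simulation. This is a sketch under stated assumptions: ToySource is a hypothetical source scheduler that needs a fixed number of schedule() calls before it is finished, and the log records which source was invoked on each call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class OrderedSourceSchedulingSketch {
    // Hypothetical source scheduler that finishes after a fixed number of schedule() calls.
    static final class ToySource {
        final String name;
        int callsLeft;

        ToySource(String name, int callsNeeded) {
            this.name = name;
            this.callsLeft = callsNeeded;
        }

        // Returns true once this source has scheduled all of its splits.
        boolean schedule(List<String> log) {
            log.add(name);
            callsLeft--;
            return callsLeft <= 0;
        }
    }

    // One schedule() call on f: advance through the queue until a source is not finished.
    static void scheduleOnce(Queue<ToySource> sources, List<String> log) {
        while (!sources.isEmpty()) {
            if (sources.peek().schedule(log)) {
                sources.remove();
            }
            else {
                break;
            }
        }
    }

    // Repeats scheduleOnce until every source is finished and returns the invocation order.
    static List<String> trace(List<ToySource> orderedSources) {
        Queue<ToySource> sources = new ArrayDeque<>(orderedSources);
        List<String> log = new ArrayList<>();
        while (!sources.isEmpty()) {
            scheduleOnce(sources, log);
        }
        return log;
    }

    public static void main(String[] args) {
        // s_1 needs two calls, s_2 needs one: s_2 is never invoked before s_1 is finished.
        System.out.println(trace(Arrays.asList(new ToySource("s_1", 2), new ToySource("s_2", 1))));
    }
}
```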

All the tasks are eagerly created the first time schedule() is called: https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L78-L85

Interestingly, both of the following cases use FixedSourcePartitionedScheduler:

  • Colocated join (i.e. joining two bucketed tables).
  • A bucketed table joined with an unbucketed table. In this case, a remote exchange is added to the unbucketed table side so that it adopts the same bucketing.

SourcePartitionedScheduler

This is used when the stage partitioning is SOURCE_DISTRIBUTION. In other words:

  • It doesn’t have node partitioning.
  • It doesn’t contain a partitioned remote source (a replicated remote source is fine).

Usually, this is used for leaf stages that contain a table scan over an unbucketed table.

The main workflow for SourcePartitionedScheduler#schedule (https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SourcePartitionedScheduler.java#L84-L147) is as follows:

  • Get the splits to be scheduled (stored in pendingSplits). splitSource#getNextBatch will be called if necessary.
  • Decide the node assignments of these splits (by calling splitPlacementPolicy#computeAssignments). There are two different split placement policies:
    • DynamicSplitPlacementPolicy: splits are from an unbucketed table. They are assigned based on worker load and locality.
    • FixedSplitPlacementPolicy (now renamed to BucketedSplitPlacementPolicy): splits are from a bucketed table. They are assigned based on NodePartitioningMap. Note that in this case, SourcePartitionedScheduler is actually used as a “source scheduler” of a FixedSourcePartitionedScheduler: when SourcePartitionedScheduler is used as a “stage scheduler”, the stage partitioning has to be SOURCE_DISTRIBUTION, so the input table cannot be bucketed.
  • Create new tasks if necessary, etc.
    • Note this only makes a difference when SourcePartitionedScheduler is used as a stage scheduler rather than source scheduler.
    • When SourcePartitionedScheduler is used as a source scheduler, newTasks returned in ScheduleResult will simply be ignored.
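
The difference between the two placement policies can be sketched as follows. This is a simplified illustration, not the Presto implementation: the dynamic variant only balances by the number of splits assigned so far and ignores locality, and the bucketed variant uses a plain list as a hypothetical stand-in for NodePartitioningMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SplitPlacementSketch {
    // Dynamic placement (simplified): each split goes to the currently least-loaded node.
    // Assumes nodes is non-empty; locality is ignored in this sketch.
    static Map<String, List<String>> assignDynamic(List<String> splits, List<String> nodes) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String node : nodes) {
            assignment.put(node, new ArrayList<>());
        }
        for (String split : splits) {
            String target = nodes.get(0);
            for (String node : nodes) {
                if (assignment.get(node).size() < assignment.get(target).size()) {
                    target = node;
                }
            }
            assignment.get(target).add(split);
        }
        return assignment;
    }

    // Bucketed placement (simplified): a split's bucket fully determines its node.
    // Each split is represented by its bucket number; bucketToNode.get(b) is the node for bucket b.
    static Map<String, List<Integer>> assignBucketed(List<Integer> splitBuckets, List<String> bucketToNode) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int bucket : splitBuckets) {
            String node = bucketToNode.get(bucket);
            assignment.computeIfAbsent(node, key -> new ArrayList<>()).add(bucket);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assignDynamic(Arrays.asList("a", "b", "c", "d", "e"), Arrays.asList("w1", "w2")));
        System.out.println(assignBucketed(Arrays.asList(0, 1, 2, 3, 2), Arrays.asList("w1", "w2", "w1", "w2")));
    }
}
```

In the bucketed case the scheduler has no freedom in choosing a node, which is why it only makes sense for stages that have node partitioning.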

Discussion

Even before grouped execution was introduced, there were already some hacks. In particular, FixedSourcePartitionedScheduler can use SourcePartitionedScheduler as a “source scheduler”, and some of the code in SourcePartitionedScheduler is only useful when it acts as a stage scheduler, which is confusing.

Some latent bugs date back to those days and only got fixed after being amplified by grouped execution, such as https://github.com/prestodb/presto/issues/11253

Examples

The following figures show some concrete examples of which stage schedulers are used. The logic that decides the stage scheduler can be found in: https://github.com/prestodb/presto/blob/4e55aadca50e2b6f904fcfa444540e90c02e3383/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java#L255-L292

Example 1

Example 2
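
The selection rules described on this page can be condensed into a small decision function. This is only a summary of the prose above, not the linked SqlQueryScheduler code: the enum, the method and its two boolean parameters are hypothetical, and ScaledWriterScheduler is left out.

```java
final class StageSchedulerChoiceSketch {
    enum SchedulerKind { SOURCE_PARTITIONED, FIXED_SOURCE_PARTITIONED, FIXED_COUNT }

    // sourceDistribution: the stage partitioning is SOURCE_DISTRIBUTION.
    // hasTableScanSource: at least one split source of the stage is a table scan.
    static SchedulerKind choose(boolean sourceDistribution, boolean hasTableScanSource) {
        if (sourceDistribution) {
            // Leaf stage scanning an unbucketed table.
            return SchedulerKind.SOURCE_PARTITIONED;
        }
        if (hasTableScanSource) {
            // Table scan in a stage that is not SOURCE_DISTRIBUTION, e.g. reading bucketed tables.
            return SchedulerKind.FIXED_SOURCE_PARTITIONED;
        }
        // All split sources are remote.
        return SchedulerKind.FIXED_COUNT;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true));
        System.out.println(choose(false, true));
        System.out.println(choose(false, false));
    }
}
```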

Grouped Execution

Grouped execution was introduced in https://github.com/prestodb/presto/pull/8951 to support the huge joins and aggregations that arise in ETL pipelines. After this initial pull request, there were several fixes to make it production ready.

After stabilization, grouped execution consists of 3 components on the coordinator:

  • LifespanScheduler: Responsible for initiating new lifespans after the current lifespans finish execution (by calling SourceScheduler#startLifespan).
  • FixedSourcePartitionedScheduler (we will use StageScheduler to refer to it): Responsible for coordinating the SourceSchedulers in this stage.
  • SourcePartitionedScheduler (we will use SourceScheduler to refer to it): Responsible for scheduling the splits of a given table source.

The boundaries and responsibilities of these 3 components are not very clear in some cases, and a major refactor/redesign of lifespan scheduling will be required before it is easy to understand. But it works for now.

  1. In the StageScheduler constructor, the initial lifespans are scheduled: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L144-L145 . The SourceScheduler of the first source will start to schedule the splits for these buckets (a more formal name would be partitions).

    As a concrete example, assume there are two SourceSchedulers in the stage, responsible for tableA and tableB, and lifespan 42 gets scheduled here. The first SourceScheduler in the stage will be asked to schedule splits for bucket 42 of tableA.

  2. Once a SourceScheduler has finished scheduling some buckets (collected by calling SourceScheduler#drainCompletelyScheduledLifespans: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L225), StageScheduler will ask the next SourceScheduler to schedule these buckets: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L210-L212 . In other words, StageScheduler “pushes” the buckets of already-scheduled lifespans to the next SourceScheduler.

    In the above example, once the first SourceScheduler finishes scheduling splits for bucket 42 of tableA, the StageScheduler will ask the second SourceScheduler to start scheduling splits for bucket 42 of tableB.

  3. Once a lifespan has finished execution, LifespanScheduler#onLifespanExecutionFinished will be invoked (registered in the StageScheduler constructor: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L146-L147) .

    This onLifespanExecutionFinished method will trigger the newDriverGroupReady future: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/group/FixedLifespanScheduler.java#L118 . This causes any thread waiting on this future to continue. Usually this means StageScheduler#schedule will be called, since it now has some new work to do.

    As a result, LifespanScheduler#schedule will be called in StageScheduler#schedule, which means new lifespans will be scheduled: https://github.com/prestodb/presto/blob/548fb438ad135776399deec80a1005e1c63c51dd/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java#L195-L200 . And again, the first SourceScheduler will start scheduling splits for these buckets.

    Following the example, some time after all splits of bucket 42 on both tableA and tableB are scheduled, the worker will report back that bucket 42 has finished execution. onLifespanExecutionFinished will be invoked, record that lifespan 42 finished execution, and set the newDriverGroupReady future, which triggers StageScheduler#schedule to be called. This results in LifespanScheduler#schedule being called, and a new lifespan, say 46, will be scheduled. Lifespan 46 will then repeat the same journey from step 1.
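
The three steps above can be sketched as a small simulation. It is a heavily simplified model under stated assumptions: the class and its log are hypothetical, the split scheduling of each SourceScheduler is treated as instantaneous, and the newDriverGroupReady future is replaced by a direct method call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the lifespan workflow; not Presto code.
final class GroupedExecutionSketch {
    private final Queue<Integer> pendingBuckets; // lifespans (buckets) that have not been started yet
    private final List<String> tables;           // one source scheduler per table, in scheduling order
    private final List<String> log = new ArrayList<>();

    GroupedExecutionSketch(List<Integer> buckets, List<String> tables, int concurrentLifespans) {
        this.pendingBuckets = new ArrayDeque<>(buckets);
        this.tables = tables;
        // Step 1: the initial lifespans are scheduled in the constructor.
        for (int i = 0; i < concurrentLifespans; i++) {
            startNextLifespan();
        }
    }

    private void startNextLifespan() {
        Integer bucket = pendingBuckets.poll();
        if (bucket == null) {
            return; // no lifespans left to start
        }
        // Step 2: once one source has scheduled the bucket, it is pushed to the next source.
        for (String table : tables) {
            log.add("schedule bucket " + bucket + " of " + table);
        }
    }

    // Step 3: a finished lifespan makes room for a new one.
    void onLifespanExecutionFinished(int bucket) {
        log.add("finished bucket " + bucket);
        startNextLifespan();
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        GroupedExecutionSketch stage = new GroupedExecutionSketch(
                Arrays.asList(42, 46), Arrays.asList("tableA", "tableB"), 1);
        stage.onLifespanExecutionFinished(42);
        stage.onLifespanExecutionFinished(46);
        stage.log().forEach(System.out::println);
    }
}
```

With one concurrent lifespan, bucket 46 is only scheduled after bucket 42 has been reported as finished, which is what bounds the amount of data a stage has to hold at any one time.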
