[RFC] Composability with async frameworks and other threading solutions. #132

Open
mratsim opened this issue May 10, 2020 · 2 comments
Labels
enhancement :shipit: New feature or request help wanted 👥 Extra attention is needed question ❓ Further information is requested

Comments

@mratsim
Owner

mratsim commented May 10, 2020

This RFC supersedes:

Context

Weave is currently well-tuned to handle CPU-bound tasks:

  • Numerical & scientific computing
  • Image-processing, for example raytracing
  • Games: decision trees, Monte-Carlo Tree Search, Path finding, Rendering, Animation
  • Cryptography
  • Search
  • Graph-based algorithms
  • ...

However it needs better interoperability with:

  • dedicated threads
  • Async runtimes that handle IO

Current limitations

1. Submitting jobs from dedicated services

Weave requires a task to be spawned on one of the worker threads, including the root thread (the one which called init(Weave)). This is problematic because applications and libraries may want to use dedicated threads and submit tasks from them to Weave, as a common threadpool that handles all the workload.

From the Intel presentation on parallelizing the Conqueror's Blade game: https://www.slideshare.net/IntelSoftware/01conquerorsbladegdc100pctaftertmnb

[image: slide from the presentation showing the game's thread layout]

We can see the following dedicated threads:

  • Render (that also probably manages GPU tasks)
  • Simulation, Animation Physics
  • Worker Threads/Threadpool
    • Tasks from Simulation
    • Tasks from AI
  • Logic/Scripting AI

This would also allow Weave to collaborate with other runtimes, such as async runtimes and other threadpools, or even when called from another language or an external library with its own threadpool.

2. Latency, Fairness and Priority.

Weave tasks are processed in a LIFO order. This optimizes for throughput as the latest enqueued task is:

  • still hot in cache.
  • probably enqueued by the task we were just running and might even be a child task we are awaiting.

However, this is not fair. In a server context, we might receive multiple cryptographic tasks; if a worker comes back from being busy and pops the most recent request, the very first client will wait much longer than the last one to arrive.
Similarly, in a game engine context, jobs will be submitted frame by frame and we want the first frame's jobs to be dealt with first.

Secondly, a soft real-time system might want to submit tasks with various job priorities.
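
To make the LIFO versus FIFO point concrete, here is a minimal sketch using only Nim's std/deques. It is a toy model, not Weave code: the "requests" are plain integers numbered in arrival order, and the two loops only show in which order a queue would hand them out.

```nim
import std/deques

# Toy model: requests are numbered in arrival order (1 arrived first).
var pending = initDeque[int]()
for request in 1 .. 4:
  pending.addLast(request)

# LIFO (how a worker pops its own deque): the newest request is served first.
var lifo = pending          # Deque has value semantics, so this is a copy
var lifoOrder: seq[int]
while lifo.len > 0:
  lifoOrder.add lifo.popLast()

# FIFO: the oldest request is served first.
var fifo = pending
var fifoOrder: seq[int]
while fifo.len > 0:
  fifoOrder.add fifo.popFirst()

echo "LIFO service order: ", lifoOrder   # request 1 is served last
echo "FIFO service order: ", fifoOrder   # request 1 is served first
```

With LIFO the first client to arrive is the last one served, which is the latency problem described above.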

Proposed solution

On the dedicated thread side

We introduce a hidden lightweight JobEmitter context variable and a public setupDedicatedThread(Weave) procedure to initialize it. They are used on threads created by createThread, raw threads, pthreads or library-provided threading solutions.

type
  JobEmitter = ptr object
    mempool: TLPoolAllocator # Thread-safe pool allocator, thread-local instance
    rng: RngState # Random number generator

var emitterCtx {.threadvar.}: JobEmitter

An emitter interacts with Weave and its workers via submit(myJob(a, b, c)) for normal tasks and submitDelayed(pledge, (a, b, c)) for continuations/callbacks/dataflow parallelism. This selects a worker from Weave at random (via the RNG) and pushes the task to it.
This mirrors spawn and spawnDelayed. The result is still returned via a Flowvar, and a loop such as while not myCompute.isReady(): poll(); doSomething() can be used to compose with the emitter's own work without blocking.

The memory pool is used to allocate the Flowvar (Future/ResultChannel) to await the result.
If in an async context, isReady can be used to check if this would block.

Similar to how Asyncdispatch and Chronos distinguish await (in an async proc) and waitFor (in the main thread), we might want a name separate from sync (in the Weave threadpool), maybe reuse waitFor?
Note: we don't need to call loadBalance() in a loop, as the emitter is not a thread participating in the Weave runtime. However, assuming we agree that all async libraries implement a poll() function, we can do (when declared(poll): poll())

A teardownDedicatedThread(Weave) will also be provided to deallocate the mempool (which should be empty if all tasks were awaited).

The rngState can be seeded with the thread PID.
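
The random dispatch described in this section can be sketched in a single-threaded toy model. Everything below is hypothetical and only illustrates the idea: ToyJob, ToyEmitter, ToyPool and this submit are stand-ins invented for the example, the per-worker queues are plain std/deques rather than lock-free MPSC channels, and the seed is a fixed constant instead of a thread identifier.

```nim
import std/[deques, random]

type
  ToyJob = object            # hypothetical stand-in for a Weave task
    id: int
  ToyEmitter = object        # hypothetical, mirrors the JobEmitter idea
    rng: Rand
  ToyPool = object
    submitted: seq[Deque[ToyJob]]   # one submitted-jobs queue per worker

proc initToyPool(numWorkers: int): ToyPool =
  result.submitted = newSeq[Deque[ToyJob]](numWorkers)
  for q in result.submitted.mitems:
    q = initDeque[ToyJob]()

proc submit(e: var ToyEmitter, pool: var ToyPool, job: ToyJob): int =
  ## Pick a worker at random, push the job to its queue,
  ## and return the index of the chosen worker.
  result = e.rng.rand(pool.submitted.high)
  pool.submitted[result].addLast(job)

var pool = initToyPool(4)
var emitter = ToyEmitter(rng: initRand(1234))  # fixed seed, for the example only
for i in 0 ..< 8:
  let w = emitter.submit(pool, ToyJob(id: i))
  echo "job ", i, " -> worker ", w
```

Each emitter owns its RNG state, so picking a target worker needs no synchronization between emitters; only the push into the worker's queue has to be thread-safe in a real implementation.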

On Weave side

We add a new field to the global context which holds the communication channels:

type
  ComChannels* = object
    ## Communication channels
    ## This is a global objects and so must be stored
    ## at a global place.
    # - Nim seq uses thread-local heaps
    #   and are not recommended.
    # - ptr UncheckedArray would work and would
    #   be useful if Channels are packed in the same
    #   heap (with padding to avoid cache conflicts)
    # - A global unitialized array
    #   so they are stored on the BSS segment
    #   (no storage used just size + fixed memory offset)
    #   would work but then it requires a pointer indirection
    #   per channel and a known max number of workers

    # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
    thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]]
    tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
    when static(WV_Backoff):
      parking*: ptr UncheckedArray[EventNotifier]

  GlobalContext* = object
    com*: ComChannels
    threadpool*: ptr UncheckedArray[Thread[WorkerID]]
    numWorkers*: int32
    barrier*: SyncBarrier
    mempools*: ptr UncheckedArray[TlPoolAllocator]

type
  ComChannels* = object
    ## Communication channels
    ## This is a global objects and so must be stored
    ## at a global place.
    # - Nim seq uses thread-local heaps
    #   and are not recommended.
    # - ptr UncheckedArray would work and would
    #   be useful if Channels are packed in the same
    #   heap (with padding to avoid cache conflicts)
    # - A global unitialized array
    #   so they are stored on the BSS segment
    #   (no storage used just size + fixed memory offset)
    #   would work but then it requires a pointer indirection
    #   per channel and a known max number of workers

    # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
    thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]]
    tasksInFlight*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
    tasksSubmitted*: ptr UncheckedArray[ChannelMpscUnboundedBatch[Task]]
    when static(WV_Backoff):
      parking*: ptr UncheckedArray[EventNotifier]

The previous tasks field is renamed tasksInFlight and the new field is a Multi-Producer Single-Consumer queue called tasksSubmitted.

The previous event loop of workers was https://github.com/mratsim/weave/blob/46cf3232d6b05e225dce81f4d92facf85cfd6293/weave/state_machines/event_loop.png

Zoom on "OutOfTask" state.

# -------------------------------------------
# 2. Run out-of-task, become a thief

onEntry(workerEventLoop, WEL_OutOfTasks):
  # task = nil
  let stoleTask = task.recvElseSteal(isOutOfTasks = true)

implEvent(workerEventLoop, EV_StoleTask):
  stoleTask

behavior(workerEventLoop):
  steady: WEL_OutOfTasks
  transition:
    ascertain: myWorker().deque.isEmpty()
    ascertain: myThefts().outstanding > 0
    declineAll()

behavior(workerEventLoop):
  ini: WEL_OutOfTasks
  event: EV_StoleTask
  transition: profile_stop(idle)
  fin: WEL_SuccessfulTheft
As before, workers first check their local task queue. If they run out of tasks, they try to steal new ones. The theft is done via message passing, and a worker detects that there are no tasks left and that it should go to sleep by receiving back its own steal request.
In the proposal, instead of immediately going to sleep, a worker would pop the first task from its tasksSubmitted queue and run it.
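
The order in which a worker would look for work under this proposal can be sketched as a small decision function. This is a single-threaded toy model with hypothetical names (ToyWorker, nextTask), not the state machine used in Weave: the stolen parameter stands in for the outcome of a steal attempt, where none means the worker received its own steal request back.

```nim
import std/[deques, options]

type
  ToyWorker = object
    local: Deque[string]       # own tasks, popped LIFO
    submitted: Deque[string]   # jobs from foreign threads, popped FIFO

proc nextTask(w: var ToyWorker, stolen: Option[string]): Option[string] =
  ## `stolen` models the result of a steal attempt; `none` means the
  ## worker got its own steal request back.
  if w.local.len > 0:
    return some(w.local.popLast())        # 1. own work first, LIFO
  if stolen.isSome:
    return stolen                         # 2. successful theft
  if w.submitted.len > 0:
    return some(w.submitted.popFirst())   # 3. admit the oldest submitted job
  result = none(string)                   # 4. nothing anywhere: go to sleep

var w = ToyWorker(local: initDeque[string](), submitted: initDeque[string]())
w.local.addLast("child-task")
w.submitted.addLast("job-A")
w.submitted.addLast("job-B")

var order: seq[string]
for stolen in [none(string), some("stolen-task"), none(string),
               none(string), none(string)]:
  let t = w.nextTask(stolen)
  order.add(if t.isSome: t.get else: "<park>")
echo order
```

Submitted jobs are only admitted once both the local deque and the steal attempt come up empty, which is what keeps already-running jobs ahead of new ones.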

Analysis of the solution

We will use the following terminology:

  • We submit jobs (dedicated thread)
  • We spawn tasks (on Weave threadpool)

This clearly solves (1): submitting jobs to Weave from a "foreign" thread.

Interestingly this also solves (2):

  • Assume 3 jobs are submitted and each spawns 10 CPU-heavy tasks.
  • The first job enters the threadpool and gets distributed by Weave load-balancing.
  • If all workers are busy, the 2 other ones stay in the submitted queue which is processed in a FIFO order.
  • Only when the previous job and its descendant tasks are done can the next job from the submitted queue enter scheduling.

This means that:

  • by dealing with the submitted queue in a FIFO manner, we minimize latency of the submitted jobs.
  • Priority can be handled by the caller. We don't need to bikeshed in Weave whether priority 0 or priority 10 should be the highest; Weave only cares about the order of submission.
  • The caller can handle fairness as well with its own algorithm. For example, an async runtime might implement some kind of budget/timeslice for fair usage of CPU resources: it would submit a job and then sync (await) it, and at the next async scheduling opportunity the request would be switched out by the async runtime if over budget and replaced by other async requests. (See Linux Completely Fair Queueing Scheduler)
  • We also maximize throughput per job
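
As an illustration of priority being handled on the caller side, the sketch below keeps a caller-owned backlog in a std/heapqueue and drains it in priority order. The PendingJob type and the "lower value is more urgent" convention are assumptions made for the example; where the comment says so, a real caller would invoke the proposed submit.

```nim
import std/heapqueue

type
  PendingJob = object
    priority: int   # caller's own convention here: lower value = more urgent
    name: string

# HeapQueue orders its elements with `<`.
proc `<`(a, b: PendingJob): bool = a.priority < b.priority

var backlog = initHeapQueue[PendingJob]()
backlog.push PendingJob(priority: 5, name: "thumbnail")
backlog.push PendingJob(priority: 0, name: "handshake")
backlog.push PendingJob(priority: 2, name: "signature")

var submissionOrder: seq[string]
while backlog.len > 0:
  # A real caller would call the proposed `submit` here;
  # Weave would then only see the order of submission.
  submissionOrder.add backlog.pop().name

echo submissionOrder
```

Because the runtime only observes submission order, the caller is free to change its priority scheme without any change on the Weave side.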

Limitations

  • The first limitation is by design: Weave workers communicate by messages, so they only have an approximate representation of the runtime state. A worker detects that the runtime is out of tasks by receiving back its own steal request, so 2 workers can independently decide to process their submitted tasks. Then 2 jobs enter the runtime at once, instead of the earlier job being split if necessary and the second job entering scheduling only if the first was not enough to fill the CPU with work.

  • The second limitation is similar: when workers receive a steal request and have no "inner" task to share, they have no way to know whether other workers might still be working on an earlier job/task or whether the thief has jobs waiting to enter the runtime. If we want a greedy scheduler (as long as there are tasks, all workers are working), we need to have pending jobs enter scheduling.

  • The third limitation is fairness: since tasks are distributed randomly, there is no guarantee that the first task will be taken by the first available worker. However, assuming we have 20 jobs submitted on a 4-core CPU, the first tasks scheduled would still be among the first 4. This should be an OK tradeoff.

Alternative

Instead of randomly pushing tasks to one of the workers, we could have a global task queue for submitted tasks, but:

  • This does not fit with Weave's design of message passing and "requesting work".
    • The alternative would be to have this global queue wait for worker messages before sending them work, but this would introduce latency: the back-and-forth of messages and, if the global queue is not on a separate thread, the wait for that thread to become available.
  • The global task queue would have to be an unbounded Multi-Producer Multi-Consumer queue, which is much slower and harder to optimize.
  • The way to best use multiple cores is to share and synchronize nothing. Unfortunately that global task queue would become a single point of contention and a high source of synchronization overhead. (Largely explored in Randomized Work Stealing for Large Scale Soft Real-time Systems)

Note on blocking tasks

In some cases, we might have to make a blocking call. As with an async IO runtime, a CPU-bound runtime like Weave does not like blocking calls. In both cases it removes a thread from scheduling and will halt handling tasks/futures/events.
The way to handle that is to createThread and pass the blocking call to it.

However, for a call like readline, this means manually handling channel creation to get the result. Also, if it is called repeatedly in an interactive CLI application, this means lots of thread creation and destruction, hence kernel work and context switching, and so slowness/blocking of the creating thread.

Weave could also create a blocking threadpool, initially with no threads. With a dedicated spawnBlocking or submitBlocking, blocking tasks can be created on this threadpool. It would be much simpler: no load balancing and no message passing (blocking tasks are not parallelized through a cluster). To ensure proper "garbage collection" in long-running applications, if at one point there was a burst of blocking threads and then none, the idle threads could be made to sleep on a futex or a condition variable with a timed wait. On wakeup, a thread services the new request if there is one; otherwise it exits.

Note on Weave root thread

In all the tests, the root thread and task were the same as the main function. We need to make sure that it can be started from a dedicated thread as well, to allow architectures such as the game engine mentioned before.

Additionally, the init should be configurable with the number of threads (overriding the WEAVE_NUM_THREADS environment variable).

Lastly, Weave prevents creating more threads than the number of cores. We might want to relax that to create up to N+1 threads. The rationale is that the root thread might be just an event loop that initializes a program and then runs while true: poll(); loadBalance() but never calls sync() or syncRoot(), so we would lose 1 core for CPU-bound tasks.
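
One possible way to combine these three points is sketched below. The resolveNumThreads proc and its parameters are hypothetical and not part of Weave; the sketch assumes that an explicit argument takes precedence over WEAVE_NUM_THREADS, and that the N+1 limit is opt-in.

```nim
import std/[os, strutils, cpuinfo]

proc resolveNumThreads(requested = 0, allowExtraForEventLoop = false): int =
  ## Hypothetical helper: explicit argument > environment variable > core count.
  let cores = max(1, countProcessors())   # countProcessors may return 0
  # Optionally allow one extra thread for a root thread that only runs an event loop.
  let limit = if allowExtraForEventLoop: cores + 1 else: cores
  var wanted = cores
  if requested > 0:
    wanted = requested
  elif existsEnv("WEAVE_NUM_THREADS"):
    try:
      wanted = parseInt(getEnv("WEAVE_NUM_THREADS"))
    except ValueError:
      discard  # keep the default on malformed input
  result = clamp(wanted, 1, limit)

echo "default:          ", resolveNumThreads()
echo "with extra thread: ", resolveNumThreads(requested = 10_000,
                                              allowExtraForEventLoop = true)
```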

Note on async poll()

Since Weave would compose seamlessly via submit with async runtimes, which can be on dedicated event loop threads, there is no need to add a per-thread hook in Weave to call poll() regularly or, inversely, to add a hook in the async framework to call Weave's loadBalance().

@olliNiinivaara

I'm not qualified to comment on technical implementation, but here are some general thoughts.

Nim stdlib's threadpool, spawn and parallel are still unstable/broken. We need a replacement for these basic primitives. With the ability to set up dedicated threads and submit jobs from them, Weave seems to deliver this.

Often submitted jobs are not only isolated, but they do not even return anything to the caller, because their output goes to devices or other processes. This situation should allow some nice optimizations like bypassing the memory pool.

I understand that optimizing for high throughput (LIFO) is ok internally when tasks are cooperative. But in a server context, incoming requests from clients (the jobs) are more like competitors, and therefore optimizing for low latency (FIFO) is the better choice there. No need to go the last mile because server load is random anyway; the proposed randomized fairness definitely prevents pathological latencies.

Support for existing async solutions is a nice addition, but other qualities should not be traded for it. Weave is more fundamental: existing libraries can be adapted afterwards or new ones written that play well with Weave. (Async/await is a concurrency concept for single-threaded programming languages anyway; we may not need one.)

For example, it does not seem to be a good idea to run Nim's asyncdispatch on top of Weave. It basically calls selector.selectInto with a timeout of 500ms (hard coded) in an endless loop. Any thread where you start this will not be able to do any work with Weave tasks...

Well, my Guildenstern does basically the same, but as soon as there are tasks in flight, the timeout will be dropped to zero and it will call weave synchronization operations in the loop to play nice with it.

I'm speculating that the following operation could make my main event loop play even nicer:
syncRoot(m, n)
where m is the minimum number of tasks that must be in flight before syncing and n is the maximum number of tasks left in flight afterwards. For example, syncRoot(3, 2) would mean: run syncRoot only if there are at least 3 tasks in flight, and stop syncing as soon as there are no more than 2 tasks in flight.
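
A minimal sketch of that two-threshold behaviour, on a toy runtime that only counts in-flight tasks. ToyRuntime, runOneTask and this syncRoot overload are hypothetical and do not exist in Weave; completing a task is modelled as decrementing a counter.

```nim
type
  ToyRuntime = object
    inFlight: int    # number of tasks currently in flight

proc runOneTask(rt: var ToyRuntime) =
  ## Stand-in for helping the runtime complete one task.
  dec rt.inFlight

proc syncRoot(rt: var ToyRuntime, m, n: int): int =
  ## Sync only if at least `m` tasks are in flight, and stop once
  ## no more than `n` remain. Returns how many tasks were run.
  if rt.inFlight < m:
    return 0
  while rt.inFlight > n:
    rt.runOneTask()
    inc result

var rt = ToyRuntime(inFlight: 5)
let ranFirst = rt.syncRoot(3, 2)    # 5 >= 3, so sync down to 2 in flight
let ranSecond = rt.syncRoot(3, 2)   # 2 < 3, so nothing happens
echo "ran ", ranFirst, " then ", ranSecond, ", left in flight: ", rt.inFlight
```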

Another idea:
loadBalance(t)
where t is the (approximate) time in milliseconds (or even nanoseconds) that other tasks may run. This would be like calling the OS sleep(t), but instead of letting the OS scheduler select the next process, Weave would select other tasks to run, possibly even in the same thread. One use case would be calling this in a loop while waiting for I/O.
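
The time-budget idea can be sketched with std/monotimes. The loadBalanceFor proc and the ToyTask type are hypothetical; the sketch simply runs queued closures until the queue is empty or the deadline has passed, and it cannot interrupt a task that is already running, so the budget is only approximate.

```nim
import std/[deques, monotimes, times]

type ToyTask = proc ()   # hypothetical task type, not Weave's Task

proc loadBalanceFor(tasks: var Deque[ToyTask], budget: Duration): int =
  ## Run queued tasks until the queue is empty or the budget is spent.
  ## Returns the number of tasks that were run.
  let deadline = getMonoTime() + budget
  while tasks.len > 0 and getMonoTime() < deadline:
    let task = tasks.popFirst()
    task()
    inc result

var done = 0
var tasks = initDeque[ToyTask]()
for _ in 1 .. 3:
  tasks.addLast(proc () = inc done)

let ranNone = loadBalanceFor(tasks, initDuration())             # zero budget
let ranAll = loadBalanceFor(tasks, initDuration(seconds = 5))   # ample budget
echo ranNone, " then ", ranAll, " tasks run; done = ", done
```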

Ok, my mental model of how Weave works might not completely align with reality, but I hope these comments may give you some inspiration anyway.

mratsim added a commit that referenced this issue May 16, 2020
* Create base types for composability with other execution engines #132

* Renamings

* Prepare for job support

* Weave as an executor service - high-level API

* Update the state machine to support executor mode

* Wait for the runtime to be ready (next step, wakeup of worker tree on submission)

* Have a manager thread handle job submissions to avoid race in termination detection

* Fix parallel jobs example to properly wait

* add job spawning tasks test

* Fix awaiting delayed computations

* Implement jobs with multiple dependencies

* add runInBackground to start Weave as a service

* Update tests and documentation

* Workaround upstream regression on GC-safe createThread nim-lang/Nim#14370

* cleanup can have side effect on Windows. Also bmp nimble version

* cleanup for LazyFlowvar can also have side-effects

* Threads cannot be copied ¯\_(ツ)_/¯
@olliNiinivaara

File upload/download between hard drive and network socket should be a fruitful practical application area to experiment with parallel device I/O: file device <-> SPSC channel <-> network device

In async/await terms, the SPSC channel seems to be called FutureStream: https://github.com/nim-lang/Nim/blob/version-1-2/lib/pure/asyncstreams.nim#L17

Setting:

  • The file to be transferred can be larger than available main memory
  • Producer and consumer performance varies randomly (due to external load) so that buffer will be sometimes empty, sometimes full
  • When buffer is empty or full, task should be suspended and thread should do work on other tasks (polling instead of busy waiting)
  • Any of the devices might timeout or fail at any time
