Skip to content

Commit

Permalink
Added functionality to allow (external) tasks to be run exclusively.
Browse files Browse the repository at this point in the history
When running exclusively, the thread will sleep until all remaining tasks are done - meanwhile, no new tasks can run. Then the exclusive task is run, and then other tasks can be started again.
  • Loading branch information
trolando committed May 4, 2023
1 parent 3577d98 commit 6bbf560
Show file tree
Hide file tree
Showing 5 changed files with 704 additions and 0 deletions.
67 changes: 67 additions & 0 deletions src/lace.c
Expand Up @@ -547,6 +547,12 @@ typedef struct _ExtTask {

static _Atomic(ExtTask*) external_task = NULL;

static pthread_mutex_t external_task_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t external_task_cond = PTHREAD_COND_INITIALIZER;

static int external_task_counter = 0;
static int external_task_exclusive = 0;

void
lace_run_task(Task *task)
{
Expand All @@ -563,12 +569,73 @@ lace_run_task(Task *task)
atomic_store_explicit(&et.task->thief, 0, memory_order_relaxed);
sem_init(&et.sem, 0, 0);

pthread_mutex_lock(&external_task_lock);
while (external_task_exclusive) {
// if "exclusive" is set, then we wait until we can continue
pthread_cond_wait(&external_task_cond, &external_task_lock);
}
external_task_counter++;
pthread_mutex_unlock(&external_task_lock);

ExtTask *exp = 0;
while (atomic_compare_exchange_weak(&external_task, &exp, &et) != 1) {}

sem_wait(&et.sem);
sem_destroy(&et.sem);

pthread_mutex_lock(&external_task_lock);
external_task_counter--;
if (external_task_exclusive && external_task_counter == 0) {
// if counter is 0 and exclusive is set, then we should wake up the threads
pthread_cond_broadcast(&external_task_cond);
}
pthread_mutex_unlock(&external_task_lock);

// allow Lace workers to sleep again
lace_suspend();
}
}

void
lace_run_task_exclusive(Task *task)
{
// check if we are really not in a Lace thread
WorkerP* self = lace_get_worker();
if (self != 0) {
task->f(self, lace_get_head(self), task);
} else {
// if needed, wake up the workers
lace_resume();

ExtTask et;
et.task = task;
atomic_store_explicit(&et.task->thief, 0, memory_order_relaxed);
sem_init(&et.sem, 0, 0);

pthread_mutex_lock(&external_task_lock);
while (external_task_exclusive) {
// if "exclusive" is set, then we wait until we can continue
pthread_cond_wait(&external_task_cond, &external_task_lock);
}
external_task_exclusive = 1;
while (external_task_counter > 0) {
// wait until all other tasks are done
pthread_cond_wait(&external_task_cond, &external_task_lock);
}
pthread_mutex_unlock(&external_task_lock);

ExtTask *exp = 0;
while (atomic_compare_exchange_weak(&external_task, &exp, &et) != 1) {}

sem_wait(&et.sem);
sem_destroy(&et.sem);

pthread_mutex_lock(&external_task_lock);
external_task_exclusive = 0;
// wake up any waiters
pthread_cond_broadcast(&external_task_cond);
pthread_mutex_unlock(&external_task_lock);

// allow Lace workers to sleep again
lace_suspend();
}
Expand Down

0 comments on commit 6bbf560

Please sign in to comment.