Skip to content

Commit

Permalink
🎨 threadpools
Browse files Browse the repository at this point in the history
  • Loading branch information
fennecdjay committed Sep 25, 2023
1 parent c838675 commit c7d5ad8
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 21 deletions.
14 changes: 9 additions & 5 deletions include/gwion_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
#define THREAD_FUNC(a) THREAD_RETTYPE(a)(void *data)
#define THREAD_TYPE HANDLE
#define THREAD_CREATE(thread, func, arg) \
thread = CreateThread(NULL, 0, func, arg, 0, NULL);
#define THREAD_JOIN(thread) WaitForSingleObject(thread, 0);
thread = CreateThread(NULL, 0, func, arg, 0, NULL)
#define THREAD_JOIN(thread) WaitForSingleObject(thread, 0)
#define THREAD_RETURN(arg) return 0;
#define THREAD_DETACH(arg)

#define MUTEX_TYPE HANDLE
#define MUTEX_INITIALIZER NULL
Expand All @@ -28,6 +29,7 @@ int emulate_pthread_mutex_lock(volatile MUTEX_TYPE *mx);
#define THREAD_COND_SETUP(x) x = CreateEvent(NULL, FALSE, FALSE, NULL);
#define THREAD_COND_WAIT(x, mutex) WaitForSingleObject(x, INFINITE)
#define THREAD_COND_SIGNAL(x) SetEvent(x);
#define THREAD_COND_BROADCAST(x) SetEvent(x);
#define THREAD_COND_CLEANUP(x) CloseHandle(x)
#else
#include <pthread.h>
Expand All @@ -36,13 +38,14 @@ int emulate_pthread_mutex_lock(volatile MUTEX_TYPE *mx);
#define THREAD_FUNC(a) THREAD_RETTYPE(a)(void *data)
#define THREAD_TYPE pthread_t
#define THREAD_CREATE(thread, func, arg) \
pthread_create(&thread, NULL, func, arg);
#define THREAD_JOIN(thread) pthread_join(thread, NULL);
pthread_create(&thread, NULL, func, arg)
#define THREAD_JOIN(thread) pthread_join(thread, NULL)
#define THREAD_RETURN(arg) \
{ \
pthread_exit(NULL); \
return NULL; \
}
#define THREAD_DETACH(arg) pthread_detach(arg)

#define MUTEX_TYPE pthread_mutex_t *
#define MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
Expand All @@ -64,14 +67,15 @@ int emulate_pthread_mutex_lock(volatile MUTEX_TYPE *mx);
#define MUTEX_UNLOCK(x) pthread_mutex_unlock((x))
#define MUTEX_COND_UNLOCK(x) pthread_mutex_unlock((x))

#define THREAD_COND_TYPE pthread_cond_t *
#define THREAD_COND_TYPE pthread_cond_t*
#define THREAD_COND_SETUP(x) \
{ \
x = (pthread_cond_t *)xcalloc(1, sizeof(pthread_cond_t)); \
pthread_cond_init(x, NULL); \
}
#define THREAD_COND_WAIT(x, mutex) pthread_cond_wait(x, mutex)
#define THREAD_COND_SIGNAL(x) pthread_cond_signal(x)
#define THREAD_COND_BROADCAST(x) pthread_cond_broadcast(x)
#define THREAD_COND_CLEANUP(x) \
{ \
pthread_cond_destroy(x); \
Expand Down
3 changes: 2 additions & 1 deletion include/gwion_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
#include "mpool.h"
#include "gwion_print.h"
#include "container.h"
#include "mp_vector.h"
#include "threadpool.h"
#include "hash.h"
#include "symbol.h"
#include "gwion_text.h"
#include "mp_string.h"
#include "carg.h"
#include "mp_vector.h"

#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || defined(__WIN32))
#include "windows_missing.h"
Expand Down
8 changes: 4 additions & 4 deletions include/symbol.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ struct Symbol_ {
char name[];
};
typedef struct SymTable_ {
MUTEX_TYPE mutex;
MemPool p;
size_t sz;
Symbol sym[];
gwtlock_t mutex;
MemPool p;
size_t sz;
Symbol sym[];
} SymTable;

ANN SymTable *new_symbol_table(MemPool p, const size_t sz);
Expand Down
97 changes: 97 additions & 0 deletions include/threadpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#ifdef BUILD_ON_WINDOWS
typedef HANDLE gwtthread_t;
typedef PCRITICAL_SECTION gwtmutex_t;
typedef PCONDITION_VARIABLE gwtcond_t;
typedef long unsigned gwtreturn_t;
#else
#include <pthread.h>
typedef pthread_t gwtthread_t;
typedef pthread_mutex_t gwtlock_t;
typedef pthread_cond_t gwtcond_t;
typedef void* gwtreturn_t;
#endif

typedef struct threadpool_t threadpool_t;

typedef enum {
threadpool_lock_failure = -2,
threadpool_queue_full = -3,
threadpool_shutdown = -4,
threadpool_thread_failure = -5
} threadpool_error_t;

threadpool_t *new_threadpool(const uint32_t thread_count, const uint32_t queue_size);
ANN2(1, 2) bool threadpool_add(threadpool_t *pool, void (*routine)(void *), void *arg);
ANN void free_threadpool(threadpool_t *pool);


#ifdef BUILD_ON_WINDOWS
ANN static inline void gwt_lock(threadpool_t *pool) {
EnterCriticalSection(&pool->lock); // void
}
ANN static inline void gwt_unlock(threadpool_t *pool) {
LeaveCriticalSection(&pool->lock); // void
}
ANN static inline void gwt_wait(gwtcond_t *cond, gwtlock_t *lock) {
SleepConditionVariableCS(cond, lock, INFINITE); // bool
}
ANN static inline void gwt_broadcast(threadpool_t *pool) {
WakeAllConditionVariable(&pool->cond); // void
}
ANN static inline void gwt_signal(threadpool_t *pool) {
return WakeConditionVariable(&pool->cond); // void
}
ANN static inline void gwt_create(gwtthread_t *thread, void* (*fun)(void*), void *arg) {
*thread = CreateThread(NULL, 0, func, arg, 0, NULL);
}
ANN static inline void gwt_join(gwtthread_t thread) {
WaitForSingleObject(thread, INFINITE); // dword // (DWORD)0xFFFFFFFF on error
}
ANN static inline void gwt_lock_end(gwtlock_t *lock) {
return DeleteCriticalSection(*lock);
}
ANN static inline void gwt_cond_end(gwtcond_t *cond) {}
#else
ANN static inline int gwt_lock(gwtlock_t *lock) {
return pthread_mutex_lock(lock);
}
ANN static inline int gwt_unlock(gwtlock_t *lock) {
return pthread_mutex_unlock(lock);
}
ANN static inline void gwt_join(gwtthread_t thread) {
pthread_join(thread, NULL);
}
ANN static inline void gwt_wait(gwtcond_t *cond, gwtlock_t *lock) {
pthread_cond_wait(cond, lock);
}
ANN static inline void gwt_broadcast(gwtcond_t *cond) {
pthread_cond_broadcast(cond);
}
ANN static inline int gwt_signal(gwtcond_t *cond) {
return pthread_cond_signal(cond);
}
ANN static inline int gwt_lock_ini(gwtlock_t *lock) {
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
return pthread_mutex_init(lock, &attr);
}
ANN static inline void gwt_lock_end(gwtlock_t *lock) {
pthread_mutex_destroy(lock); // int
}
ANN static inline int gwt_cond_ini(gwtcond_t *cond) {
return pthread_cond_init(cond, NULL);
}
ANN static inline void gwt_cond_end(gwtcond_t *cond) {
pthread_cond_destroy(cond); // int
}
ANN static inline bool gwt_create(gwtthread_t *thread, gwtreturn_t (*fun)(void*), void *arg) {
return !pthread_create(thread, NULL, fun, arg); // int
}
#endif


#endif
14 changes: 7 additions & 7 deletions src/mpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct Recycle {

struct pool {
uint8_t ** data;
MUTEX_TYPE mutex;
gwtlock_t mutex;
volatile struct Recycle *next;
uint32_t obj_sz;
uint32_t obj_id;
Expand All @@ -29,7 +29,7 @@ ANN static void mp_set(struct pool *p, const uint32_t obj_sz) {
p->nblk = 1;
p->next = NULL;
p->data = (uint8_t **)xcalloc(1, sizeof(uint8_t *));
MUTEX_SETUP(p->mutex);
gwt_lock_ini(&p->mutex);
}

MemPool mempool_ini(const size_t sz) {
Expand Down Expand Up @@ -72,7 +72,7 @@ ANN struct pool *mp_ini(MemPool mp, const uint32_t obj_sz) {
}

void mp_end(struct pool *p) {
MUTEX_CLEANUP(p->mutex);
gwt_lock_end(&p->mutex);
for (uint32_t i = 0; i < p->nblk && p->data[i]; ++i) xfree(p->data[i]);
xfree(p->data);
}
Expand Down Expand Up @@ -104,21 +104,21 @@ static void *__mp_calloc2(struct pool *p, const m_bool zero) {
}

void *_mp_calloc2(struct pool *p, const m_bool zero) {
MUTEX_LOCK(p->mutex);
gwt_lock(&p->mutex);
void *ret = __mp_calloc2(p, zero);
MUTEX_UNLOCK(p->mutex);
gwt_unlock(&p->mutex);
return ret;
}

void _mp_free2(struct pool *p, void *ptr) {
MUTEX_LOCK(p->mutex);
gwt_lock(&p->mutex);
volatile struct Recycle *next = p->next;
#ifdef POOL_CHECK
memset(ptr, 0, p->obj_sz);
#endif
p->next = ptr;
p->next->next = next;
MUTEX_UNLOCK(p->mutex);
gwt_unlock(&p->mutex);
}

void _mp_free(MemPool mp, const m_uint size, void *ptr) {
Expand Down
8 changes: 4 additions & 4 deletions src/symbol.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
ANN SymTable *new_symbol_table(MemPool p, const size_t sz) {
SymTable *const st = mp_malloc2(p, TABLE_SZ(sz));
st->sz = sz;
MUTEX_SETUP(st->mutex);
gwt_lock_ini(&st->mutex);
st->p = p;
return st;
}
Expand All @@ -22,7 +22,7 @@ ANN void free_symbols(SymTable *const ht) {
const Symbol s = ht->sym[i - 1];
if (s) free_symbol(ht->p, s);
}
MUTEX_CLEANUP(ht->mutex);
gwt_lock_end(&ht->mutex);
mp_free2(ht->p, TABLE_SZ(ht->sz), ht);
}

Expand All @@ -43,8 +43,8 @@ ANN Symbol insert_symbol(SymTable *const ht, const m_str name) {
LOOP_OPTIM
for (Symbol sym = syms; sym; sym = sym->next)
if (!strcmp(sym->name, name)) return sym;
MUTEX_LOCK(ht->mutex);
gwt_lock(&ht->mutex);
*addr = mksymbol(ht->p, name, syms);
MUTEX_UNLOCK(ht->mutex);
gwt_unlock(&ht->mutex);
return ht->sym[index];
}
109 changes: 109 additions & 0 deletions src/threadpool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include "gwion_util.h"

typedef struct {
void (*fun)(void *);
void *arg;
} task_t;

struct threadpool_t {
gwtlock_t lock;
gwtcond_t cond;
gwtthread_t *threads;
task_t *queue;
uint32_t head;
uint32_t tail;
uint32_t active;
uint32_t queue_size;
uint32_t started;
bool shutdown;
bool has_lock;
bool has_cond;
};

ANN static gwtreturn_t threadpool_thread(void *data) {
threadpool_t *p = (threadpool_t *)data;
while(true) {
(void)gwt_lock(&p->lock);
while(!p->active && !p->shutdown)
(void)gwt_wait(&p->cond, &p->lock);
if(p->shutdown) break;
task_t task = p->queue[p->head];
p->head = (p->head + 1) % p->queue_size;
p->active--;
(void)gwt_unlock(&p->lock);
task.fun(task.arg);
}
(void)gwt_unlock(&p->lock);
THREAD_RETURN(NULL);
}

ANN static bool alloc(threadpool_t *p, const uint32_t thread_count,
const uint32_t queue_size) {
return !(p->threads = malloc(thread_count * sizeof(gwtthread_t))) ||
!(p->queue = malloc(queue_size * sizeof(task_t)));
}

ANN static bool utils(threadpool_t *p) {
if(gwt_lock_ini(&p->lock)) return false;
p->has_lock = true;
if(gwt_cond_ini(&p->cond)) return false;
p->has_cond = true;
return true;
}

ANN static bool start(threadpool_t *p, const uint32_t thread_count) {
for(uint32_t i = 0; i < thread_count; i++) {
if(gwt_create(&p->threads[i], threadpool_thread, p))
return false;
p->started++;
}
return true;
}

threadpool_t *new_threadpool(const uint32_t thread_count, const uint32_t queue_size) {
threadpool_t *p = malloc(sizeof(threadpool_t));
if(!p) return NULL;
p->head = p->tail = p->active = 0;
p->shutdown = p->has_lock = p->has_cond = false;
p->started = 0;
if(alloc(p, thread_count, queue_size) || !utils(p) ||
start(p, thread_count)) {
free_threadpool(p);
return NULL;
}
return p;
}

ANN2(1, 2) static bool add(threadpool_t *p, void (*fun)(void *), void *arg) {
if(unlikely(p->shutdown || p->active == p->queue_size))
return false;
const uint32_t next = (p->tail + 1) % p->queue_size;
task_t t = { .fun = fun, .arg = arg };
p->queue[p->tail] = t;
p->tail = next;
p->active++;
(void)gwt_signal(&p->cond);
return true;
}

bool threadpool_add(threadpool_t *p, void (*fun)(void *), void *arg) {
(void)gwt_lock(&p->lock);
const bool ret = add(p, fun, arg);
(void)gwt_unlock(&p->lock);
return ret;
}

ANN void free_threadpool(threadpool_t *p) {
(void)gwt_lock(&p->lock);
p->shutdown = true;
gwt_broadcast(&p->cond);
(void)gwt_unlock(&p->lock);
if(p->threads) {
for(uint32_t i = 0; i < p->started; i++)
gwt_join(p->threads[i]);
free(p->threads);
}
if(p->queue) free(p->queue);
if(p->has_lock) gwt_lock_end(&p->lock);
if(p->has_cond) gwt_cond_end(&p->cond);
}

0 comments on commit c7d5ad8

Please sign in to comment.