Skip to content

Commit

Permalink
fixed bugs in emscripten threading
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanlf committed Apr 17, 2024
1 parent 87efbf2 commit 8a5b4d2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 45 deletions.
91 changes: 51 additions & 40 deletions src/filter_core/filter_session.c
Expand Up @@ -1586,8 +1586,24 @@ void gf_fs_print_debug_info(GF_FilterSession *fsess, GF_SessionDebugFlag dbg_fla
#if defined(GPAC_CONFIG_EMSCRIPTEN) && !defined(GPAC_DISABLE_THREADS)
#include <emscripten/threading.h>
GF_Err gf_th_async_call(GF_Thread *t, u32 (*Run)(void *param), void *param);
static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread);

Bool em_check_yield_th_proc(u32 thid, GF_SessionThread *sess_thread, u64 enter_time)
{
if (thid
&& (sess_thread->fsess->run_status == GF_OK)
) {
sess_thread->run_time += gf_sys_clock_high_res() - enter_time;
safe_int_dec(&sess_thread->fsess->active_threads);
gf_th_async_call(sess_thread->th, (gf_thread_run) gf_fs_thread_proc, sess_thread);
return GF_TRUE;
}
return GF_FALSE;
}
#endif

const char *gf_th_log_name(GF_Thread *t);

static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
{
GF_FilterSession *fsess = sess_thread->fsess;
Expand All @@ -1602,7 +1618,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
u64 enter_time = gf_sys_clock_high_res();
Bool use_main_sema = thid ? GF_FALSE : GF_TRUE;
#ifndef GPAC_DISABLE_LOG
u32 sys_thid = gf_th_id();
const char *sys_thid = gf_th_log_name(sess_thread->th);
#endif
u64 next_task_schedule_time = 0;
Bool do_regulate = (fsess->flags & GF_FS_FLAG_NO_REGULATION) ? GF_FALSE : GF_TRUE;
Expand All @@ -1624,7 +1640,12 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
if (fsess->non_blocking) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Main thread proc enter\n"));
}
#ifdef GPAC_CONFIG_EMSCRIPTEN
} else {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s proc enter\n", sys_thid));
#endif
}

//first time we enter the thread proc
if (!sess_thread->th_id) {
sess_thread->th_id = gf_th_id();
Expand Down Expand Up @@ -1680,20 +1701,11 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)

if (!skip_next_sema_wait && (current_filter==NULL)) {
gf_rmt_begin(sema_wait, GF_RMT_AGGREGATE);
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u Waiting scheduler %s semaphore\n", sys_thid, use_main_sema ? "main" : "secondary"));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s Waiting scheduler %s semaphore\n", sys_thid, use_main_sema ? "main" : "secondary"));
//wait for something to be done
gf_fs_sema_io(fsess, GF_FALSE, use_main_sema);
consecutive_filter_tasks = 0;
gf_rmt_end();
#ifdef GPAC_CONFIG_EMSCRIPTEN
//no tasks on main thread, exit setting in_main_sem_wait to trigger a NOT_READY on fs_run
//this will give control back to JS
if (use_main_sema && fsess->non_blocking && !gf_fq_count(fsess->main_thread_tasks)) {
safe_int_inc(&fsess->active_threads);
fsess->in_main_sem_wait=1;
break;
}
#endif
}
safe_int_inc(&fsess->active_threads);
skip_next_sema_wait = GF_FALSE;
Expand All @@ -1716,6 +1728,14 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
}
}
force_secondary_tasks = GF_FALSE;
#ifdef GPAC_CONFIG_EMSCRIPTEN
//no tasks on main thread, exit setting in_main_sem_wait to trigger a NOT_READY on fs_run
//this will give control back to JS
if (fsess->non_blocking && !task) {
fsess->in_main_sem_wait = GF_TRUE;
break;
}
#endif
} else {
task = gf_fq_pop(fsess->tasks);
if (task && (task->force_main || (task->filter && task->filter->nb_main_thread_forced) ) ) {
Expand Down Expand Up @@ -1821,17 +1841,14 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
return 0;
}

GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: no task available\n", sys_thid));

#if defined(GPAC_CONFIG_EMSCRIPTEN) && !defined(GPAC_DISABLE_THREADS)
if (thid && (fsess->run_status == GF_OK)) {
sess_thread->run_time += gf_sys_clock_high_res() - enter_time;
safe_int_dec(&fsess->active_threads);
gf_th_async_call(sess_thread->th, (gf_thread_run) gf_fs_thread_proc, sess_thread);
if (em_check_yield_th_proc(thid, sess_thread, enter_time)) {
return 0;
}
#endif

GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: no task available\n", sys_thid));

if (do_regulate) {
gf_sleep(0);
}
Expand Down Expand Up @@ -1901,7 +1918,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
gf_fq_add(fsess->tasks, task);
if (next) {
if (next->schedule_next_time <= (u64) now) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, next task time ready for execution\n", sys_thid, task_log_name));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s reposted, next task time ready for execution\n", sys_thid, task_log_name));

skip_next_sema_wait = GF_TRUE;
continue;
Expand All @@ -1910,7 +1927,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
ndiff -= now;
ndiff /= 1000;
if (ndiff<diff) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s scheduled after next task %s:%s (in %d ms vs %d ms)\n", sys_thid, task_log_name, next->log_name, next->filter ? next->filter->name : "", (s32) diff, (s32) ndiff));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s scheduled after next task %s:%s (in %d ms vs %d ms)\n", sys_thid, task_log_name, next->log_name, next->filter ? next->filter->name : "", (s32) diff, (s32) ndiff));
diff = ndiff;
}
}
Expand All @@ -1926,10 +1943,10 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
if ( gf_fq_count(fsess->tasks) > MONOTH_MIN_TASKS)
diff = MONOTH_MIN_SLEEP;
}
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, %s task scheduled after this task, sleeping for %d ms (task diff %d - next diff %d)\n", sys_thid, task_log_name, next ? "next" : "no", diff, tdiff, ndiff));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s reposted, %s task scheduled after this task, sleeping for %d ms (task diff %d - next diff %d)\n", sys_thid, task_log_name, next ? "next" : "no", diff, tdiff, ndiff));
gf_sleep((u32) diff);
} else {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s reposted, next task scheduled after this task, rerun\n", sys_thid, task_log_name));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s reposted, next task scheduled after this task, rerun\n", sys_thid, task_log_name));
}
skip_next_sema_wait = GF_TRUE;
continue;
Expand All @@ -1951,17 +1968,14 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
safe_int_dec(&fsess->tasks_pending);
task->notified = GF_FALSE;
}
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s reposted to filter task until task exec time is reached (%d us)\n", sys_thid, current_filter->name, task->log_name, (s32) (task->schedule_next_time - next->schedule_next_time) ));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s:%s reposted to filter task until task exec time is reached (%d us)\n", sys_thid, current_filter->name, task->log_name, (s32) (task->schedule_next_time - next->schedule_next_time) ));
//remove task
gf_fq_pop(current_filter->tasks);
//and queue it after the next one
gf_fq_add(current_filter->tasks, task);
//and continue with the same filter
#if defined(GPAC_CONFIG_EMSCRIPTEN) && !defined(GPAC_DISABLE_THREADS)
if (thid && (fsess->run_status == GF_OK)) {
sess_thread->run_time += gf_sys_clock_high_res() - enter_time;
safe_int_dec(&fsess->active_threads);
gf_th_async_call(sess_thread->th, (gf_thread_run) gf_fs_thread_proc, sess_thread);
if (em_check_yield_th_proc(thid, sess_thread, enter_time)) {
return 0;
}
#endif
Expand All @@ -1988,7 +2002,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
}

if (next_time_secondary<next_time_main) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: forcing secondary task list on main - current task schedule time "LLU" (diff to now %d) vs next time secondary "LLU" (%s::%s)\n", sys_thid, task->schedule_next_time, (s32) diff, next_time_secondary, next->filter ? next->filter->freg->name : "", next->log_name));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: forcing secondary task list on main - current task schedule time "LLU" (diff to now %d) vs next time secondary "LLU" (%s::%s)\n", sys_thid, task->schedule_next_time, (s32) diff, next_time_secondary, next->filter ? next->filter->freg->name : "", next->log_name));
diff = 0;
force_secondary_tasks = GF_TRUE;
break;
Expand All @@ -2009,10 +2023,10 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
tdiff -= now;
if (tdiff < 0) tdiff=0;
if (tdiff<diff) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: next task has earlier exec time than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: next task has earlier exec time than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
diff = tdiff;
} else {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: next task has earlier exec time#2 than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: next task has earlier exec time#2 than current task %s:%s, adjusting sleep (old %d - new %d)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, (s32) tdiff));

}
}
Expand All @@ -2024,7 +2038,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
if ( gf_fq_count(fsess->tasks) > MONOTH_MIN_TASKS)
diff = MONOTH_MIN_SLEEP;
}
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s postponed for %d ms (scheduled time "LLU" us, next task schedule "LLU" us)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, task->schedule_next_time, next_task_schedule_time));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s:%s postponed for %d ms (scheduled time "LLU" us, next task schedule "LLU" us)\n", sys_thid, current_filter->name, task->log_name, (s32) diff, task->schedule_next_time, next_task_schedule_time));

gf_sleep((u32) diff);
active_start = gf_sys_clock_high_res();
Expand All @@ -2034,7 +2048,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
if (diff > 100 ) {
u32 pending_tasks;
Bool use_main = (current_filter->freg->flags & GF_FS_REG_MAIN_THREAD) ? GF_TRUE : GF_FALSE;
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: releasing current filter %s, exec time for %s due in "LLD" us\n", sys_thid, current_filter->name, task->log_name, diff));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: releasing current filter %s, exec time for %s due in "LLD" us\n", sys_thid, current_filter->name, task->log_name, diff));
current_filter->process_th_id = 0;
current_filter->in_process = GF_FALSE;
//don't touch the current filter tasks, just repost the task to the main/secondary list
Expand Down Expand Up @@ -2086,10 +2100,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
}

#if defined(GPAC_CONFIG_EMSCRIPTEN) && !defined(GPAC_DISABLE_THREADS)
if (thid && (fsess->run_status == GF_OK)) {
sess_thread->run_time += gf_sys_clock_high_res() - enter_time;
safe_int_dec(&fsess->active_threads);
gf_th_async_call(sess_thread->th, (gf_thread_run) gf_fs_thread_proc, sess_thread);
if (em_check_yield_th_proc(thid, sess_thread, enter_time)) {
return 0;
}
#endif
Expand All @@ -2098,7 +2109,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
force_secondary_tasks=GF_FALSE;
}
}
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u: task %s:%s schedule time "LLU" us reached (diff %d ms)\n", sys_thid, current_filter ? current_filter->name : "", task->log_name, task->schedule_next_time, (s32) diff));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s: task %s:%s schedule time "LLU" us reached (diff %d ms)\n", sys_thid, current_filter ? current_filter->name : "", task->log_name, task->schedule_next_time, (s32) diff));

}
next_task_schedule_time = 0;
Expand All @@ -2114,7 +2125,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
sess_thread->has_seen_eot = GF_FALSE;
gf_assert(!task->filter || gf_fq_count(task->filter->tasks));

GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u task#%d %p executing Filter %s::%s (%d tasks pending, %d(%d) process task queued)\n", sys_thid, sess_thread->nb_tasks, task, task->filter ? task->filter->name : "none", task->log_name, fsess->tasks_pending, task->filter ? task->filter->process_task_queued : 0, task->filter ? gf_fq_count(task->filter->tasks) : 0));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s task#%d %p executing Filter %s::%s (%d tasks pending, %d(%d) process task queued)\n", sys_thid, sess_thread->nb_tasks, task, task->filter ? task->filter->name : "none", task->log_name, fsess->tasks_pending, task->filter ? task->filter->process_task_queued : 0, task->filter ? gf_fq_count(task->filter->tasks) : 0));

safe_int_inc(& fsess->tasks_in_process );
gf_assert( task->run_task );
Expand Down Expand Up @@ -2265,11 +2276,11 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
if (requeue) {
//if requeue on a filter active, use filter queue to avoid another thread grabing the task (we would have concurrent access to the filter)
if (current_filter) {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u re-posted task Filter %s::%s in filter tasks (%d pending)\n", sys_thid, task->filter->name, task->log_name, fsess->tasks_pending));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s re-posted task Filter %s::%s in filter tasks (%d pending)\n", sys_thid, task->filter->name, task->log_name, fsess->tasks_pending));
task->notified = GF_FALSE;
//keep this thread running on the current filter no signaling of semaphore
} else {
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u re-posted task Filter %s::%s in %s tasks (%d pending)\n", sys_thid, task->filter ? task->filter->name : "none", task->log_name, (task->filter && (task->filter->freg->flags & GF_FS_REG_MAIN_THREAD)) ? "main" : "secondary", fsess->tasks_pending));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s re-posted task Filter %s::%s in %s tasks (%d pending)\n", sys_thid, task->filter ? task->filter->name : "none", task->log_name, (task->filter && (task->filter->freg->flags & GF_FS_REG_MAIN_THREAD)) ? "main" : "secondary", fsess->tasks_pending));

task->notified = GF_TRUE;
safe_int_inc(&fsess->tasks_pending);
Expand Down Expand Up @@ -2315,7 +2326,7 @@ static u32 gf_fs_thread_proc(GF_SessionThread *sess_thread)
gf_mx_v(fsess->filters_mx);
}
#endif
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %u task#%d %p pushed to reservoir\n", sys_thid, sess_thread->nb_tasks, task));
GF_LOG(GF_LOG_DEBUG, GF_LOG_SCHEDULER, ("Thread %s task#%d %p pushed to reservoir\n", sys_thid, sess_thread->nb_tasks, task));

if (gf_fq_res_add(fsess->tasks_reservoir, task)) {
gf_free(task);
Expand Down
21 changes: 16 additions & 5 deletions src/utils/os_thread.c
Expand Up @@ -107,6 +107,10 @@ static const char *log_th_name(u32 id)
}
return "Main Process";
}
const char *gf_th_log_name(GF_Thread *t)
{
return t ? t->log_name : "Main Process";
}

#endif

Expand Down Expand Up @@ -172,6 +176,9 @@ GF_Thread * gf_th_current() {

#endif /* GPAC_CONFIG_ANDROID */

#if defined(GPAC_CONFIG_EMSCRIPTEN) && !defined(GPAC_DISABLE_THREADS)
#include <emscripten/eventloop.h>
#endif

#ifdef WIN32
DWORD WINAPI RunThread(void *ptr)
Expand Down Expand Up @@ -202,7 +209,7 @@ void * RunThread(void *ptr)

#ifdef GPAC_CONFIG_ANDROID
if (pthread_once(&currentThreadInfoKey_once, &currentThreadInfoKey_alloc) || pthread_setspecific(currentThreadInfoKey, t))
GF_LOG(GF_LOG_ERROR, GF_LOG_MUTEX, ("[Mutex] Couldn't run thread %s, ID 0x%08x\n", t->log_name, t->id));
GF_LOG(GF_LOG_ERROR, GF_LOG_MUTEX, ("[Mutex] Couldn't run thread %s, ID %u\n", t->log_name, t->id));
#endif /* GPAC_CONFIG_ANDROID */
t->status = GF_THREAD_STATUS_RUN;

Expand All @@ -211,7 +218,7 @@ void * RunThread(void *ptr)

#ifndef GPAC_DISABLE_LOG
t->id = gf_th_id();
GF_LOG(GF_LOG_INFO, GF_LOG_MUTEX, ("[Thread %s] At %d Entering thread proc - thread ID 0x%08x\n", t->log_name, gf_sys_clock(), t->id));
GF_LOG(GF_LOG_INFO, GF_LOG_MUTEX, ("[Thread %s] At %d Entering thread proc - thread ID %u\n", t->log_name, gf_sys_clock(), t->id));
#endif

/* Each thread has its own seed */
Expand All @@ -223,8 +230,11 @@ void * RunThread(void *ptr)
exit:

if (t->no_kill) {
t->no_kill = 0;
#ifdef WIN32
#if defined(GPAC_CONFIG_EMSCRIPTEN) && !defined(GPAC_DISABLE_THREADS)
GF_LOG(GF_LOG_DEBUG, GF_LOG_MUTEX, ("[Thread %s] exit with no kill - unwinding to JS event loop\n", t->log_name));
emscripten_unwind_to_js_event_loop();
return (void *)ret;
#elif defined(WIN32)
return ret;
#else
return (void *)ret;
Expand Down Expand Up @@ -326,6 +336,7 @@ GF_Err gf_th_run(GF_Thread *t, u32 (*Run)(void *param), void *param)
GF_Err gf_th_async_call(GF_Thread *t, u32 (*Run)(void *param), void *param)
{
t->no_kill = 1;
GF_LOG(GF_LOG_DEBUG, GF_LOG_MUTEX, ("[Thread %s] async EM call\n", t->log_name));
emscripten_dispatch_to_thread_async_(t->threadH, EM_FUNC_SIG_II, Run, NULL, param);
return GF_OK;
}
Expand Down Expand Up @@ -364,7 +375,7 @@ void Thread_Stop(GF_Thread *t, Bool Destroy)
if (emscripten_is_main_runtime_thread()) {
int pthread_tryjoin_np(pthread_t, void **);
void *rval=NULL;
if (pthread_tryjoin_np(t->threadH, &rval))
if (!t->no_kill && pthread_tryjoin_np(t->threadH, &rval))
GF_LOG(GF_LOG_ERROR, GF_LOG_MUTEX, ("[Thread %s] pthread_join() returned an error with thread ID 0x%08x\n", t->log_name, t->id));
}
#else
Expand Down

0 comments on commit 8a5b4d2

Please sign in to comment.