MOD-6572 replace semaphore with wait in jobq #4446

Open
wants to merge 48 commits into base: master

Conversation

meiravgri (Collaborator) commented Feb 12, 2024

This PR significantly modifies the threadpool's pulling mechanism, setting the stage for more flexible runtime configurations.

Changing the pulling synchronization mechanism

Previously, the threadpool was synchronized using a semaphore-based waiting mechanism for job retrieval. This PR replaces that mechanism with a condition variable.

The revised job pulling mechanism operates as follows (a minimal sketch follows the list):

1. Threads enter a waiting state on a condition variable when the job queue is both empty and still running.
2. Upon adding new jobs to the queue:

   1. Adding a single job signals one thread.
   2. Adding multiple jobs broadcasts, waking all threads for processing.
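
A minimal sketch of this wait/signal/broadcast flow, assuming simplified field names (jobqueue, has_jobs, and should_run follow the PR text; the struct layout and pop logic here are illustrative, not the actual thpool.c code):

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct jobqueue_sketch {
    pthread_mutex_t lock;
    pthread_cond_t  has_jobs;   /* threads wait here when the queue is empty */
    size_t          len;        /* number of pending jobs */
    bool            should_run; /* false once the pool stops pulling */
} jobqueue_sketch;

/* Worker side: block while the queue is empty and the pool is still running. */
static void *pull_job(jobqueue_sketch *q) {
    void *job = NULL;
    pthread_mutex_lock(&q->lock);
    while (q->len == 0 && q->should_run) {
        pthread_cond_wait(&q->has_jobs, &q->lock);
    }
    if (q->len > 0) {
        /* ...pop the next job from the underlying queue... */
        q->len--;
    }
    pthread_mutex_unlock(&q->lock);
    return job;
}

/* Producer side: signal one waiter for a single job, broadcast for a batch. */
static void push_jobs(jobqueue_sketch *q, size_t n_new_jobs) {
    pthread_mutex_lock(&q->lock);
    q->len += n_new_jobs;
    if (n_new_jobs == 1) {
        pthread_cond_signal(&q->has_jobs);
    } else if (n_new_jobs > 1) {
        pthread_cond_broadcast(&q->has_jobs);
    }
    pthread_mutex_unlock(&q->lock);
}
```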

Configure threadpool at runtime

Threadpool state

The threadpool can be in one of two states: initialized, meaning n_threads threads are valid and ready to pull jobs, or uninitialized, meaning some or all of the threads may have exited.
The threadpool state is set to uninitialized when terminate_when_empty or destroy is called.
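
A rough sketch of these two states (THPOOL_UNINITIALIZED appears in the review excerpts further down; THPOOL_INITIALIZED is assumed here for illustration):

```c
typedef enum {
    THPOOL_UNINITIALIZED, /* some or all of the threads may have exited */
    THPOOL_INITIALIZED,   /* n_threads threads are valid and ready to pull jobs */
} thpool_state_sketch;
```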

Removed threads_all_idle condition variable

drain and wait are internally implemented with a busy wait.
Calling wait() is equivalent to calling drain(threshold = 0, yieldCB = nullptr), as sketched below.
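
A hedged sketch of that relationship with stand-in counters (the real drain/wait signatures and polling interval in thpool.c may differ):

```c
#include <stddef.h>
#include <unistd.h>

typedef void (*yield_cb)(void *ctx);

/* Stand-in for the counters the busy wait polls; the real struct differs. */
typedef struct {
    volatile size_t pending_jobs;     /* jobs still in the queue */
    volatile size_t working_threads;  /* threads currently running a job */
} pool_counters;

static void drain_sketch(pool_counters *p, size_t threshold,
                         yield_cb yield, void *yield_ctx) {
    /* Busy-wait until the outstanding work drops to the threshold. */
    while (p->pending_jobs + p->working_threads > threshold) {
        if (yield) yield(yield_ctx);
        usleep(100); /* back off briefly between polls */
    }
}

/* wait() is simply drain() with threshold 0 and no yield callback. */
static void wait_sketch(pool_counters *p) {
    drain_sketch(p, 0, NULL, NULL);
}
```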

Thread state

A thread can be in one of three states: running, terminate_when_empty, or dead.
The thread state can be configured by pushing a change_state_job to the admin priority queue.
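
A sketch of what such a state-change job could look like; the state names follow the PR text, while push_admin_job and the job layout are hypothetical names used only for illustration:

```c
typedef enum {
    THREAD_RUNNING,               /* keep pulling jobs */
    THREAD_TERMINATE_WHEN_EMPTY,  /* exit once the job queue drains */
    THREAD_DEAD,                  /* exit immediately */
} thread_state_sketch;

typedef struct {
    thread_state_sketch new_state; /* state the receiving thread switches to */
} change_state_job_sketch;

/* Hypothetical entry point: enqueue the job on the admin priority queue. */
void push_admin_job(const change_state_job_sketch *job);

/* Example: ask a thread to exit once the job queue is empty. */
static void request_terminate_when_empty(void) {
    change_state_job_sketch job = { .new_state = THREAD_TERMINATE_WHEN_EMPTY };
    push_admin_job(&job);
}
```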

Pause and resume

When pausing the threadpool, the jobq state is changed to paused. The pause function returns only when there are no more jobs in progress (i.e., num_working_threads == 0).
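
A minimal sketch of those pause semantics (the field names follow the PR text; the polling loop and sleep interval are assumptions):

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <unistd.h>

typedef struct {
    atomic_bool   paused;              /* jobq state: paused or not */
    atomic_size_t num_working_threads; /* threads currently executing a job */
} jobq_pause_sketch;

static void pause_sketch(jobq_pause_sketch *q) {
    atomic_store(&q->paused, true);
    /* Return only once every in-progress job has finished. */
    while (atomic_load(&q->num_working_threads) > 0) {
        usleep(100);
    }
}

static void resume_sketch(jobq_pause_sketch *q) {
    atomic_store(&q->paused, false);
}
```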

Initialization

The new design assumes that the threadpool is initialized by the main thread.
All threadpool initialization is lazy and occurs upon the first push to the queue.
Since the GC pushes to the jobq from a background thread, it breaks this assumption; hence, the GC thpool is initialized at module startup.
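
A hedged sketch of the lazy-init-on-push flow; redisearch_thpool_verify_init is named in a diff excerpt further down, but the wrapper and fields shown here are illustrative assumptions:

```c
#include <stdbool.h>

typedef struct {
    bool threads_created; /* set once the worker threads have been spawned */
    /* ... queue, counters, etc. ... */
} thpool_init_sketch;

/* Assumed to run on the main thread only (see the assumption above). */
static void verify_init_sketch(thpool_init_sketch *p) {
    if (!p->threads_created) {
        /* ...spawn the worker threads here... */
        p->threads_created = true;
    }
}

static void push_job_sketch(thpool_init_sketch *p, void *job) {
    verify_init_sketch(p); /* the first push creates the threads lazily */
    /* ...enqueue the job and signal/broadcast as described earlier... */
    (void)job;
}
```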

Aesthetic changes

Add name to the threadpool

The threadpool name is used to set the names of the thpool's threads.
A thread name has the form <thpool_name>-<rand_id>.
Since the maximum number of threads per thpool is 10,000, rand_id is a random number between 0 and 9,999.
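
A sketch of that naming scheme on Linux (the real code may format the name differently; truncation to 16 bytes including the terminator is pthread_setname_np's documented limit):

```c
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Build "<thpool_name>-<rand_id>" with rand_id in 0..9,999 and apply it. */
static void set_thread_name_sketch(pthread_t thread, const char *thpool_name) {
    char name[16]; /* Linux limits thread names to 15 chars + '\0' */
    int rand_id = rand() % 10000;
    snprintf(name, sizeof(name), "%s-%d", thpool_name, rand_id);
    pthread_setname_np(thread, name);
}
```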

Remove thpool->threads array

This array was unnecessary and has been removed.


codecov bot commented Feb 12, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 86.25%. Comparing base (e907ece) to head (d147db0).

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4446      +/-   ##
==========================================
- Coverage   86.25%   86.25%   -0.01%     
==========================================
  Files         190      190              
  Lines       34822    34822              
==========================================
- Hits        30036    30035       -1     
- Misses       4786     4787       +1     


meiravgri changed the title from "replace semaphore with wait in jobq" to "MOD-6572 replace semaphore with wait in jobq" on Feb 23, 2024
rename priority_queue->num_threads_working to num_threads_not_idle

uncomment LOG

remove script
GuyAv46 previously approved these changes Mar 26, 2024
GuyAv46 previously approved these changes Mar 26, 2024
GuyAv46 previously approved these changes Apr 1, 2024
alonre24 (Collaborator) left a comment

NICE!
few small comments

@@ -86,15 +81,11 @@ typedef enum {
 typedef struct redisearch_thpool_t {
   thread** threads; /* pointer to threads */
   size_t total_threads_count;
-  volatile size_t num_threads_alive; /* threads currently alive */
-  volatile size_t num_threads_working; /* threads currently working */
+  volatile atomic_size_t num_threads_alive; /* threads currently alive */
Collaborator

I think that atomic type implicitly means volatile as well.

Collaborator Author

Not as far as I know; those are different concepts.
Do you have a reference that claims something else?

Collaborator

They are different, you are right, but do we need both? Isn't atomic enough?

Collaborator Author

Since this variable is accessed and modified across different threads, yes.
Having said that, should_run should be volatile as well.
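
For reference, a small illustration (not the PR's code) of the point under discussion: with C11 _Atomic counters, cross-thread increments and reads are already well-defined without a lock, so the open question is only whether volatile adds anything on top:

```c
#include <stdatomic.h>
#include <stddef.h>

static atomic_size_t num_threads_alive_sketch;

static void on_thread_start(void) { atomic_fetch_add(&num_threads_alive_sketch, 1); }
static void on_thread_exit(void)  { atomic_fetch_sub(&num_threads_alive_sketch, 1); }
static size_t alive_now(void)     { return atomic_load(&num_threads_alive_sketch); }
```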

LOG_IF_EXISTS("verbose", "Job queue is empty - terminating thread %d", thread_p->id);
thpool_p->state = THPOOL_UNINITIALIZED;
thpool_p->jobqueue.should_run = false;
pthread_cond_broadcast(&thpool_p->jobqueue.has_jobs);
Collaborator

Just making sure, in THPOOL_TERMINATE_WHEN_EMPTY mode, all threads will come here eventually, right? (meaning pthread_cond_broadcast is called multiple times)

Collaborator Author

n_threads times in the worst case.
Here we also change the thpool state, so if we are lucky, some threads will finish their jobs before the jobq is empty and continue to the next iteration (skipping the broadcast); by then a different thread will have changed the state, so the first one will not enter the loop.

Collaborator

I see. Consider elaborating on the documentation in this sensitive place

unify if (job_p)  in thread_do

remove unused var from sleep_and_set in test_cpp_thpool
meiravgri and others added 14 commits April 2, 2024 17:54
* add admin priority queue

* add threads manager to send admin jobs to threads

* not lazy gc init to avoid bg initialization
remove threads array from thpool struct
change thpool state to THPOOL_UNINITIALIZED in redisearch_thpool_terminate_when_empty

wait for num working threads to be 0 in redisearch_thpool_terminate_pause_threads

thpool cpp tests:
TestTerminateWhenEmpty:
test recreating the threads after termination

new test:
TestPauseResume
GuyAv46 (Collaborator) left a comment

Nice!

@@ -19,7 +19,7 @@ int ConcurrentSearch_CreatePool(int numThreads) {
   }
   int poolId = array_len(threadpools_g);
   threadpools_g = array_append(threadpools_g, redisearch_thpool_create(numThreads, DEFAULT_PRIVILEGED_THREADS_NUM,
-                                                                       LogCallback));
+                                                                       LogCallback, "coord"));
Collaborator

Consider passing the pool name as a parameter. I would at least add a comment about why we chose this name

Collaborator Author

Two rows above, the array is explicitly initialized to size 1 with the comment "Only used by the coordinator, so 1 is enough".


/* Initialize threads if needed */
redisearch_thpool_verify_init(thpool_p);
Collaborator

Consider moving into priority_queue_push_chain to avoid duplications

Collaborator Author

I don't think it's the priority queue's responsibility to initialize the threads.

  1. I added a wrapper function priority_queue_push_chain_init_threads to the threadpool section
  2. priority_queue_push_chain_unsafe: replaced the redisearch_thpool_t * argument, which was used only to get the priority queue, with a priority_queue *

pthread_detach((*thread_p)->pthread);
static int thread_init(redisearch_thpool_t *thpool_p) {
pthread_t thread_id;
pthread_create(&thread_id, NULL, (void *)thread_do, thpool_p);
Collaborator

is this cast needed?

rm_free(job_p);

// Both variables are atomic, so we can do this without a lock.
thpool_p->total_jobs_done += !job_ctx.is_admin;
Collaborator

Suggested change:
- thpool_p->total_jobs_done += !job_ctx.is_admin;
+ if (!job_ctx.is_admin) thpool_p->total_jobs_done++;

Collaborator Author

in the C Standard (ISO/IEC 9899:2018):

Section 6.5.3.3 Unary arithmetic operators: "The result of the logical negation operator ! is 0 if the value of its operand compares unequal to 0, 1 if the value of its operand compares equal to 0. The result has type int."
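
A tiny self-contained check (illustrative only) of the equivalence being discussed: since !x is always exactly 0 or 1, the += form counts the same events as the suggested if form:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

int main(void) {
    size_t a = 0, b = 0;
    bool is_admin[] = { true, false, false, true };
    for (int i = 0; i < 4; i++) {
        a += !is_admin[i];      /* branch-free form used in the PR */
        if (!is_admin[i]) b++;  /* suggested alternative */
    }
    assert(a == b && a == 2);
    return 0;
}
```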

…lback, triggered by redis from the main thread

remove thpool_init. not in use

renaming of thpool structs' members

remove priority_push_chain and introduce redisearch_thpool_push_chain_init_threads, which encapsulates both the safe push to the queue and the threads' verify-init

pthread_barrier: add volatile to cycle

test_multithread: replace algo, datatype loop with an automatic generation of a function for each permutation (algo, datatype) as Test_burst_threads_sanity class attribute
Thanks @GuyAv46
meiravgri and others added 3 commits May 27, 2024 17:00
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>

3 participants