-
Notifications
You must be signed in to change notification settings - Fork 508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MOD-6572 replace semaphore with wait in jobq #4446
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4446 +/- ##
==========================================
- Coverage 86.25% 86.25% -0.01%
==========================================
Files 190 190
Lines 34822 34822
==========================================
- Hits 30036 30035 -1
- Misses 4786 4787 +1 ☔ View full report in Codecov by Sentry. |
wait and drain are modified accordingly.
sembm om dim - 768
rename priority_queue->num_threads_working to num_threads_not_idle uncomment LOG remove script
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NICE!
few small comments
deps/thpool/thpool.c
Outdated
@@ -86,15 +81,11 @@ typedef enum { | |||
typedef struct redisearch_thpool_t { | |||
thread** threads; /* pointer to threads */ | |||
size_t total_threads_count; | |||
volatile size_t num_threads_alive; /* threads currently alive */ | |||
volatile size_t num_threads_working; /* threads currently working */ | |||
volatile atomic_size_t num_threads_alive; /* threads currently alive */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that atomic type implicitly means volatile as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not as far as i know, those are different concepts.
Do you have a reference that claims something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are different you are right, but do we need both? Isn't atomic enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this variable is accessed and modified across different threads, yes.
Having said that, should_run
should be volatile as well.
deps/thpool/thpool.c
Outdated
LOG_IF_EXISTS("verbose", "Job queue is empty - terminating thread %d", thread_p->id); | ||
thpool_p->state = THPOOL_UNINITIALIZED; | ||
thpool_p->jobqueue.should_run = false; | ||
pthread_cond_broadcast(&thpool_p->jobqueue.has_jobs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just making sure, in THPOOL_TERMINATE_WHEN_EMPTY
mode, all threads will come here eventually, right? (meaning pthread_cond_broadcast
is called multiple times)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
n_threads
in the worst case.
Here we also change the thpool state so if we are lucky some threads will finish their jobs before the jobq is empty, try to continue to the next iteration (skipping the broadcast), and then a different thread will change the state and the first one will not enter the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Consider elaborating on the documentation in this sensitive place
unify if (job_p) in thread_do remove unused var from sleep_and_set in test_cpp_thpool
* add admin priority queue *add threads manager to send admin jobs to threads *not lazy gc init to avoid bg intialization
remove threads array from thpool struct
change thpool state to THPOOL_UNINITIALIZED in redisearch_thpool_terminate_when_empty wait for num working threads to be 0 in redisearch_thpool_terminate_pause_threads thpool cpp tests: TestTerminateWhenEmpty: test recreating the threads after temination new test: TestPauseResume
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
@@ -19,7 +19,7 @@ int ConcurrentSearch_CreatePool(int numThreads) { | |||
} | |||
int poolId = array_len(threadpools_g); | |||
threadpools_g = array_append(threadpools_g, redisearch_thpool_create(numThreads, DEFAULT_PRIVILEGED_THREADS_NUM, | |||
LogCallback)); | |||
LogCallback, "coord")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider passing the pool name as a parameter. I would at least add a comment about why we chose this name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 rows above the array explictly initialized to size 1 with a comment "Only used by the coordinator, so 1 is enough"
deps/thpool/thpool.c
Outdated
|
||
/* Initialize threads if needed */ | ||
redisearch_thpool_verify_init(thpool_p); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving into priority_queue_push_chain
to avoid duplications
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's the priority queue's responsibility to initialize the threads.
- I added a wrapper function
priority_queue_push_chain_init_threads
to the threadpool section priority_queue_push_chain_unsafe
: replacedredisearch_thpool_t *
argument that was used only to get the priority queue, with apriority_queue *
pthread_detach((*thread_p)->pthread); | ||
static int thread_init(redisearch_thpool_t *thpool_p) { | ||
pthread_t thread_id; | ||
pthread_create(&thread_id, NULL, (void *)thread_do, thpool_p); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this cast needed?
rm_free(job_p); | ||
|
||
// Both variables are atomic, so we can do this without a lock. | ||
thpool_p->total_jobs_done += !job_ctx.is_admin; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thpool_p->total_jobs_done += !job_ctx.is_admin; | |
if (!job_ctx.is_admin) thpool_p->total_jobs_done++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the C Standard (ISO/IEC 9899:2018):
Section 6.5.3.3 Unary arithmetic operators: "The result of the logical negation operator ! is 0 if the value of its operand compares unequal to 0, 1 if the value of its operand compares equal to 0. The result has type int."
…lback, triggred by redis from the main thread remove thpool_init. not in use renaming of thpool structs' members remove priority_push_chain and introduce redisearch_thpool_push_chain_init_threads that encapsulate both safe push to the queue and threads verify init pthread_varrier: add volatile to cycle test_multithread: replace algo, datatype loop with an automatic generation of a function for each permutation (algo, datatype) as Test_burst_threads_sanity class attribute Thanks @GuyAv46
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
This PR significantly modifies the threadpool's pulling mechanism, setting the stage for more flexible runtime configurations.
Changing pulling synchronization machanism
Previously, the threadpool was synchronized using a semaphore-based waiting mechanism for job retrieval. This PR replaces that mechanism with a condition variable.
The revised job pulling mechanism operates as follows:
2. Threads enter a waiting state on a condition variable when the job queue is both empty and running.
3. Upon adding a new job to the queue:
Configure threadpool at runtime
Threadpool state
can be either
initialized
- hasn_threads
valid and ready to pull threads,or
uninitialized
- some or all of the threads may have exited.Threadpool state is set to
uninitialized
whenterminate_when_empty
ordestroy
are called.Removed
threads_all_idle
condition variabledrain
andwait
are internally implemented with a busy wait.calling
wait()
is equivalent to callingdrain(threshold = 0, yieldCB = nullptr)
Thread state
A thread can be in one of three states:
running
,terminate_when_empty
, ordead
.The thread state can be configured by pushing a
change_state_job
to the admin priority queue.Pause and resume
When pausing the threadpool, the jobq state is changed to
paused
. pause function will return when there are no more jobs in progress (i.enum_working_threads
== 0)Intialization
The new design assumes that the threadpool is initialized by the main thread.
All threadpool initialization is lazy and occurs upon the first push to the queue.
Since the GC pushes to the jobq from the bg, it breaks this assumption. Hence, the GC thpool initialized upon module startup.
Auestethic changes
Add name to the threadpool
The threadpool name is used to set the thepool's threads names.
a thread name is
<thpool_name>-<rand_id>.
As the maximum number of threads per thpool is 10,000,
rand_id
is a random number between 0 to 9,999.Remove
thpool->threads
arrayThis array was unnecessary and has been removed.