Skip to content

Commit

Permalink
Merge pull request #3591 from durban/workerThreadName
Browse files Browse the repository at this point in the history
Fix worker thread name index
  • Loading branch information
djspiewak committed May 3, 2023
2 parents 0292fc6 + 7723cf8 commit 7cd4523
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private final class WorkerThread(
private val indexTransfer: LinkedTransferQueue[Integer] = new LinkedTransferQueue()
private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration

val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.incrementAndGet()
val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.getAndIncrement()

// Constructor code.
{
Expand Down Expand Up @@ -668,6 +668,9 @@ private final class WorkerThread(
val idx = index
val clone =
new WorkerThread(idx, queue, parked, external, fiberBag, pool)
// Make sure the clone gets our old name:
val clonePrefix = pool.threadPrefix
clone.setName(s"$clonePrefix-$idx")
pool.replaceWorker(idx, clone)
pool.blockedWorkerThreadCounter.incrementAndGet()
clone.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,29 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances {
"WorkerThread" should {
"rename itself when entering and exiting blocking region" in real {
for {
_ <- IO.cede
computeThread <- threadInfo
(computeThreadName, _) = computeThread
blockerThread <- IO.blocking(threadInfo).flatten
(blockerThreadName, blockerThreadId) = blockerThread
_ <- IO.cede
// The new worker (which replaced the thread which became a blocker) should also have a correct name
newComputeThread <- threadInfo
(newComputeThreadName, _) = newComputeThread
// Force the previously blocking thread to become a compute thread by converting
// the pool of compute threads (size=1) to blocker threads
resetComputeThreads <- List.fill(2)(threadInfo <* IO.blocking(())).parSequence
} yield {
// Start with the regular prefix
computeThreadName must startWith("io-compute")
// Correct WSTP index (threadCount is 1, so the only possible index is 0)
computeThreadName must endWith("-0")
// Check that entering a blocking region changes the name
blockerThreadName must startWith("io-blocker")
// Check that the replacement compute thread has correct name
newComputeThreadName must startWith("io-compute")
// And index
newComputeThreadName must endWith("-0")
// Check that the same thread is renamed again when it is readded to the compute pool
val resetBlockerThread = resetComputeThreads.collectFirst {
case (name, `blockerThreadId`) => name
Expand All @@ -75,6 +85,8 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances {
"blocker thread not found after reset")
resetBlockerThread must beSome((_: String).startsWith("io-compute"))
.setMessage("blocker thread name was not reset")
resetBlockerThread must beSome((_: String).endsWith("-0"))
.setMessage("blocker thread index was not correct")
}
}
}
Expand Down

0 comments on commit 7cd4523

Please sign in to comment.