New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add virtual thread support #1299
base: main
Are you sure you want to change the base?
Conversation
40af8d8
to
a25b239
Compare
/** | ||
* Returns if the current Runtime supports virtual threads. | ||
*/ | ||
def isSupported: Boolean = create("testIsSupported") ne null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better with a lazy val, will update later.
var started = false; | ||
try { | ||
val thread = threadFactory.newThread(Task(this, command)) | ||
virtualThreads.add(thread) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic here is obviously thread-unsafe because stage.get() >= SHUTDOWN
can be reordered with virtualThreads.add(thread)
.
In other words, you can start and add new threads that won't be waited on while the thread-pool is shutting down.
I don't know if this is a concern we need to have, or how the other implementations do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, would you like to continue the work? I will continue this when find time.
I have a isStarted check beblow, that would help?
Another issue is, only execute method is been used, not sure what the origin design requires an ExecutionService.
* 1 SHUTDOWN | ||
* 2 TERMINATED | ||
*/ | ||
private val state = new AtomicInteger(RUNNING) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will replace with a single field and a fieldUpdater later
This is probably useful but I would appreciate if we don't merge it until after 1.1.0-M1 is released. I think there are some changes that we need in the implementation and we have to work out how to test it. I don't think this dispatcher should be the default, even if you use JDK 21+. Users should only be able to opt in to using it. Dispatchers are pluggable so users can create their own dispatcher for this while they are waiting for us to release one. |
Yes, I would like to make it in 1.1.0 M2 or M3 , Need more time to polish it. |
@@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: | |||
def dispatcher(): MessageDispatcher | |||
|
|||
def configureExecutor(): ExecutorServiceConfigurator = { | |||
@tailrec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not look like a recursion func.
if (!isShutdown) { | ||
terminateLock.lock() | ||
try { | ||
if (isTerminated) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we reduce this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fee free to fork and change on it:( I'm a little busy at work. if we remove this, we must ensure this method is only be called when we hold the lock.
Motivation:
Add virtual thread support
Modification:
Add a virtual thread executor
Result:
Virtual thread support was added.
TODO:
I think it would be nice to add the blocking io dispatcher with a virtual thread executor too.
Status:
I ran out of time, will update this when get time.