Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FISH-7670 Integerate Latest Concurro with Virtual Threads #6582

Merged
merged 16 commits into from Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/payara-bom/pom.xml
@@ -1,7 +1,7 @@
<!--
~ DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
~
~ Copyright (c) 2019-2023 Payara Foundation and/or its affiliates. All rights reserved.
~ Copyright (c) 2019-2024 Payara Foundation and/or its affiliates. All rights reserved.
~
~ The contents of this file are subject to the terms of either the GNU
~ General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -470,9 +470,9 @@
<version>${com.ibm.jbatch.spi.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.enterprise.concurrent</artifactId>
<version>${concurrent.version}</version>
<groupId>org.glassfish.concurro</groupId>
<artifactId>concurro</artifactId>
<version>${concurro.version}</version>
</dependency>

<dependency>
Expand Down
aubi marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2022] Payara Foundation and/or affiliates
// Portions Copyright [2022-2024] Payara Foundation and/or affiliates
package org.glassfish.concurrent.config;

import com.sun.enterprise.config.modularity.ConfigBeanInstaller;
Expand Down Expand Up @@ -125,6 +125,21 @@ public interface ManagedExecutorService extends ConfigBeanProxy, Resource,
*/
void setUseForkJoinPool(String value) throws PropertyVetoException;

/**
* Gets the value of the useVirtualThreads property.
*
* @return possible object is {@link String }
*/
@Attribute(defaultValue = "false", dataType = Boolean.class)
String getUseVirtualThreads();

/**
* Sets the value of the useVirtualThreads property.
*
* @param value allowed object is {@link String }
*/
void setUseVirtualThreads(String value) throws PropertyVetoException;

@DuckTyped
String getIdentity();

Expand Down
6 changes: 3 additions & 3 deletions appserver/concurrent/concurrent-impl/pom.xml
Expand Up @@ -41,7 +41,7 @@

-->

<!-- Portions Copyright [2016-2023] [Payara Foundation and/or its affiliates] -->
<!-- Portions Copyright [2016-2024] [Payara Foundation and/or its affiliates] -->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -108,8 +108,8 @@
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.enterprise.concurrent</artifactId>
<groupId>org.glassfish.concurro</groupId>
<artifactId>concurro</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
Expand Down
@@ -1,7 +1,7 @@
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016 Payara Foundation and/or its affiliates.
* Copyright (c) 2016-2024 Payara Foundation and/or its affiliates.
* All rights reserved.
*
* The contents of this file are subject to the terms of the Common Development
Expand All @@ -20,7 +20,7 @@
import org.glassfish.concurrent.config.ManagedExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.deployer.ManagedExecutorServiceConfig;
import org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.external.probe.provider.StatsProviderManager;
import org.glassfish.external.statistics.CountStatistic;
import org.glassfish.external.statistics.impl.CountStatisticImpl;
Expand All @@ -41,7 +41,7 @@ public class ManagedExecutorServiceStatsProvider
{
private final String name;
private boolean registered = false;
private final ManagedExecutorServiceImpl managedExecutorServiceImpl;
private final AbstractManagedExecutorService managedExecutorServiceImpl;

private CountStatisticImpl completedTaskCount = new CountStatisticImpl(
"CompletedTaskCount", "count",
Expand Down
@@ -1,7 +1,7 @@
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016 Payara Foundation and/or its affiliates.
* Copyright (c) 2016-2024 Payara Foundation and/or its affiliates.
* All rights reserved.
*
* The contents of this file are subject to the terms of the Common Development
Expand All @@ -20,7 +20,7 @@
import org.glassfish.concurrent.config.ManagedScheduledExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.deployer.ManagedScheduledExecutorServiceConfig;
import org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.external.probe.provider.StatsProviderManager;
import org.glassfish.external.statistics.CountStatistic;
import org.glassfish.external.statistics.impl.CountStatisticImpl;
Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2016-2022] [Payara Foundation and/or its affiliates]
// Portions Copyright [2016-2024] [Payara Foundation and/or its affiliates]

package org.glassfish.concurrent.runtime;

Expand Down Expand Up @@ -71,13 +71,15 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.naming.NamingException;
import org.glassfish.enterprise.concurrent.AbstractManagedExecutorService;
import org.glassfish.enterprise.concurrent.AbstractManagedThread;
import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.concurro.AbstractManagedThread;
import org.glassfish.concurro.ContextServiceImpl;
import org.glassfish.concurro.ForkJoinManagedExecutorService;
import org.glassfish.concurro.ManagedExecutorServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedThreadFactoryImpl;
import org.glassfish.concurro.virtualthreads.VirtualThreadsManagedExecutorService;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.resourcebase.resources.naming.ResourceNamingService;

/**
Expand All @@ -89,7 +91,7 @@ public class ConcurrentRuntime implements PostConstruct, PreDestroy {

private static ConcurrentRuntime _runtime;

private Map<String, ManagedExecutorServiceImpl> managedExecutorServiceMap;
private Map<String, AbstractManagedExecutorService> managedExecutorServiceMap;
private Map<String, ManagedScheduledExecutorServiceImpl> managedScheduledExecutorServiceMap;
private Map<String, ContextServiceImpl> contextServiceMap = new HashMap();
private Map<String, ManagedThreadFactoryImpl> managedThreadFactoryMap;
Expand Down Expand Up @@ -203,7 +205,7 @@ public void shutdownContextService(String jndiName) {
}
}

public synchronized ManagedExecutorServiceImpl getManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config) {
public synchronized AbstractManagedExecutorService getManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config) {
String jndiName = config.getJndiName();

if (managedExecutorServiceMap != null && managedExecutorServiceMap.containsKey(jndiName)) {
Expand All @@ -215,7 +217,7 @@ public synchronized ManagedExecutorServiceImpl getManagedExecutorService(Resourc
config.getContextInfo(),
config.isContextInfoEnabledBoolean(), true);

ManagedExecutorServiceImpl mes = createManagedExecutorService(resourceInfo, config, contextService);
AbstractManagedExecutorService mes = createManagedExecutorService(resourceInfo, config, contextService);
if (managedExecutorServiceMap == null) {
managedExecutorServiceMap = new HashMap();
}
Expand All @@ -224,24 +226,45 @@ public synchronized ManagedExecutorServiceImpl getManagedExecutorService(Resourc
return mes;
}

public synchronized ManagedExecutorServiceImpl createManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config, ContextServiceImpl contextService) {
public synchronized AbstractManagedExecutorService createManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config, ContextServiceImpl contextService) {
ManagedThreadFactoryImpl managedThreadFactory = new ThreadFactoryWrapper(
config.getJndiName() + "-managedThreadFactory",
null,
config.getThreadPriority());
ManagedExecutorServiceImpl mes = new ManagedExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getUseForkJoinPool(),
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
AbstractManagedExecutorService mes;
if (config.getUseVirtualThread()) {
mes = new VirtualThreadsManagedExecutorService(config.getJndiName(),
null,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getMaximumPoolSize(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
} else if (config.getUseForkJoinPool()) {
mes = new ForkJoinManagedExecutorService(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
} else {
mes = new ManagedExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);

}
if (config.getHungAfterSeconds() > 0L && !config.isLongRunningTasks()) {
scheduleInternalTimer();
}
Expand All @@ -250,7 +273,7 @@ public synchronized ManagedExecutorServiceImpl createManagedExecutorService(Reso
}

public void shutdownManagedExecutorService(String jndiName) {
ManagedExecutorServiceImpl mes = null;
AbstractManagedExecutorService mes = null;
synchronized(this) {
if (managedExecutorServiceMap != null) {
mes = managedExecutorServiceMap.remove(jndiName);
Expand Down Expand Up @@ -288,6 +311,7 @@ public ManagedScheduledExecutorServiceImpl createManagedScheduledExecutorService
config.getJndiName() + "-managedThreadFactory",
null,
config.getThreadPriority());
// TODO: eventually use VT base MSES
ManagedScheduledExecutorServiceImpl mes = new ManagedScheduledExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1000L, // in millseconds
Expand Down Expand Up @@ -452,7 +476,7 @@ public ThreadFactoryWrapper(String string, ContextServiceImpl contextService, in
}

@Override
protected AbstractManagedThread createThread(Runnable r, ContextHandle contextHandleForSetup) {
protected Thread createThread(Runnable r, ContextHandle contextHandleForSetup) {
ClassLoader appClassLoader = Utility.getClassLoader();
Utility.setContextClassLoader(null);
try {
Expand All @@ -475,11 +499,11 @@ public void preDestroy() {
class HungTasksLogger implements Runnable {

public void run() {
ArrayList<ManagedExecutorServiceImpl> executorServices = new ArrayList();
ArrayList<AbstractManagedExecutorService> executorServices = new ArrayList();
ArrayList<ManagedScheduledExecutorServiceImpl> scheduledExecutorServices = new ArrayList();
synchronized (ConcurrentRuntime.this) {
if (managedExecutorServiceMap != null) {
Collection<ManagedExecutorServiceImpl> mesColl = managedExecutorServiceMap.values();
Collection<AbstractManagedExecutorService> mesColl = managedExecutorServiceMap.values();
executorServices.addAll(mesColl);
}
}
Expand All @@ -489,24 +513,31 @@ public void run() {
scheduledExecutorServices.addAll(msesColl);
}
}
for (ManagedExecutorServiceImpl mes: executorServices) {
Collection<AbstractManagedThread> hungThreads = mes.getHungThreads();
for (AbstractManagedExecutorService mes : executorServices) {
Collection<Thread> hungThreads = mes.getHungThreads();
logHungThreads(hungThreads, mes.getManagedThreadFactory(), mes.getName());
}
for (ManagedScheduledExecutorServiceImpl mses: scheduledExecutorServices) {
Collection<AbstractManagedThread> hungThreads = mses.getHungThreads();
Collection<Thread> hungThreads = mses.getHungThreads();
logHungThreads(hungThreads, mses.getManagedThreadFactory(), mses.getName());
}
}

private void logHungThreads(Collection<AbstractManagedThread> hungThreads, ManagedThreadFactoryImpl mtf, String mesName) {
private void logHungThreads(Collection<Thread> hungThreads, ManagedThreadFactoryImpl mtf, String mesName) {
if (hungThreads != null) {
for (AbstractManagedThread hungThread: hungThreads) {
Object[] params = {hungThread.getTaskIdentityName(),
hungThread.getName(),
hungThread.getTaskRunTime(System.currentTimeMillis()) / 1000,
mtf.getHungTaskThreshold() / 1000,
mesName};
long now = System.currentTimeMillis();
for (Thread hungThread : hungThreads) {
String taskIdentityName = "!!!!!!!";
String name = hungThread.getName();
long taskRunTime = -1;
if (hungThread instanceof AbstractManagedThread managedThread) {
taskIdentityName = managedThread.getTaskIdentityName();
taskRunTime = managedThread.getTaskRunTime(now);
}
Object[] params = {taskIdentityName, name,
taskRunTime / 1000,
mtf.getHungTaskThreshold() / 1000,
mesName};
logger.log(Level.WARNING, LogFacade.UNRESPONSIVE_TASK, params);
}
}
Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2016-2023] [Payara Foundation and/or its affiliates]
// Portions Copyright [2016-2024] [Payara Foundation and/or its affiliates]

package org.glassfish.concurrent.runtime;

Expand All @@ -49,12 +49,11 @@
import com.sun.enterprise.security.SecurityContext;
import com.sun.enterprise.transaction.api.JavaEETransactionManager;
import com.sun.enterprise.util.Utility;
import fish.payara.opentracing.propagation.MapToTextMap;
import org.glassfish.api.invocation.ComponentInvocation;
import org.glassfish.api.invocation.InvocationManager;
import org.glassfish.concurrent.LogFacade;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.enterprise.concurrent.spi.ContextSetupProvider;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.concurro.spi.ContextSetupProvider;
import org.glassfish.internal.deployment.Deployment;

import jakarta.enterprise.concurrent.ContextService;
Expand All @@ -71,12 +70,9 @@

import fish.payara.nucleus.requesttracing.RequestTracingService;
import fish.payara.nucleus.healthcheck.stuck.StuckThreadsStore;
import fish.payara.notification.requesttracing.RequestTraceSpanContext;
import fish.payara.opentracing.OpenTracingService;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.propagation.Format;
import jakarta.enterprise.concurrent.ContextServiceDefinition;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -317,7 +313,7 @@ public ContextHandle setup(ContextHandle contextHandle) throws IllegalStateExcep
}

if (stuckThreads != null) {
stuckThreads.registerThread(Thread.currentThread().getId());
stuckThreads.registerThread(Thread.currentThread().threadId());
}

// execute thread contexts snapshots to begin
Expand Down Expand Up @@ -410,7 +406,7 @@ public void reset(ContextHandle contextHandle) {
requestTracing.endTrace();
}
if (stuckThreads != null) {
stuckThreads.deregisterThread(Thread.currentThread().getId());
stuckThreads.deregisterThread(Thread.currentThread().threadId());
}
}

Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2018-2023] [Payara Foundation and/or its affiliates.]
// Portions Copyright [2018-2024] [Payara Foundation and/or its affiliates.]

package org.glassfish.concurrent.runtime;

Expand All @@ -49,7 +49,7 @@
import jakarta.enterprise.concurrent.spi.ThreadContextRestorer;
import jakarta.enterprise.concurrent.spi.ThreadContextSnapshot;
import org.glassfish.api.invocation.ComponentInvocation;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.internal.data.ApplicationInfo;
import org.glassfish.internal.data.ApplicationRegistry;

Expand Down