Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FISH-7670 Integerate Latest Concurro with Virtual Threads #6582

Merged
merged 16 commits into from Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/payara-bom/pom.xml
aubi marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -470,9 +470,9 @@
<version>${com.ibm.jbatch.spi.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.enterprise.concurrent</artifactId>
<version>${concurrent.version}</version>
<groupId>org.glassfish.concurro</groupId>
<artifactId>concurro</artifactId>
<version>${concurro.version}</version>
</dependency>

<dependency>
Expand Down
aubi marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -128,6 +128,21 @@ public interface ManagedExecutorService extends ConfigBeanProxy, Resource,
@DuckTyped
String getIdentity();

/**
* Gets the value of the useVirtualThreads property.
*
* @return possible object is {@link String }
*/
@Attribute(defaultValue = "false", dataType = Boolean.class)
String getUseVirtualThreads();

/**
* Sets the value of the useVirtualThreads property.
*
* @param value allowed object is {@link String }
*/
void setUseVirtualThreads(String value) throws PropertyVetoException;

aubi marked this conversation as resolved.
Show resolved Hide resolved
class Duck {
public static String getIdentity(ManagedExecutorService resource){
return resource.getJndiName();
Expand Down
4 changes: 2 additions & 2 deletions appserver/concurrent/concurrent-impl/pom.xml
aubi marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -108,8 +108,8 @@
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.enterprise.concurrent</artifactId>
<groupId>org.glassfish.concurro</groupId>
<artifactId>concurro</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
Expand Down
aubi marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -20,7 +20,7 @@
import org.glassfish.concurrent.config.ManagedExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.deployer.ManagedExecutorServiceConfig;
import org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.external.probe.provider.StatsProviderManager;
import org.glassfish.external.statistics.CountStatistic;
import org.glassfish.external.statistics.impl.CountStatisticImpl;
Expand All @@ -41,7 +41,7 @@ public class ManagedExecutorServiceStatsProvider
{
private final String name;
private boolean registered = false;
private final ManagedExecutorServiceImpl managedExecutorServiceImpl;
private final AbstractManagedExecutorService managedExecutorServiceImpl;

private CountStatisticImpl completedTaskCount = new CountStatisticImpl(
"CompletedTaskCount", "count",
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -20,7 +20,7 @@
import org.glassfish.concurrent.config.ManagedScheduledExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.deployer.ManagedScheduledExecutorServiceConfig;
import org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.external.probe.provider.StatsProviderManager;
import org.glassfish.external.statistics.CountStatistic;
import org.glassfish.external.statistics.impl.CountStatisticImpl;
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -71,13 +71,15 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.naming.NamingException;
import org.glassfish.enterprise.concurrent.AbstractManagedExecutorService;
import org.glassfish.enterprise.concurrent.AbstractManagedThread;
import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.concurro.AbstractManagedThread;
import org.glassfish.concurro.ContextServiceImpl;
import org.glassfish.concurro.ForkJoinManagedExecutorService;
import org.glassfish.concurro.ManagedExecutorServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedThreadFactoryImpl;
import org.glassfish.concurro.virtualthreads.VirtualThreadsManagedExecutorService;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.resourcebase.resources.naming.ResourceNamingService;

/**
Expand All @@ -89,7 +91,7 @@ public class ConcurrentRuntime implements PostConstruct, PreDestroy {

private static ConcurrentRuntime _runtime;

private Map<String, ManagedExecutorServiceImpl> managedExecutorServiceMap;
private Map<String, AbstractManagedExecutorService> managedExecutorServiceMap;
private Map<String, ManagedScheduledExecutorServiceImpl> managedScheduledExecutorServiceMap;
private Map<String, ContextServiceImpl> contextServiceMap = new HashMap();
private Map<String, ManagedThreadFactoryImpl> managedThreadFactoryMap;
Expand Down Expand Up @@ -203,7 +205,7 @@ public void shutdownContextService(String jndiName) {
}
}

public synchronized ManagedExecutorServiceImpl getManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config) {
public synchronized AbstractManagedExecutorService getManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config) {
String jndiName = config.getJndiName();

if (managedExecutorServiceMap != null && managedExecutorServiceMap.containsKey(jndiName)) {
Expand All @@ -215,7 +217,7 @@ public synchronized ManagedExecutorServiceImpl getManagedExecutorService(Resourc
config.getContextInfo(),
config.isContextInfoEnabledBoolean(), true);

ManagedExecutorServiceImpl mes = createManagedExecutorService(resourceInfo, config, contextService);
AbstractManagedExecutorService mes = createManagedExecutorService(resourceInfo, config, contextService);
if (managedExecutorServiceMap == null) {
managedExecutorServiceMap = new HashMap();
}
Expand All @@ -224,24 +226,45 @@ public synchronized ManagedExecutorServiceImpl getManagedExecutorService(Resourc
return mes;
}

public synchronized ManagedExecutorServiceImpl createManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config, ContextServiceImpl contextService) {
public synchronized AbstractManagedExecutorService createManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config, ContextServiceImpl contextService) {
ManagedThreadFactoryImpl managedThreadFactory = new ThreadFactoryWrapper(
config.getJndiName() + "-managedThreadFactory",
null,
config.getThreadPriority());
ManagedExecutorServiceImpl mes = new ManagedExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getUseForkJoinPool(),
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
AbstractManagedExecutorService mes;
if (config.getUseVirtualThread()) {
mes = new VirtualThreadsManagedExecutorService(config.getJndiName(),
null,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getMaximumPoolSize(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
} else if (config.getUseForkJoinPool()) {
mes = new ForkJoinManagedExecutorService(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
} else {
mes = new ManagedExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);

}
if (config.getHungAfterSeconds() > 0L && !config.isLongRunningTasks()) {
scheduleInternalTimer();
}
Expand All @@ -250,7 +273,7 @@ public synchronized ManagedExecutorServiceImpl createManagedExecutorService(Reso
}

public void shutdownManagedExecutorService(String jndiName) {
ManagedExecutorServiceImpl mes = null;
AbstractManagedExecutorService mes = null;
synchronized(this) {
if (managedExecutorServiceMap != null) {
mes = managedExecutorServiceMap.remove(jndiName);
Expand Down Expand Up @@ -288,6 +311,7 @@ public ManagedScheduledExecutorServiceImpl createManagedScheduledExecutorService
config.getJndiName() + "-managedThreadFactory",
null,
config.getThreadPriority());
// TODO: eventually use VT base MSES
ManagedScheduledExecutorServiceImpl mes = new ManagedScheduledExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1000L, // in millseconds
Expand Down Expand Up @@ -452,7 +476,7 @@ public ThreadFactoryWrapper(String string, ContextServiceImpl contextService, in
}

@Override
protected AbstractManagedThread createThread(Runnable r, ContextHandle contextHandleForSetup) {
protected Thread createThread(Runnable r, ContextHandle contextHandleForSetup) {
ClassLoader appClassLoader = Utility.getClassLoader();
Utility.setContextClassLoader(null);
try {
Expand All @@ -475,11 +499,11 @@ public void preDestroy() {
class HungTasksLogger implements Runnable {

public void run() {
ArrayList<ManagedExecutorServiceImpl> executorServices = new ArrayList();
ArrayList<AbstractManagedExecutorService> executorServices = new ArrayList();
ArrayList<ManagedScheduledExecutorServiceImpl> scheduledExecutorServices = new ArrayList();
synchronized (ConcurrentRuntime.this) {
if (managedExecutorServiceMap != null) {
Collection<ManagedExecutorServiceImpl> mesColl = managedExecutorServiceMap.values();
Collection<AbstractManagedExecutorService> mesColl = managedExecutorServiceMap.values();
executorServices.addAll(mesColl);
}
}
Expand All @@ -489,24 +513,31 @@ public void run() {
scheduledExecutorServices.addAll(msesColl);
}
}
for (ManagedExecutorServiceImpl mes: executorServices) {
Collection<AbstractManagedThread> hungThreads = mes.getHungThreads();
for (AbstractManagedExecutorService mes : executorServices) {
Collection<Thread> hungThreads = mes.getHungThreads();
logHungThreads(hungThreads, mes.getManagedThreadFactory(), mes.getName());
}
for (ManagedScheduledExecutorServiceImpl mses: scheduledExecutorServices) {
Collection<AbstractManagedThread> hungThreads = mses.getHungThreads();
Collection<Thread> hungThreads = mses.getHungThreads();
logHungThreads(hungThreads, mses.getManagedThreadFactory(), mses.getName());
}
}

private void logHungThreads(Collection<AbstractManagedThread> hungThreads, ManagedThreadFactoryImpl mtf, String mesName) {
private void logHungThreads(Collection<Thread> hungThreads, ManagedThreadFactoryImpl mtf, String mesName) {
if (hungThreads != null) {
for (AbstractManagedThread hungThread: hungThreads) {
Object[] params = {hungThread.getTaskIdentityName(),
hungThread.getName(),
hungThread.getTaskRunTime(System.currentTimeMillis()) / 1000,
mtf.getHungTaskThreshold() / 1000,
mesName};
long now = System.currentTimeMillis();
for (Thread hungThread : hungThreads) {
String taskIdentityName = "!!!!!!!";
String name = hungThread.getName();
long taskRunTime = -1;
if (hungThread instanceof AbstractManagedThread managedThread) {
taskIdentityName = managedThread.getTaskIdentityName();
taskRunTime = managedThread.getTaskRunTime(now);
}
Object[] params = {taskIdentityName, name,
taskRunTime / 1000,
mtf.getHungTaskThreshold() / 1000,
mesName};
logger.log(Level.WARNING, LogFacade.UNRESPONSIVE_TASK, params);
}
}
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -49,12 +49,11 @@
import com.sun.enterprise.security.SecurityContext;
import com.sun.enterprise.transaction.api.JavaEETransactionManager;
import com.sun.enterprise.util.Utility;
import fish.payara.opentracing.propagation.MapToTextMap;
import org.glassfish.api.invocation.ComponentInvocation;
import org.glassfish.api.invocation.InvocationManager;
import org.glassfish.concurrent.LogFacade;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.enterprise.concurrent.spi.ContextSetupProvider;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.concurro.spi.ContextSetupProvider;
import org.glassfish.internal.deployment.Deployment;

import jakarta.enterprise.concurrent.ContextService;
Expand All @@ -71,12 +70,9 @@

import fish.payara.nucleus.requesttracing.RequestTracingService;
import fish.payara.nucleus.healthcheck.stuck.StuckThreadsStore;
import fish.payara.notification.requesttracing.RequestTraceSpanContext;
import fish.payara.opentracing.OpenTracingService;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.propagation.Format;
import jakarta.enterprise.concurrent.ContextServiceDefinition;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -317,7 +313,7 @@ public ContextHandle setup(ContextHandle contextHandle) throws IllegalStateExcep
}

if (stuckThreads != null) {
stuckThreads.registerThread(Thread.currentThread().getId());
stuckThreads.registerThread(Thread.currentThread().threadId());
}

// execute thread contexts snapshots to begin
Expand Down Expand Up @@ -410,7 +406,7 @@ public void reset(ContextHandle contextHandle) {
requestTracing.endTrace();
}
if (stuckThreads != null) {
stuckThreads.deregisterThread(Thread.currentThread().getId());
stuckThreads.deregisterThread(Thread.currentThread().threadId());
}
}

Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -49,7 +49,7 @@
import jakarta.enterprise.concurrent.spi.ThreadContextRestorer;
import jakarta.enterprise.concurrent.spi.ThreadContextSnapshot;
import org.glassfish.api.invocation.ComponentInvocation;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.internal.data.ApplicationInfo;
import org.glassfish.internal.data.ApplicationRegistry;

Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -41,7 +41,7 @@
package org.glassfish.concurrent.runtime;


import org.glassfish.enterprise.concurrent.spi.TransactionHandle;
import org.glassfish.concurro.spi.TransactionHandle;

import jakarta.transaction.Transaction;

Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -42,8 +42,8 @@
package org.glassfish.concurrent.runtime;

import com.sun.enterprise.transaction.api.JavaEETransactionManager;
import org.glassfish.enterprise.concurrent.spi.TransactionHandle;
import org.glassfish.enterprise.concurrent.spi.TransactionSetupProvider;
import org.glassfish.concurro.spi.TransactionHandle;
import org.glassfish.concurro.spi.TransactionSetupProvider;

import jakarta.enterprise.concurrent.ManagedTask;
import jakarta.transaction.InvalidTransactionException;
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -41,8 +41,13 @@

package org.glassfish.concurrent.runtime.deployer;

import jakarta.enterprise.concurrent.ManagedExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.enterprise.concurrent.*;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.concurro.ContextServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceAdapter;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedThreadFactoryImpl;
import org.glassfish.resourcebase.resources.api.ResourceInfo;

import javax.naming.Context;
Expand Down Expand Up @@ -95,8 +100,8 @@ private ManagedThreadFactoryImpl getManagedThreadFactory(ManagedThreadFactoryCon
return managedThreadFactory;
}

private ManagedExecutorServiceAdapter getManagedExecutorService(ManagedExecutorServiceConfig config, ResourceInfo resourceInfo) {
ManagedExecutorServiceImpl mes = getRuntime().getManagedExecutorService(resourceInfo, config);
private ManagedExecutorService getManagedExecutorService(ManagedExecutorServiceConfig config, ResourceInfo resourceInfo) {
AbstractManagedExecutorService mes = getRuntime().getManagedExecutorService(resourceInfo, config);
return mes.getAdapter();
}

Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright

Expand Up @@ -54,7 +54,7 @@
import org.glassfish.api.invocation.InvocationManager;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.ContextSetupProviderImpl;
import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.concurro.ContextServiceImpl;
import org.glassfish.resourcebase.resources.api.ResourceConflictException;
import org.glassfish.resourcebase.resources.api.ResourceDeployer;
import org.glassfish.resourcebase.resources.api.ResourceDeployerInfo;
Expand Down