Skip to content

Commit

Permalink
Merge pull request #6582 from aubi/FISH-7670-use-concurro
Browse files Browse the repository at this point in the history
FISH-7670 Integerate Latest Concurro with Virtual Threads
  • Loading branch information
aubi committed Mar 11, 2024
2 parents 6cc1de1 + b3305c5 commit 6cd8eb5
Show file tree
Hide file tree
Showing 28 changed files with 224 additions and 155 deletions.
8 changes: 4 additions & 4 deletions api/payara-bom/pom.xml
@@ -1,7 +1,7 @@
<!--
~ DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
~
~ Copyright (c) 2019-2023 Payara Foundation and/or its affiliates. All rights reserved.
~ Copyright (c) 2019-2024 Payara Foundation and/or its affiliates. All rights reserved.
~
~ The contents of this file are subject to the terms of either the GNU
~ General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -470,9 +470,9 @@
<version>${com.ibm.jbatch.spi.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.enterprise.concurrent</artifactId>
<version>${concurrent.version}</version>
<groupId>org.glassfish.concurro</groupId>
<artifactId>concurro</artifactId>
<version>${concurro.version}</version>
</dependency>

<dependency>
Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2022] Payara Foundation and/or affiliates
// Portions Copyright [2022-2024] Payara Foundation and/or affiliates
package org.glassfish.concurrent.config;

import com.sun.enterprise.config.modularity.ConfigBeanInstaller;
Expand Down Expand Up @@ -125,6 +125,21 @@ public interface ManagedExecutorService extends ConfigBeanProxy, Resource,
*/
void setUseForkJoinPool(String value) throws PropertyVetoException;

/**
* Gets the value of the useVirtualThreads property.
*
* @return possible object is {@link String }
*/
@Attribute(defaultValue = "false", dataType = Boolean.class)
String getUseVirtualThreads();

/**
* Sets the value of the useVirtualThreads property.
*
* @param value allowed object is {@link String }
*/
void setUseVirtualThreads(String value) throws PropertyVetoException;

@DuckTyped
String getIdentity();

Expand Down
6 changes: 3 additions & 3 deletions appserver/concurrent/concurrent-impl/pom.xml
Expand Up @@ -41,7 +41,7 @@
-->

<!-- Portions Copyright [2016-2023] [Payara Foundation and/or its affiliates] -->
<!-- Portions Copyright [2016-2024] [Payara Foundation and/or its affiliates] -->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -108,8 +108,8 @@
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.enterprise.concurrent</artifactId>
<groupId>org.glassfish.concurro</groupId>
<artifactId>concurro</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
Expand Down
@@ -1,7 +1,7 @@
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016 Payara Foundation and/or its affiliates.
* Copyright (c) 2016-2024 Payara Foundation and/or its affiliates.
* All rights reserved.
*
* The contents of this file are subject to the terms of the Common Development
Expand All @@ -20,7 +20,7 @@
import org.glassfish.concurrent.config.ManagedExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.deployer.ManagedExecutorServiceConfig;
import org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.external.probe.provider.StatsProviderManager;
import org.glassfish.external.statistics.CountStatistic;
import org.glassfish.external.statistics.impl.CountStatisticImpl;
Expand All @@ -41,7 +41,7 @@ public class ManagedExecutorServiceStatsProvider
{
private final String name;
private boolean registered = false;
private final ManagedExecutorServiceImpl managedExecutorServiceImpl;
private final AbstractManagedExecutorService managedExecutorServiceImpl;

private CountStatisticImpl completedTaskCount = new CountStatisticImpl(
"CompletedTaskCount", "count",
Expand Down
@@ -1,7 +1,7 @@
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016 Payara Foundation and/or its affiliates.
* Copyright (c) 2016-2024 Payara Foundation and/or its affiliates.
* All rights reserved.
*
* The contents of this file are subject to the terms of the Common Development
Expand All @@ -20,7 +20,7 @@
import org.glassfish.concurrent.config.ManagedScheduledExecutorService;
import org.glassfish.concurrent.runtime.ConcurrentRuntime;
import org.glassfish.concurrent.runtime.deployer.ManagedScheduledExecutorServiceConfig;
import org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.external.probe.provider.StatsProviderManager;
import org.glassfish.external.statistics.CountStatistic;
import org.glassfish.external.statistics.impl.CountStatisticImpl;
Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2016-2022] [Payara Foundation and/or its affiliates]
// Portions Copyright [2016-2024] [Payara Foundation and/or its affiliates]

package org.glassfish.concurrent.runtime;

Expand Down Expand Up @@ -71,13 +71,15 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.naming.NamingException;
import org.glassfish.enterprise.concurrent.AbstractManagedExecutorService;
import org.glassfish.enterprise.concurrent.AbstractManagedThread;
import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.concurro.AbstractManagedExecutorService;
import org.glassfish.concurro.AbstractManagedThread;
import org.glassfish.concurro.ContextServiceImpl;
import org.glassfish.concurro.ForkJoinManagedExecutorService;
import org.glassfish.concurro.ManagedExecutorServiceImpl;
import org.glassfish.concurro.ManagedScheduledExecutorServiceImpl;
import org.glassfish.concurro.ManagedThreadFactoryImpl;
import org.glassfish.concurro.virtualthreads.VirtualThreadsManagedExecutorService;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.resourcebase.resources.naming.ResourceNamingService;

/**
Expand All @@ -89,7 +91,7 @@ public class ConcurrentRuntime implements PostConstruct, PreDestroy {

private static ConcurrentRuntime _runtime;

private Map<String, ManagedExecutorServiceImpl> managedExecutorServiceMap;
private Map<String, AbstractManagedExecutorService> managedExecutorServiceMap;
private Map<String, ManagedScheduledExecutorServiceImpl> managedScheduledExecutorServiceMap;
private Map<String, ContextServiceImpl> contextServiceMap = new HashMap();
private Map<String, ManagedThreadFactoryImpl> managedThreadFactoryMap;
Expand Down Expand Up @@ -203,7 +205,7 @@ public void shutdownContextService(String jndiName) {
}
}

public synchronized ManagedExecutorServiceImpl getManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config) {
public synchronized AbstractManagedExecutorService getManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config) {
String jndiName = config.getJndiName();

if (managedExecutorServiceMap != null && managedExecutorServiceMap.containsKey(jndiName)) {
Expand All @@ -215,7 +217,7 @@ public synchronized ManagedExecutorServiceImpl getManagedExecutorService(Resourc
config.getContextInfo(),
config.isContextInfoEnabledBoolean(), true);

ManagedExecutorServiceImpl mes = createManagedExecutorService(resourceInfo, config, contextService);
AbstractManagedExecutorService mes = createManagedExecutorService(resourceInfo, config, contextService);
if (managedExecutorServiceMap == null) {
managedExecutorServiceMap = new HashMap();
}
Expand All @@ -224,24 +226,45 @@ public synchronized ManagedExecutorServiceImpl getManagedExecutorService(Resourc
return mes;
}

public synchronized ManagedExecutorServiceImpl createManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config, ContextServiceImpl contextService) {
public synchronized AbstractManagedExecutorService createManagedExecutorService(ResourceInfo resourceInfo, ManagedExecutorServiceConfig config, ContextServiceImpl contextService) {
ManagedThreadFactoryImpl managedThreadFactory = new ThreadFactoryWrapper(
config.getJndiName() + "-managedThreadFactory",
null,
config.getThreadPriority());
ManagedExecutorServiceImpl mes = new ManagedExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getUseForkJoinPool(),
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
AbstractManagedExecutorService mes;
if (config.getUseVirtualThread()) {
mes = new VirtualThreadsManagedExecutorService(config.getJndiName(),
null,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getMaximumPoolSize(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
} else if (config.getUseForkJoinPool()) {
mes = new ForkJoinManagedExecutorService(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);
} else {
mes = new ManagedExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1_000L, // in milliseconds
config.isLongRunningTasks(),
config.getCorePoolSize(),
config.getMaximumPoolSize(),
config.getKeepAliveSeconds(), TimeUnit.SECONDS,
config.getThreadLifeTimeSeconds(),
config.getTaskQueueCapacity(),
contextService,
AbstractManagedExecutorService.RejectPolicy.ABORT);

}
if (config.getHungAfterSeconds() > 0L && !config.isLongRunningTasks()) {
scheduleInternalTimer();
}
Expand All @@ -250,7 +273,7 @@ public synchronized ManagedExecutorServiceImpl createManagedExecutorService(Reso
}

public void shutdownManagedExecutorService(String jndiName) {
ManagedExecutorServiceImpl mes = null;
AbstractManagedExecutorService mes = null;
synchronized(this) {
if (managedExecutorServiceMap != null) {
mes = managedExecutorServiceMap.remove(jndiName);
Expand Down Expand Up @@ -288,6 +311,7 @@ public ManagedScheduledExecutorServiceImpl createManagedScheduledExecutorService
config.getJndiName() + "-managedThreadFactory",
null,
config.getThreadPriority());
// TODO: eventually use VT base MSES
ManagedScheduledExecutorServiceImpl mes = new ManagedScheduledExecutorServiceImpl(config.getJndiName(),
managedThreadFactory,
config.getHungAfterSeconds() * 1000L, // in millseconds
Expand Down Expand Up @@ -452,7 +476,7 @@ public ThreadFactoryWrapper(String string, ContextServiceImpl contextService, in
}

@Override
protected AbstractManagedThread createThread(Runnable r, ContextHandle contextHandleForSetup) {
protected Thread createThread(Runnable r, ContextHandle contextHandleForSetup) {
ClassLoader appClassLoader = Utility.getClassLoader();
Utility.setContextClassLoader(null);
try {
Expand All @@ -475,11 +499,11 @@ public void preDestroy() {
class HungTasksLogger implements Runnable {

public void run() {
ArrayList<ManagedExecutorServiceImpl> executorServices = new ArrayList();
ArrayList<AbstractManagedExecutorService> executorServices = new ArrayList();
ArrayList<ManagedScheduledExecutorServiceImpl> scheduledExecutorServices = new ArrayList();
synchronized (ConcurrentRuntime.this) {
if (managedExecutorServiceMap != null) {
Collection<ManagedExecutorServiceImpl> mesColl = managedExecutorServiceMap.values();
Collection<AbstractManagedExecutorService> mesColl = managedExecutorServiceMap.values();
executorServices.addAll(mesColl);
}
}
Expand All @@ -489,24 +513,31 @@ public void run() {
scheduledExecutorServices.addAll(msesColl);
}
}
for (ManagedExecutorServiceImpl mes: executorServices) {
Collection<AbstractManagedThread> hungThreads = mes.getHungThreads();
for (AbstractManagedExecutorService mes : executorServices) {
Collection<Thread> hungThreads = mes.getHungThreads();
logHungThreads(hungThreads, mes.getManagedThreadFactory(), mes.getName());
}
for (ManagedScheduledExecutorServiceImpl mses: scheduledExecutorServices) {
Collection<AbstractManagedThread> hungThreads = mses.getHungThreads();
Collection<Thread> hungThreads = mses.getHungThreads();
logHungThreads(hungThreads, mses.getManagedThreadFactory(), mses.getName());
}
}

private void logHungThreads(Collection<AbstractManagedThread> hungThreads, ManagedThreadFactoryImpl mtf, String mesName) {
private void logHungThreads(Collection<Thread> hungThreads, ManagedThreadFactoryImpl mtf, String mesName) {
if (hungThreads != null) {
for (AbstractManagedThread hungThread: hungThreads) {
Object[] params = {hungThread.getTaskIdentityName(),
hungThread.getName(),
hungThread.getTaskRunTime(System.currentTimeMillis()) / 1000,
mtf.getHungTaskThreshold() / 1000,
mesName};
long now = System.currentTimeMillis();
for (Thread hungThread : hungThreads) {
String taskIdentityName = "!!!!!!!";
String name = hungThread.getName();
long taskRunTime = -1;
if (hungThread instanceof AbstractManagedThread managedThread) {
taskIdentityName = managedThread.getTaskIdentityName();
taskRunTime = managedThread.getTaskRunTime(now);
}
Object[] params = {taskIdentityName, name,
taskRunTime / 1000,
mtf.getHungTaskThreshold() / 1000,
mesName};
logger.log(Level.WARNING, LogFacade.UNRESPONSIVE_TASK, params);
}
}
Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2016-2023] [Payara Foundation and/or its affiliates]
// Portions Copyright [2016-2024] [Payara Foundation and/or its affiliates]

package org.glassfish.concurrent.runtime;

Expand All @@ -49,12 +49,11 @@
import com.sun.enterprise.security.SecurityContext;
import com.sun.enterprise.transaction.api.JavaEETransactionManager;
import com.sun.enterprise.util.Utility;
import fish.payara.opentracing.propagation.MapToTextMap;
import org.glassfish.api.invocation.ComponentInvocation;
import org.glassfish.api.invocation.InvocationManager;
import org.glassfish.concurrent.LogFacade;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.enterprise.concurrent.spi.ContextSetupProvider;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.concurro.spi.ContextSetupProvider;
import org.glassfish.internal.deployment.Deployment;

import jakarta.enterprise.concurrent.ContextService;
Expand All @@ -71,12 +70,9 @@

import fish.payara.nucleus.requesttracing.RequestTracingService;
import fish.payara.nucleus.healthcheck.stuck.StuckThreadsStore;
import fish.payara.notification.requesttracing.RequestTraceSpanContext;
import fish.payara.opentracing.OpenTracingService;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.propagation.Format;
import jakarta.enterprise.concurrent.ContextServiceDefinition;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -317,7 +313,7 @@ public ContextHandle setup(ContextHandle contextHandle) throws IllegalStateExcep
}

if (stuckThreads != null) {
stuckThreads.registerThread(Thread.currentThread().getId());
stuckThreads.registerThread(Thread.currentThread().threadId());
}

// execute thread contexts snapshots to begin
Expand Down Expand Up @@ -410,7 +406,7 @@ public void reset(ContextHandle contextHandle) {
requestTracing.endTrace();
}
if (stuckThreads != null) {
stuckThreads.deregisterThread(Thread.currentThread().getId());
stuckThreads.deregisterThread(Thread.currentThread().threadId());
}
}

Expand Down
Expand Up @@ -37,7 +37,7 @@
* only if the new code is made subject to such option by the copyright
* holder.
*/
// Portions Copyright [2018-2023] [Payara Foundation and/or its affiliates.]
// Portions Copyright [2018-2024] [Payara Foundation and/or its affiliates.]

package org.glassfish.concurrent.runtime;

Expand All @@ -49,7 +49,7 @@
import jakarta.enterprise.concurrent.spi.ThreadContextRestorer;
import jakarta.enterprise.concurrent.spi.ThreadContextSnapshot;
import org.glassfish.api.invocation.ComponentInvocation;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
import org.glassfish.concurro.spi.ContextHandle;
import org.glassfish.internal.data.ApplicationInfo;
import org.glassfish.internal.data.ApplicationRegistry;

Expand Down

0 comments on commit 6cd8eb5

Please sign in to comment.