Skip to content

Commit

Permalink
Merge branch 'virtual-thread' into virtual-thread-spring-boot3
Browse files Browse the repository at this point in the history
  • Loading branch information
EachannChan committed Nov 18, 2024
2 parents dc1a4bb + a812aff commit a96658e
Show file tree
Hide file tree
Showing 40 changed files with 673 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.entity.NotifyPlatform;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.common.event.CustomContextRefreshedEvent;
Expand Down Expand Up @@ -109,13 +109,13 @@ public Map<String, ExecutorWrapper> getExecutorWrappers() {
* @return thead pools stats
*/
@Override
public List<ThreadPoolStats> getMultiPoolStats() {
public List<ExecutorStats> getMultiPoolStats() {
val executorWrappers = getExecutorWrappers();
if (MapUtils.isEmpty(executorWrappers)) {
return Collections.emptyList();
}

List<ThreadPoolStats> threadPoolStats = Lists.newArrayList();
List<ExecutorStats> threadPoolStats = Lists.newArrayList();
executorWrappers.forEach((k, v) -> threadPoolStats.add(ExecutorConverter.toMetrics(v)));
return threadPoolStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public enum JreEnum {

JAVA_18,

JAVA_19;
JAVA_19,

JAVA_20,

JAVA_21;

private static final JreEnum VERSION = getJre();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,28 @@
**/
@Data
@EqualsAndHashCode(callSuper = true)
public class ThreadPoolStats extends Metrics {
public class ExecutorStats extends Metrics {

/**
* 执行器名字
*/
private String executorName;

/**
* 执行器别名
*/
private String executorAliasName;

/**
* 线程池名字
*/
@Deprecated
private String poolName;

/**
* 线程池别名
*/
@Deprecated
private String poolAliasName;

/**
Expand All @@ -51,6 +63,31 @@ public class ThreadPoolStats extends Metrics {
*/
private int maximumPoolSize;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 是否为DtpExecutor
*/
private boolean dynamic;

/**
* 是否为虚拟线程执行器
*/
private boolean isVirtualExecutor;

/**
* 空闲时间 (ms)
*/
Expand Down Expand Up @@ -81,16 +118,6 @@ public class ThreadPoolStats extends Metrics {
*/
private int queueRemainingCapacity;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 已执行完成的大致任务总数
*/
Expand Down Expand Up @@ -121,16 +148,6 @@ public class ThreadPoolStats extends Metrics {
*/
private String rejectHandlerName;

/**
* 是否DtpExecutor线程池
*/
private boolean dynamic;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 在队列等待超时任务数量
*/
Expand Down Expand Up @@ -185,4 +202,5 @@ public class ThreadPoolStats extends Metrics {
* 满足99.9%的任务执行所需的最低耗时
*/
private double tp999;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
**/
@Data
public class Metrics {

}
17 changes: 9 additions & 8 deletions core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
doRefresh(executorWrapper, props);
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
if (oldFields.equals(newFields)) {
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) {
log.debug("DynamicTp refresh, main properties of [{}] have not changed.",
executorWrapper.getThreadPoolName());
return;
Expand Down Expand Up @@ -242,13 +242,14 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
}

ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
if (!executorWrapper.isVirtualThreadExecutor()) {
ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
}
}

List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.dromara.dynamictp.core.aware;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;

import java.util.Collections;
import java.util.List;
Expand All @@ -35,7 +35,7 @@ public interface MetricsAware extends DtpAware {
*
* @return the thread pool stats
*/
default ThreadPoolStats getPoolStats() {
default ExecutorStats getPoolStats() {
return null;
}

Expand All @@ -44,7 +44,7 @@ default ThreadPoolStats getPoolStats() {
*
* @return thead pools stats
*/
default List<ThreadPoolStats> getMultiPoolStats() {
default List<ExecutorStats> getMultiPoolStats() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.dromara.dynamictp.core.converter;

import lombok.val;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
Expand Down Expand Up @@ -53,37 +53,43 @@ public static TpMainFields toMainFields(ExecutorWrapper executorWrapper) {
return mainFields;
}

public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
ExecutorAdapter<?> executor = wrapper.getExecutor();
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
ThreadPoolStats poolStats = convertCommon(executor);
poolStats.setPoolName(wrapper.getThreadPoolName());
poolStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
poolStats.setRunTimeoutCount(provider.getRunTimeoutCount());
poolStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
poolStats.setRejectCount(provider.getRejectedTaskCount());
poolStats.setDynamic(executor instanceof DtpExecutor);
ExecutorStats executorStats = convertCommon(executor);
executorStats.setPoolName(wrapper.getThreadPoolName());
executorStats.setPoolAliasName(wrapper.getThreadPoolAliasName());

poolStats.setTps(performanceSnapshot.getTps());
poolStats.setAvg(performanceSnapshot.getAvg());
poolStats.setMaxRt(performanceSnapshot.getMaxRt());
poolStats.setMinRt(performanceSnapshot.getMinRt());
poolStats.setTp50(performanceSnapshot.getTp50());
poolStats.setTp75(performanceSnapshot.getTp75());
poolStats.setTp90(performanceSnapshot.getTp90());
poolStats.setTp95(performanceSnapshot.getTp95());
poolStats.setTp99(performanceSnapshot.getTp99());
poolStats.setTp999(performanceSnapshot.getTp999());
return poolStats;
if (!wrapper.isVirtualThreadExecutor()) {
executorStats.setRunTimeoutCount(provider.getRunTimeoutCount());
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());
executorStats.setVirtualExecutor(false);
} else {
executorStats.setVirtualExecutor(true);
}

executorStats.setDynamic(executor instanceof DtpExecutor);
executorStats.setTps(performanceSnapshot.getTps());
executorStats.setAvg(performanceSnapshot.getAvg());
executorStats.setMaxRt(performanceSnapshot.getMaxRt());
executorStats.setMinRt(performanceSnapshot.getMinRt());
executorStats.setTp50(performanceSnapshot.getTp50());
executorStats.setTp75(performanceSnapshot.getTp75());
executorStats.setTp90(performanceSnapshot.getTp90());
executorStats.setTp95(performanceSnapshot.getTp95());
executorStats.setTp99(performanceSnapshot.getTp99());
executorStats.setTp999(performanceSnapshot.getTp999());
return executorStats;
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) {
ExecutorStats poolStats = new ExecutorStats();
poolStats.setCorePoolSize(executor.getCorePoolSize());
poolStats.setMaximumPoolSize(executor.getMaximumPoolSize());
poolStats.setPoolSize(executor.getPoolSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import lombok.Getter;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;

/**
* ExecutorType related
Expand Down Expand Up @@ -55,7 +56,12 @@ public enum ExecutorType {
/**
* Priority executor type.
*/
PRIORITY("priority", PriorityDtpExecutor.class);
PRIORITY("priority", PriorityDtpExecutor.class),

/**
* Virtual thread executor adapter type.
*/
VIRTUAL("virtual", VirtualThreadExecutorProxy.class);

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.core.monitor.collector.InternalLogCollector;
import org.dromara.dynamictp.core.monitor.collector.LogCollector;
Expand Down Expand Up @@ -56,14 +56,14 @@ private CollectorHandler() {
COLLECTORS.put(jmxCollector.type(), jmxCollector);
}

public void collect(ThreadPoolStats poolStats, List<String> types) {
if (poolStats == null || CollectionUtils.isEmpty(types)) {
public void collect(ExecutorStats executorStats, List<String> types) {
if (executorStats == null || CollectionUtils.isEmpty(types)) {
return;
}
for (String collectorType : types) {
MetricsCollector collector = COLLECTORS.get(collectorType.toLowerCase());
if (collector != null) {
collector.collect(poolStats);
collector.collect(executorStats);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.event.AlarmCheckEvent;
import org.dromara.dynamictp.common.event.CollectEvent;
import org.dromara.dynamictp.common.event.CustomContextRefreshedEvent;
import org.dromara.dynamictp.common.manager.EventBusManager;

import org.dromara.dynamictp.common.properties.DtpProperties;

import org.dromara.dynamictp.core.DtpRegistry;
import org.dromara.dynamictp.core.converter.ExecutorConverter;
import org.dromara.dynamictp.core.handler.CollectorHandler;
Expand Down Expand Up @@ -103,7 +101,8 @@ private void collectMetrics(Set<String> executorNames) {
publishCollectEvent();
}

private void doCollect(ThreadPoolStats threadPoolStats) {

private void doCollect(ExecutorStats threadPoolStats) {
try {
CollectorHandler.getInstance().collect(threadPoolStats, dtpProperties.getCollectorTypes());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.dromara.dynamictp.core.monitor.collector;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -29,10 +29,11 @@
public class InternalLogCollector extends AbstractCollector {

@Override
public void collect(ThreadPoolStats poolStats) {
public void collect(ExecutorStats poolStats) {
log.info("dynamic.tp metrics: {}", JsonUtil.toJson(poolStats));
}


@Override
public String type() {
return CollectorTypeEnum.INTERNAL_LOGGING.name().toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.dromara.dynamictp.core.monitor.collector;

import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.util.JsonUtil;
import org.dromara.dynamictp.logging.LogHelper;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -33,7 +33,7 @@
public class LogCollector extends AbstractCollector {

@Override
public void collect(ThreadPoolStats threadPoolStats) {
public void collect(ExecutorStats threadPoolStats) {
String metrics = JsonUtil.toJson(threadPoolStats);
if (LogHelper.getMonitorLogger() == null) {
log.error("Cannot find monitor logger...");
Expand Down
Loading

0 comments on commit a96658e

Please sign in to comment.