Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #483] Virtual thread compatible #508

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f5f25f5
virtual thread兼容,通过VirtualThreadExecutorAdapter进行声明
EachannChan Oct 14, 2024
cc83c79
bean instanceof ThreadContainer && bean instanceof ExecutorService 的…
EachannChan Oct 14, 2024
0d0c160
使用Executors.newVirtualThreadPerTaskExecutor()进行声明
EachannChan Oct 15, 2024
74f64c6
format
EachannChan Oct 15, 2024
17a913b
以注释的方式来声明executor的实现
EachannChan Oct 16, 2024
706d887
动态配置的实现思路
EachannChan Oct 16, 2024
f05dd26
proxy的重构,这样写Bean无法注册,耦合性也高
EachannChan Oct 16, 2024
a85826c
移除jdk21特性使之兼容老版本
EachannChan Oct 21, 2024
4a93554
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Oct 21, 2024
4495be0
fix
EachannChan Oct 21, 2024
a729e11
配置中心进行配置,可以获取到executor并执行但spring没有感知到此bean,即postProcessAfterInitializ…
EachannChan Oct 21, 2024
6a1e1a9
解决配置中心配置VT未被登记的问题
EachannChan Nov 4, 2024
3755c8b
doc
EachannChan Nov 4, 2024
db44cd8
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 4, 2024
0a38447
监控实现思路
EachannChan Nov 4, 2024
f4b5804
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 4, 2024
c791fd2
监控模块部分实现,VTExecutorStats为期望能获取到的数据
EachannChan Nov 5, 2024
ebc5e0e
监控基本实现,留了一些保留字段
EachannChan Nov 7, 2024
e29ebd4
complete EsCollector,format
EachannChan Nov 13, 2024
554f3f1
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 13, 2024
3dc7ee6
fix conflict and adapt
EachannChan Nov 13, 2024
0b18056
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 13, 2024
4154092
fix and format
EachannChan Nov 13, 2024
d9b5d4d
fix dependency
EachannChan Nov 13, 2024
119d2e2
update mvn jdk
EachannChan Nov 13, 2024
7944b9b
try fix gitgub err
EachannChan Nov 13, 2024
33a4214
del
EachannChan Nov 13, 2024
fe40c63
format
EachannChan Nov 13, 2024
f449863
core rebuild and JDK version check
EachannChan Nov 15, 2024
c40e854
重构优化
EachannChan Nov 18, 2024
4027731
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 18, 2024
2f7a99d
format
EachannChan Nov 18, 2024
a812aff
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 18, 2024
a96658e
Merge branch 'virtual-thread' into virtual-thread-spring-boot3
EachannChan Nov 18, 2024
b4e5dcb
fix
EachannChan Nov 18, 2024
7eaf4ca
rebuild
EachannChan Nov 20, 2024
15929e1
format
EachannChan Nov 20, 2024
8457c52
format
EachannChan Nov 20, 2024
69ec089
rebuild and fix
EachannChan Nov 21, 2024
84d4d30
Merge branch 'dromara:master' into virtual-thread
EachannChan Nov 22, 2024
21e3b41
rebuild
EachannChan Nov 22, 2024
d063f36
optimize
EachannChan Nov 26, 2024
36dc4c4
rebuild
EachannChan Nov 27, 2024
36cd88a
optimize
EachannChan Dec 2, 2024
64afeae
remane
EachannChan Dec 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.entity.NotifyPlatform;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.common.event.CustomContextRefreshedEvent;
Expand Down Expand Up @@ -104,20 +104,20 @@ public Map<String, ExecutorWrapper> getExecutorWrappers() {
}

/**
* Get multi thread pool stats.
* Get multi executor stats.
*
* @return thead pools stats
* @return Executors stats
*/
@Override
public List<ThreadPoolStats> getMultiPoolStats() {
public List<ExecutorStats> getMultiExecutorStats() {
val executorWrappers = getExecutorWrappers();
if (MapUtils.isEmpty(executorWrappers)) {
return Collections.emptyList();
}

List<ThreadPoolStats> threadPoolStats = Lists.newArrayList();
executorWrappers.forEach((k, v) -> threadPoolStats.add(ExecutorConverter.toMetrics(v)));
return threadPoolStats;
List<ExecutorStats> executorStats = Lists.newArrayList();
executorWrappers.forEach((k, v) -> executorStats.add(ExecutorConverter.toMetrics(v)));
return executorStats;
}

public void refresh(List<TpExecutorProps> propsList, List<NotifyPlatform> platforms) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void doCollect(DtpProperties dtpProperties) {
if (MapUtils.isEmpty(handlerMap)) {
return;
}
handlerMap.forEach((k, v) -> v.getMultiPoolStats().forEach(ps ->
handlerMap.forEach((k, v) -> v.getMultiExecutorStats().forEach(ps ->
CollectorHandler.getInstance().collect(ps, dtpProperties.getCollectorTypes())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,9 @@ private DynamicTpConst() { }
public static final String TRUE_STR = "true";

public static final String FALSE_STR = "false";

/**
* jre
*/
private static final Integer JRE_VERSION_21 = 21;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果没用到可以删除

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

/**
* JRE version
*
* @author kamtohung
*/
@Slf4j
Expand Down Expand Up @@ -53,12 +54,18 @@ public enum JreEnum {

JAVA_18,

JAVA_19;
JAVA_19,

JAVA_20,

JAVA_21;

private static final JreEnum VERSION = getJre();

public static final String DEFAULT_JAVA_VERSION = "1.8";

private static final int JRE_VERSION_OFFSET = 8;

/**
* get current JRE version
*
Expand All @@ -68,6 +75,14 @@ public static JreEnum currentVersion() {
return VERSION;
}

/**
* get current JRE integer version
* @return JRE integer version
*/
public static int currentIntVersion() {
return JreEnum.currentVersion().ordinal() + JRE_VERSION_OFFSET;
}

/**
* is current version
*
Expand Down Expand Up @@ -97,4 +112,24 @@ private static JreEnum getJre() {
return JAVA_8;
}

/**
* 判断当前版本是否大于某个版本
*
* @param targetVersion 目标版本
* @return 是否大于
*/
public static boolean greaterThan(JreEnum targetVersion) {
return getJre().ordinal() > targetVersion.ordinal();
}

/**
* 判断当前版本是否小于某个版本
*
* @param targetVersion 目标版本
* @return 是否小于
*/
public static boolean lessThan(JreEnum targetVersion) {
return getJre().ordinal() < targetVersion.ordinal();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
**/
@Data
@EqualsAndHashCode(callSuper = true)
public class ThreadPoolStats extends Metrics {
public class ExecutorStats extends Metrics {

/**
* 线程池名字
* 执行器名字
*/
private String poolName;
private String executorName;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JMXCollector没有用到此名字


/**
* 线程池别名
* 执行器别名
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?

行,我看看要是没有其他地方用到这两个字段我就去掉算了

*/
private String poolAliasName;
private String executorAliasName;

/**
* 核心线程数
Expand All @@ -51,6 +51,31 @@ public class ThreadPoolStats extends Metrics {
*/
private int maximumPoolSize;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 是否为DtpExecutor
*/
private boolean dynamic;

/**
* 是否为虚拟线程执行器
*/
private boolean isVirtualExecutor;

/**
* 空闲时间 (ms)
*/
Expand Down Expand Up @@ -81,16 +106,6 @@ public class ThreadPoolStats extends Metrics {
*/
private int queueRemainingCapacity;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 已执行完成的大致任务总数
*/
Expand Down Expand Up @@ -121,16 +136,6 @@ public class ThreadPoolStats extends Metrics {
*/
private String rejectHandlerName;

/**
* 是否DtpExecutor线程池
*/
private boolean dynamic;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 在队列等待超时任务数量
*/
Expand Down Expand Up @@ -185,4 +190,5 @@ public class ThreadPoolStats extends Metrics {
* 满足99.9%的任务执行所需的最低耗时
*/
private double tp999;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
**/
@Data
public class Metrics {

}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
}

ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.dromara.dynamictp.core.aware;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;

import java.util.Collections;
import java.util.List;
Expand All @@ -35,7 +35,7 @@ public interface MetricsAware extends DtpAware {
*
* @return the thread pool stats
*/
default ThreadPoolStats getPoolStats() {
default ExecutorStats getPoolStats() {
return null;
}

Expand All @@ -44,7 +44,7 @@ default ThreadPoolStats getPoolStats() {
*
* @return thead pools stats
*/
default List<ThreadPoolStats> getMultiPoolStats() {
default List<ExecutorStats> getMultiExecutorStats() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.dromara.dynamictp.core.converter;

import lombok.val;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
Expand Down Expand Up @@ -53,37 +53,39 @@ public static TpMainFields toMainFields(ExecutorWrapper executorWrapper) {
return mainFields;
}

public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
ExecutorAdapter<?> executor = wrapper.getExecutor();
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
ThreadPoolStats poolStats = convertCommon(executor);
poolStats.setPoolName(wrapper.getThreadPoolName());
poolStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
poolStats.setRunTimeoutCount(provider.getRunTimeoutCount());
poolStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
poolStats.setRejectCount(provider.getRejectedTaskCount());
poolStats.setDynamic(executor instanceof DtpExecutor);
ExecutorStats executorStats = convertCommon(executor);
executorStats.setExecutorName(wrapper.getThreadPoolName());
executorStats.setExecutorAliasName(wrapper.getThreadPoolAliasName());
executorStats.setRunTimeoutCount(provider.getRunTimeoutCount());
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());

poolStats.setTps(performanceSnapshot.getTps());
poolStats.setAvg(performanceSnapshot.getAvg());
poolStats.setMaxRt(performanceSnapshot.getMaxRt());
poolStats.setMinRt(performanceSnapshot.getMinRt());
poolStats.setTp50(performanceSnapshot.getTp50());
poolStats.setTp75(performanceSnapshot.getTp75());
poolStats.setTp90(performanceSnapshot.getTp90());
poolStats.setTp95(performanceSnapshot.getTp95());
poolStats.setTp99(performanceSnapshot.getTp99());
poolStats.setTp999(performanceSnapshot.getTp999());
return poolStats;
executorStats.setVirtualExecutor(wrapper.isVirtualThreadExecutor());

executorStats.setDynamic(executor instanceof DtpExecutor);
executorStats.setTps(performanceSnapshot.getTps());
executorStats.setAvg(performanceSnapshot.getAvg());
executorStats.setMaxRt(performanceSnapshot.getMaxRt());
executorStats.setMinRt(performanceSnapshot.getMinRt());
executorStats.setTp50(performanceSnapshot.getTp50());
executorStats.setTp75(performanceSnapshot.getTp75());
executorStats.setTp90(performanceSnapshot.getTp90());
executorStats.setTp95(performanceSnapshot.getTp95());
executorStats.setTp99(performanceSnapshot.getTp99());
executorStats.setTp999(performanceSnapshot.getTp999());
return executorStats;
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) {
ExecutorStats poolStats = new ExecutorStats();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

变量名也改了

poolStats.setCorePoolSize(executor.getCorePoolSize());
poolStats.setMaximumPoolSize(executor.getMaximumPoolSize());
poolStats.setPoolSize(executor.getPoolSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import lombok.Getter;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;

/**
* ExecutorType related
Expand Down Expand Up @@ -55,7 +56,12 @@ public enum ExecutorType {
/**
* Priority executor type.
*/
PRIORITY("priority", PriorityDtpExecutor.class);
PRIORITY("priority", PriorityDtpExecutor.class),

/**
* Virtual thread executor adapter type.
*/
VIRTUAL("virtual", VirtualThreadExecutorProxy.class);

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.core.monitor.collector.InternalLogCollector;
import org.dromara.dynamictp.core.monitor.collector.LogCollector;
Expand Down Expand Up @@ -56,14 +56,14 @@ private CollectorHandler() {
COLLECTORS.put(jmxCollector.type(), jmxCollector);
}

public void collect(ThreadPoolStats poolStats, List<String> types) {
if (poolStats == null || CollectionUtils.isEmpty(types)) {
public void collect(ExecutorStats executorStats, List<String> types) {
if (executorStats == null || CollectionUtils.isEmpty(types)) {
return;
}
for (String collectorType : types) {
MetricsCollector collector = COLLECTORS.get(collectorType.toLowerCase());
if (collector != null) {
collector.collect(poolStats);
collector.collect(executorStats);
}
}
}
Expand Down
Loading
Loading