Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #483] Virtual thread compatible #508

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f5f25f5
virtual thread兼容,通过VirtualThreadExecutorAdapter进行声明
EachannChan Oct 14, 2024
cc83c79
bean instanceof ThreadContainer && bean instanceof ExecutorService 的…
EachannChan Oct 14, 2024
0d0c160
使用Executors.newVirtualThreadPerTaskExecutor()进行声明
EachannChan Oct 15, 2024
74f64c6
format
EachannChan Oct 15, 2024
17a913b
以注释的方式来声明executor的实现
EachannChan Oct 16, 2024
706d887
动态配置的实现思路
EachannChan Oct 16, 2024
f05dd26
proxy的重构,这样写Bean无法注册,耦合性也高
EachannChan Oct 16, 2024
a85826c
移除jdk21特性使之兼容老版本
EachannChan Oct 21, 2024
4a93554
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Oct 21, 2024
4495be0
fix
EachannChan Oct 21, 2024
a729e11
配置中心进行配置,可以获取到executor并执行但spring没有感知到此bean,即postProcessAfterInitializ…
EachannChan Oct 21, 2024
6a1e1a9
解决配置中心配置VT未被登记的问题
EachannChan Nov 4, 2024
3755c8b
doc
EachannChan Nov 4, 2024
db44cd8
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 4, 2024
0a38447
监控实现思路
EachannChan Nov 4, 2024
f4b5804
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 4, 2024
c791fd2
监控模块部分实现,VTExecutorStats为期望能获取到的数据
EachannChan Nov 5, 2024
ebc5e0e
监控基本实现,留了一些保留字段
EachannChan Nov 7, 2024
e29ebd4
complete EsCollector,format
EachannChan Nov 13, 2024
554f3f1
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 13, 2024
3dc7ee6
fix conflict and adapt
EachannChan Nov 13, 2024
0b18056
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 13, 2024
4154092
fix and format
EachannChan Nov 13, 2024
d9b5d4d
fix dependency
EachannChan Nov 13, 2024
119d2e2
update mvn jdk
EachannChan Nov 13, 2024
7944b9b
try fix gitgub err
EachannChan Nov 13, 2024
33a4214
del
EachannChan Nov 13, 2024
fe40c63
format
EachannChan Nov 13, 2024
f449863
core rebuild and JDK version check
EachannChan Nov 15, 2024
c40e854
重构优化
EachannChan Nov 18, 2024
4027731
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 18, 2024
2f7a99d
format
EachannChan Nov 18, 2024
a812aff
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 18, 2024
a96658e
Merge branch 'virtual-thread' into virtual-thread-spring-boot3
EachannChan Nov 18, 2024
b4e5dcb
fix
EachannChan Nov 18, 2024
7eaf4ca
rebuild
EachannChan Nov 20, 2024
15929e1
format
EachannChan Nov 20, 2024
8457c52
format
EachannChan Nov 20, 2024
69ec089
rebuild and fix
EachannChan Nov 21, 2024
84d4d30
Merge branch 'dromara:master' into virtual-thread
EachannChan Nov 22, 2024
21e3b41
rebuild
EachannChan Nov 22, 2024
d063f36
optimize
EachannChan Nov 26, 2024
36dc4c4
rebuild
EachannChan Nov 27, 2024
36cd88a
optimize
EachannChan Dec 2, 2024
64afeae
remane
EachannChan Dec 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.entity.NotifyPlatform;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.common.event.CustomContextRefreshedEvent;
Expand Down Expand Up @@ -109,13 +109,13 @@ public Map<String, ExecutorWrapper> getExecutorWrappers() {
* @return thead pools stats
*/
@Override
public List<ThreadPoolStats> getMultiPoolStats() {
public List<ExecutorStats> getMultiPoolStats() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改完全点,方法、变量名

val executorWrappers = getExecutorWrappers();
if (MapUtils.isEmpty(executorWrappers)) {
return Collections.emptyList();
}

List<ThreadPoolStats> threadPoolStats = Lists.newArrayList();
List<ExecutorStats> threadPoolStats = Lists.newArrayList();
executorWrappers.forEach((k, v) -> threadPoolStats.add(ExecutorConverter.toMetrics(v)));
return threadPoolStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public enum JreEnum {

JAVA_18,

JAVA_19;
JAVA_19,

JAVA_20,

JAVA_21;

private static final JreEnum VERSION = getJre();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,28 @@
**/
@Data
@EqualsAndHashCode(callSuper = true)
public class ThreadPoolStats extends Metrics {
public class ExecutorStats extends Metrics {

/**
* 执行器名字
*/
private String executorName;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JMXCollector没有用到此名字


/**
* 执行器别名
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个名字字段如果标注@deprecated的话需要兼容,不然直接改名就好?

行,我看看要是没有其他地方用到这两个字段我就去掉算了

*/
private String executorAliasName;

/**
* 线程池名字
*/
@Deprecated
private String poolName;

/**
* 线程池别名
*/
@Deprecated
private String poolAliasName;

/**
Expand All @@ -51,6 +63,31 @@ public class ThreadPoolStats extends Metrics {
*/
private int maximumPoolSize;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 是否为DtpExecutor
*/
private boolean dynamic;

/**
* 是否为虚拟线程执行器
*/
private boolean isVirtualExecutor;

/**
* 空闲时间 (ms)
*/
Expand Down Expand Up @@ -81,16 +118,6 @@ public class ThreadPoolStats extends Metrics {
*/
private int queueRemainingCapacity;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 已执行完成的大致任务总数
*/
Expand Down Expand Up @@ -121,16 +148,6 @@ public class ThreadPoolStats extends Metrics {
*/
private String rejectHandlerName;

/**
* 是否DtpExecutor线程池
*/
private boolean dynamic;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 在队列等待超时任务数量
*/
Expand Down Expand Up @@ -185,4 +202,5 @@ public class ThreadPoolStats extends Metrics {
* 满足99.9%的任务执行所需的最低耗时
*/
private double tp999;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
**/
@Data
public class Metrics {

}
17 changes: 9 additions & 8 deletions core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
doRefresh(executorWrapper, props);
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
if (oldFields.equals(newFields)) {
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用&&有问题

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用&&有问题

那我是直接删掉还是?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用&&有问题

那我是直接删掉还是?

感觉删掉较好

log.debug("DynamicTp refresh, main properties of [{}] have not changed.",
executorWrapper.getThreadPoolName());
return;
Expand Down Expand Up @@ -242,13 +242,14 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
}

ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
if (!executorWrapper.isVirtualThreadExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个判断去掉吧,判断太多可读性会变差

ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
}
}

List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.dromara.dynamictp.core.aware;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;

import java.util.Collections;
import java.util.List;
Expand All @@ -35,7 +35,7 @@ public interface MetricsAware extends DtpAware {
*
* @return the thread pool stats
*/
default ThreadPoolStats getPoolStats() {
default ExecutorStats getPoolStats() {
return null;
}

Expand All @@ -44,7 +44,7 @@ default ThreadPoolStats getPoolStats() {
*
* @return thead pools stats
*/
default List<ThreadPoolStats> getMultiPoolStats() {
default List<ExecutorStats> getMultiPoolStats() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.dromara.dynamictp.core.converter;

import lombok.val;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
Expand Down Expand Up @@ -53,37 +53,43 @@ public static TpMainFields toMainFields(ExecutorWrapper executorWrapper) {
return mainFields;
}

public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
ExecutorAdapter<?> executor = wrapper.getExecutor();
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
ThreadPoolStats poolStats = convertCommon(executor);
poolStats.setPoolName(wrapper.getThreadPoolName());
poolStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
poolStats.setRunTimeoutCount(provider.getRunTimeoutCount());
poolStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
poolStats.setRejectCount(provider.getRejectedTaskCount());
poolStats.setDynamic(executor instanceof DtpExecutor);
ExecutorStats executorStats = convertCommon(executor);
executorStats.setPoolName(wrapper.getThreadPoolName());
executorStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

新加的两个字段没赋值?

Copy link
Contributor Author

@EachannChan EachannChan Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题

字段赋值不影响啊,就是监控指标多了两个字段,可用可不用

Copy link
Contributor Author

@EachannChan EachannChan Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我还是用的poolname那两个打通的逻辑,想着到时一起改可能会好些。要是只改虚拟线程这边的话我怕会出现问题

字段赋值不影响啊,就是监控指标多了两个字段,可用可不用

好,那我把那两个加上去


poolStats.setTps(performanceSnapshot.getTps());
poolStats.setAvg(performanceSnapshot.getAvg());
poolStats.setMaxRt(performanceSnapshot.getMaxRt());
poolStats.setMinRt(performanceSnapshot.getMinRt());
poolStats.setTp50(performanceSnapshot.getTp50());
poolStats.setTp75(performanceSnapshot.getTp75());
poolStats.setTp90(performanceSnapshot.getTp90());
poolStats.setTp95(performanceSnapshot.getTp95());
poolStats.setTp99(performanceSnapshot.getTp99());
poolStats.setTp999(performanceSnapshot.getTp999());
return poolStats;
if (!wrapper.isVirtualThreadExecutor()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥加这个判断,虚拟线程也会有超时计数

executorStats.setRunTimeoutCount(provider.getRunTimeoutCount());
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());
executorStats.setVirtualExecutor(false);
} else {
executorStats.setVirtualExecutor(true);
}

executorStats.setDynamic(executor instanceof DtpExecutor);
executorStats.setTps(performanceSnapshot.getTps());
executorStats.setAvg(performanceSnapshot.getAvg());
executorStats.setMaxRt(performanceSnapshot.getMaxRt());
executorStats.setMinRt(performanceSnapshot.getMinRt());
executorStats.setTp50(performanceSnapshot.getTp50());
executorStats.setTp75(performanceSnapshot.getTp75());
executorStats.setTp90(performanceSnapshot.getTp90());
executorStats.setTp95(performanceSnapshot.getTp95());
executorStats.setTp99(performanceSnapshot.getTp99());
executorStats.setTp999(performanceSnapshot.getTp999());
return executorStats;
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) {
ExecutorStats poolStats = new ExecutorStats();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

变量名也改了

poolStats.setCorePoolSize(executor.getCorePoolSize());
poolStats.setMaximumPoolSize(executor.getMaximumPoolSize());
poolStats.setPoolSize(executor.getPoolSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import lombok.Getter;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;

/**
* ExecutorType related
Expand Down Expand Up @@ -55,7 +56,12 @@ public enum ExecutorType {
/**
* Priority executor type.
*/
PRIORITY("priority", PriorityDtpExecutor.class);
PRIORITY("priority", PriorityDtpExecutor.class),

/**
* Virtual thread executor adapter type.
*/
VIRTUAL("virtual", VirtualThreadExecutorProxy.class);

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.core.monitor.collector.InternalLogCollector;
import org.dromara.dynamictp.core.monitor.collector.LogCollector;
Expand Down Expand Up @@ -56,14 +56,14 @@ private CollectorHandler() {
COLLECTORS.put(jmxCollector.type(), jmxCollector);
}

public void collect(ThreadPoolStats poolStats, List<String> types) {
if (poolStats == null || CollectionUtils.isEmpty(types)) {
public void collect(ExecutorStats executorStats, List<String> types) {
if (executorStats == null || CollectionUtils.isEmpty(types)) {
return;
}
for (String collectorType : types) {
MetricsCollector collector = COLLECTORS.get(collectorType.toLowerCase());
if (collector != null) {
collector.collect(poolStats);
collector.collect(executorStats);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.event.AlarmCheckEvent;
import org.dromara.dynamictp.common.event.CollectEvent;
import org.dromara.dynamictp.common.event.CustomContextRefreshedEvent;
import org.dromara.dynamictp.common.manager.EventBusManager;

import org.dromara.dynamictp.common.properties.DtpProperties;

import org.dromara.dynamictp.core.DtpRegistry;
import org.dromara.dynamictp.core.converter.ExecutorConverter;
import org.dromara.dynamictp.core.handler.CollectorHandler;
Expand Down Expand Up @@ -103,7 +101,8 @@ private void collectMetrics(Set<String> executorNames) {
publishCollectEvent();
}

private void doCollect(ThreadPoolStats threadPoolStats) {

private void doCollect(ExecutorStats threadPoolStats) {
try {
CollectorHandler.getInstance().collect(threadPoolStats, dtpProperties.getCollectorTypes());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.dromara.dynamictp.core.monitor.collector;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -29,10 +29,11 @@
public class InternalLogCollector extends AbstractCollector {

@Override
public void collect(ThreadPoolStats poolStats) {
public void collect(ExecutorStats poolStats) {
log.info("dynamic.tp metrics: {}", JsonUtil.toJson(poolStats));
}


@Override
public String type() {
return CollectorTypeEnum.INTERNAL_LOGGING.name().toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.dromara.dynamictp.core.monitor.collector;

import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.util.JsonUtil;
import org.dromara.dynamictp.logging.LogHelper;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -33,7 +33,7 @@
public class LogCollector extends AbstractCollector {

@Override
public void collect(ThreadPoolStats threadPoolStats) {
public void collect(ExecutorStats threadPoolStats) {
String metrics = JsonUtil.toJson(threadPoolStats);
if (LogHelper.getMonitorLogger() == null) {
log.error("Cannot find monitor logger...");
Expand Down
Loading
Loading