Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #483] Virtual thread compatible #506

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f5f25f5
virtual thread兼容,通过VirtualThreadExecutorAdapter进行声明
EachannChan Oct 14, 2024
cc83c79
bean instanceof ThreadContainer && bean instanceof ExecutorService 的…
EachannChan Oct 14, 2024
0d0c160
使用Executors.newVirtualThreadPerTaskExecutor()进行声明
EachannChan Oct 15, 2024
74f64c6
format
EachannChan Oct 15, 2024
17a913b
以注释的方式来声明executor的实现
EachannChan Oct 16, 2024
706d887
动态配置的实现思路
EachannChan Oct 16, 2024
f05dd26
proxy的重构,这样写Bean无法注册,耦合性也高
EachannChan Oct 16, 2024
a85826c
移除jdk21特性使之兼容老版本
EachannChan Oct 21, 2024
4a93554
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Oct 21, 2024
4495be0
fix
EachannChan Oct 21, 2024
a729e11
配置中心进行配置,可以获取到executor并执行但spring没有感知到此bean,即postProcessAfterInitializ…
EachannChan Oct 21, 2024
6a1e1a9
解决配置中心配置VT未被登记的问题
EachannChan Nov 4, 2024
3755c8b
doc
EachannChan Nov 4, 2024
db44cd8
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 4, 2024
0a38447
监控实现思路
EachannChan Nov 4, 2024
f4b5804
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 4, 2024
c791fd2
监控模块部分实现,VTExecutorStats为期望能获取到的数据
EachannChan Nov 5, 2024
ebc5e0e
监控基本实现,留了一些保留字段
EachannChan Nov 7, 2024
e29ebd4
complete EsCollector,format
EachannChan Nov 13, 2024
554f3f1
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 13, 2024
3dc7ee6
fix conflict and adapt
EachannChan Nov 13, 2024
0b18056
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 13, 2024
4154092
fix and format
EachannChan Nov 13, 2024
d9b5d4d
fix dependency
EachannChan Nov 13, 2024
119d2e2
update mvn jdk
EachannChan Nov 13, 2024
7944b9b
try fix gitgub err
EachannChan Nov 13, 2024
33a4214
del
EachannChan Nov 13, 2024
fe40c63
format
EachannChan Nov 13, 2024
f449863
core rebuild and JDK version check
EachannChan Nov 15, 2024
c40e854
重构优化
EachannChan Nov 18, 2024
4027731
Merge branch 'springboot3' into virtual-thread
EachannChan Nov 18, 2024
2f7a99d
format
EachannChan Nov 18, 2024
a812aff
Merge remote-tracking branch 'origin/virtual-thread' into virtual-thread
EachannChan Nov 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public enum JreEnum {

JAVA_18,

JAVA_19;
JAVA_19,

JAVA_20,

JAVA_21;

private static final JreEnum VERSION = getJre();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,75 @@
**/
@Data
public class Metrics {

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 是否为DtpExecutor
*/
private boolean dynamic;

/**
* tps
*/
private double tps;

/**
* 最大任务耗时
*/
private long maxRt;

/**
* 最小任务耗时
*/
private long minRt;

/**
* 任务平均耗时(单位:ms)
*/
private double avg;

/**
* 满足50%的任务执行所需的最低耗时
*/
private double tp50;

/**
* 满足75%的任务执行所需的最低耗时
*/
private double tp75;

/**
* 满足90%的任务执行所需的最低耗时
*/
private double tp90;

/**
* 满足95%的任务执行所需的最低耗时
*/
private double tp95;

/**
* 满足99%的任务执行所需的最低耗时
*/
private double tp99;

/**
* 满足99.9%的任务执行所需的最低耗时
*/
private double tp999;

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ public class ThreadPoolStats extends Metrics {
*/
private int queueRemainingCapacity;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 已执行完成的大致任务总数
*/
Expand Down Expand Up @@ -121,68 +111,9 @@ public class ThreadPoolStats extends Metrics {
*/
private String rejectHandlerName;

/**
* 是否DtpExecutor线程池
*/
private boolean dynamic;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 在队列等待超时任务数量
*/
private long queueTimeoutCount;

/**
* tps
*/
private double tps;

/**
* 最大任务耗时
*/
private long maxRt;

/**
* 最小任务耗时
*/
private long minRt;

/**
* 任务平均耗时(单位:ms)
*/
private double avg;

/**
* 满足50%的任务执行所需的最低耗时
*/
private double tp50;

/**
* 满足75%的任务执行所需的最低耗时
*/
private double tp75;

/**
* 满足90%的任务执行所需的最低耗时
*/
private double tp90;

/**
* 满足95%的任务执行所需的最低耗时
*/
private double tp95;

/**
* 满足99%的任务执行所需的最低耗时
*/
private double tp99;

/**
* 满足99.9%的任务执行所需的最低耗时
*/
private double tp999;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.dromara.dynamictp.common.entity;

import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* ClassName: VTExecutorStats
* Package: org.dromara.dynamictp.common.entity
* Description:
*
* @author CYC
* @create 2024/11/4 16:52
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class VTExecutorStats extends Metrics {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不要单独建VTExecutorStats,还是复用之前的ThreadPoolStats,可以改名为ExecutorStats,新加executorName、executorAliasName字段,同时保留poolName、poolAliasName,并设置为@deprecated,在后续版本移除掉


/**
* 虚拟线程执行器名字
*/
private String executorName;

/**
* 虚拟线程执行器别名
*/
private String executorAliasName;

}
23 changes: 13 additions & 10 deletions core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
doRefresh(executorWrapper, props);
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
if (oldFields.equals(newFields)) {
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) {
log.debug("DynamicTp refresh, main properties of [{}] have not changed.",
executorWrapper.getThreadPoolName());
return;
Expand Down Expand Up @@ -227,8 +227,10 @@ private static void doRefresh(ExecutorWrapper executorWrapper, DtpExecutorProps
if (!Objects.equals(executor.allowsCoreThreadTimeOut(), props.isAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(props.isAllowCoreThreadTimeOut());
}
// update queue
updateQueueProps(executor, props);
if(!executorWrapper.isVirtualThreadExecutor()) {
// update queue
updateQueueProps(executor, props);
}

if (executorWrapper.isDtpExecutor()) {
doRefreshDtp(executorWrapper, props);
Expand All @@ -242,13 +244,14 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
}

ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
if(!executorWrapper.isVirtualThreadExecutor()) {
ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
executorWrapper.setRejectHandler(rejectHandler);
}
}

List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.val;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.common.entity.VTExecutorStats;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
import org.dromara.dynamictp.core.support.ExecutorAdapter;
Expand Down Expand Up @@ -82,6 +83,32 @@ public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
return poolStats;
}

public static VTExecutorStats toVTExecutorMetrics(ExecutorWrapper wrapper) {
ExecutorAdapter<?> executor = wrapper.getExecutor();
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
VTExecutorStats executorStats = convertCommonVT(executor);
executorStats.setExecutorName(wrapper.getThreadPoolName());
executorStats.setExecutorAliasName(wrapper.getThreadPoolAliasName());
executorStats.setDynamic(executor instanceof DtpExecutor);

executorStats.setTps(performanceSnapshot.getTps());
executorStats.setAvg(performanceSnapshot.getAvg());
executorStats.setMaxRt(performanceSnapshot.getMaxRt());
executorStats.setMinRt(performanceSnapshot.getMinRt());
executorStats.setTp50(performanceSnapshot.getTp50());
executorStats.setTp75(performanceSnapshot.getTp75());
executorStats.setTp90(performanceSnapshot.getTp90());
executorStats.setTp95(performanceSnapshot.getTp95());
executorStats.setTp99(performanceSnapshot.getTp99());
executorStats.setTp999(performanceSnapshot.getTp999());
return executorStats;
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
poolStats.setCorePoolSize(executor.getCorePoolSize());
Expand All @@ -100,4 +127,11 @@ private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
poolStats.setKeepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
return poolStats;
}

private static VTExecutorStats convertCommonVT(ExecutorAdapter<?> executor) {
VTExecutorStats executorStats = new VTExecutorStats();
executorStats.setActiveCount(executor.getActiveCount());
executorStats.setTaskCount(executor.getTaskCount());
return executorStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import lombok.Getter;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;

/**
* ExecutorType related
Expand Down Expand Up @@ -55,7 +56,12 @@ public enum ExecutorType {
/**
* Priority executor type.
*/
PRIORITY("priority", PriorityDtpExecutor.class);
PRIORITY("priority", PriorityDtpExecutor.class),

/**
* Virtual thread executor adapter type.
*/
VIRTUAL("virtual", VirtualThreadExecutorProxy.class);

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.VTExecutorStats;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.core.monitor.collector.InternalLogCollector;
import org.dromara.dynamictp.core.monitor.collector.LogCollector;
Expand All @@ -44,7 +45,7 @@ public final class CollectorHandler {

private CollectorHandler() {
List<MetricsCollector> loadedCollectors = ExtensionServiceLoader.get(MetricsCollector.class);
loadedCollectors.forEach(collector -> COLLECTORS.put(collector.type().toLowerCase(), collector));
loadedCollectors.forEach(collector -> COLLECTORS.put(collector.type(), collector));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

解决冲突有问题?toLowerCase()咋删了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没印象了😂


MetricsCollector microMeterCollector = new MicroMeterCollector();
LogCollector logCollector = new LogCollector();
Expand All @@ -68,6 +69,18 @@ public void collect(ThreadPoolStats poolStats, List<String> types) {
}
}

public void collectVTExecutor(VTExecutorStats vtExecutorStats, List<String> types) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不要单独收集了了,复用一套就行,重复代码太多,看着好不舒服

if (vtExecutorStats == null || CollectionUtils.isEmpty(types)) {
return;
}
for (String collectorType : types) {
MetricsCollector collector = COLLECTORS.get(collectorType.toLowerCase());
if (collector != null) {
collector.collect(vtExecutorStats);
}
}
}

public static CollectorHandler getInstance() {
return CollectorHandlerHolder.INSTANCE;
}
Expand Down
Loading
Loading