diff --git a/driver-rocketmq/pom.xml b/driver-rocketmq/pom.xml
index 9e60ff3cc..3cd0a9139 100644
--- a/driver-rocketmq/pom.xml
+++ b/driver-rocketmq/pom.xml
@@ -26,7 +26,7 @@
driver-rocketmq
- 4.9.3
+ 5.1.4
diff --git a/driver-rocketmq/rocketmq.yaml b/driver-rocketmq/rocketmq.yaml
index b5aec62ee..b9b608088 100644
--- a/driver-rocketmq/rocketmq.yaml
+++ b/driver-rocketmq/rocketmq.yaml
@@ -19,5 +19,15 @@ driverClass: io.openmessaging.benchmark.driver.rocketmq.RocketMQBenchmarkDriver
clusterName: DefaultCluster
namesrvAddr: 127.0.0.1:9876
vipChannelEnabled: false
+
+batchCQ: true
+autoBatch: true
+# batchMaxBytes: 32768
+# batchMaxDelayMs: 10
+# totalBatchMaxBytes: 33554432
+
+enableBackpressure: true
+backpressureConcurrency: 1024
+
accessKey:
secretKey:
\ No newline at end of file
diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java
index d7d547098..d5e2edeb9 100644
--- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java
+++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java
@@ -26,9 +26,11 @@
import io.openmessaging.benchmark.driver.rocketmq.client.RocketMQClientConfig;
import java.io.File;
import java.io.IOException;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -37,12 +39,18 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +88,28 @@ public String getTopicNamePrefix() {
return "RocketMQ-Benchmark";
}
+ Map> cachedBrokerAddr = new ConcurrentHashMap<>();
+
+ int fetchCnt = 0;
+
+ private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt,
+ final String clusterName) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
+ Set brokerList = cachedBrokerAddr.get(clusterName);
+ if (brokerList == null) {
+ brokerList =
+ CommandUtil.fetchMasterAndSlaveAddrByClusterName(
+ adminExt, this.rmqClientConfig.clusterName);
+ cachedBrokerAddr.put(clusterName, brokerList);
+ if (brokerList.isEmpty()) {
+ throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName);
+ }
+ }
+ if (fetchCnt++ % 100 == 0) {
+ log.info("fetch brokerAddr count: " + fetchCnt);
+ }
+ return brokerList;
+ }
+
@Override
public CompletableFuture createTopic(final String topic, final int partitions) {
return CompletableFuture.runAsync(
@@ -90,10 +120,13 @@ public CompletableFuture createTopic(final String topic, final int partiti
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
+ if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
+ topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
+ }
try {
Set brokerList =
- CommandUtil.fetchMasterAddrByClusterName(
+ fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
@@ -130,6 +163,26 @@ public CompletableFuture createProducer(final String topic) {
if (null != this.rmqClientConfig.compressMsgBodyOverHowmuch) {
rmqProducer.setCompressMsgBodyOverHowmuch(this.rmqClientConfig.compressMsgBodyOverHowmuch);
}
+
+ if (null != this.rmqClientConfig.autoBatch) {
+ rmqProducer.setAutoBatch(this.rmqClientConfig.autoBatch);
+ }
+ if (null != this.rmqClientConfig.batchMaxBytes) {
+ rmqProducer.batchMaxBytes(this.rmqClientConfig.batchMaxBytes);
+ }
+ if (null != this.rmqClientConfig.batchMaxDelayMs) {
+ rmqProducer.batchMaxDelayMs(this.rmqClientConfig.batchMaxDelayMs);
+ }
+ if (null != this.rmqClientConfig.totalBatchMaxBytes) {
+ rmqProducer.totalBatchMaxBytes(this.rmqClientConfig.totalBatchMaxBytes);
+ }
+ if (null != this.rmqClientConfig.enableBackpressure) {
+ rmqProducer.setEnableBackpressureForAsyncMode(this.rmqClientConfig.enableBackpressure);
+ }
+ if (null != this.rmqClientConfig.backpressureConcurrency) {
+ rmqProducer.setBackPressureForAsyncSendNum(this.rmqClientConfig.backpressureConcurrency);
+ }
+
try {
rmqProducer.start();
} catch (MQClientException e) {
diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java
index 840090452..26cb109bb 100644
--- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java
+++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java
@@ -19,6 +19,14 @@ public class RocketMQClientConfig {
public Boolean vipChannelEnabled;
public Integer maxMessageSize;
public Integer compressMsgBodyOverHowmuch;
+ public Boolean batchCQ;
+ public Boolean autoBatch;
+ public Integer batchMaxBytes;
+ public Integer batchMaxDelayMs;
+ public Integer totalBatchMaxBytes;
+
+ public Boolean enableBackpressure;
+ public Integer backpressureConcurrency;
public String accessKey;
public String secretKey;
}