Skip to content

Commit

Permalink
[INLONG-11678][SDK] Optimize the ProxyClientConfig class (#11679)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Jan 19, 2025
1 parent 94d12f1 commit 5fcc384
Show file tree
Hide file tree
Showing 27 changed files with 1,688 additions and 905 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -194,13 +193,12 @@ private void createMessageSender() {
String managerAddr = conf.get(AGENT_MANAGER_ADDR);
String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
ProxyClientConfig proxyClientConfig = null;
TcpMsgSenderConfig proxyClientConfig = null;
try {
proxyClientConfig = new ProxyClientConfig(managerAddr, INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
proxyClientConfig = new TcpMsgSenderConfig(managerAddr, INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
Expand Down Expand Up @@ -199,15 +198,14 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) {
* createMessageSender
*/
private void createMessageSender() throws Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
managerAddr, inlongGroupId, authSecretId, authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);

proxyClientConfig.setIoThreadNum(ioThreadNum);
proxyClientConfig.setEnableBusyWait(enableBusyWait);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
proxyClientConfig.setEnableEpollBusyWait(enableBusyWait);

SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath,
Thread.currentThread().isDaemon());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.inlong.sdk.dataproxy;

import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
Expand Down Expand Up @@ -55,58 +55,54 @@ public class DefaultMessageSender implements MessageSender {
/* Store index <groupId_streamId,cnt> */
private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
private String groupId;
private int msgtype = ConfigConstants.MSG_TYPE;
private int msgtype = SdkConsts.MSG_TYPE;
private boolean isCompress = true;
private boolean isGroupIdTransfer = false;
private boolean isReport = false;
private boolean isSupportLF = false;
private int maxPacketLength = -1;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private int cpsSize = SdkConsts.COMPRESS_SIZE;
private final int senderMaxAttempt;

public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
public DefaultMessageSender(TcpMsgSenderConfig configure) throws Exception {
this(configure, null);
}

public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
public DefaultMessageSender(TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws Exception {
ProxyUtils.validClientConfig(configure);
sender = new Sender(configure, selfDefineFactory);
sender.start();
groupId = configure.getInlongGroupId();
indexCol = new IndexCollectThread(storeIndex);
senderMaxAttempt = configure.getSenderMaxAttempt();
senderMaxAttempt = configure.getMaxSyncSendAttempt();
indexCol.start();

}

/**
* generate by cluster id
*
* @param configure - sender
* @param tcpConfig - sender
* @return - sender
*/
public static DefaultMessageSender generateSenderByClusterId(
ProxyClientConfig configure) throws Exception {
TcpMsgSenderConfig tcpConfig) throws Exception {

return generateSenderByClusterId(configure, null);
return generateSenderByClusterId(tcpConfig, null);
}

/**
* generate by cluster id
*
* @param configure - sender
* @param tcpConfig - sender
* @param selfDefineFactory - sender factory
* @return - sender
*/
public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
public static DefaultMessageSender generateSenderByClusterId(TcpMsgSenderConfig tcpConfig,
ThreadFactory selfDefineFactory) throws Exception {
// correct ProtocolType settings
if (!ProtocolType.TCP.equals(configure.getProtocolType())) {
configure.setProtocolType(ProtocolType.TCP);
}
LOGGER.info("Initial tcp sender, configure is {}", configure);
LOGGER.info("Initial tcp sender, configure is {}", tcpConfig);
// initial sender object
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure);
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(tcpConfig);
Tuple2<ProxyConfigEntry, String> result =
proxyConfigManager.getGroupIdConfigure(true);
if (result.getF0() == null) {
Expand All @@ -117,7 +113,7 @@ public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig c
return sender;
} else {
DefaultMessageSender tmpMessageSender =
new DefaultMessageSender(configure, selfDefineFactory);
new DefaultMessageSender(tcpConfig, selfDefineFactory);
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
return tmpMessageSender;
Expand All @@ -144,8 +140,8 @@ public void close() {
shutdownInternalThreads();
}

public ProxyClientConfig getProxyClientConfig() {
return sender.getConfigure();
public TcpMsgSenderConfig getProxyClientConfig() {
return sender.getTcpConfig();
}

public boolean isSupportLF() {
Expand Down Expand Up @@ -213,7 +209,7 @@ public void setMaxPacketLength(int maxPacketLength) {
}

public String getSDKVersion() {
return ConfigConstants.PROXY_SDK_VERSION;
return SdkConsts.PROXY_SDK_VERSION;
}

private SendResult attemptSendMessage(Function<Sender, SendResult> sendOperation) {
Expand Down
Loading

0 comments on commit 5fcc384

Please sign in to comment.