From d7842a3e7ed7cb34c2fb4613afeeb132068a54d2 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Thu, 2 Jan 2025 14:50:53 +0800 Subject: [PATCH] [INLONG-11629][SDK] Adjust the Sender initialization logic --- .../inlong/sdk/dataproxy/network/Sender.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index b56781c0ce..eb320b4a2a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -82,15 +82,28 @@ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) thro this.asyncCallbackMaxSize = configure.getTotalAsyncCallbackSize(); this.threadPool = Executors.newCachedThreadPool(); this.clientMgr = new ClientMgr(configure, this, selfDefineFactory); + this.scanThread = new TimeoutScanThread(this, configure); + if (configure.isEnableMetric()) { + metricWorker = new MetricWorkerThread(configure, this); + } + logger.info("Sender({}) instance initialized!", this.instanceId); + } + + public void start() throws Exception { + if (!started.compareAndSet(false, true)) { + return; + } + this.clientMgr.start(); + this.scanThread.start(); ProxyConfigEntry proxyConfigEntry; try { proxyConfigEntry = this.clientMgr.getGroupIdConfigure(); setClusterId(proxyConfigEntry.getClusterId()); - } catch (Throwable e) { + } catch (Throwable ex) { if (configure.isOnlyUseLocalProxyConfig()) { - throw new Exception("Get local proxy configure failure!", e.getCause()); + throw new Exception("Get local proxy configure failure!", ex); } else { - throw new Exception("Visit manager error!", e.getCause()); + throw new Exception("Visit manager error!", ex); } } if (!proxyConfigEntry.isInterVisit()) { @@ -101,19 +114,6 @@ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) thro throw new Exception("In OutNetwork isNeedDataEncry must be true!"); } } - scanThread = new TimeoutScanThread(this, configure); - if (configure.isEnableMetric()) { - metricWorker = new MetricWorkerThread(configure, this); - } - logger.info("Sender({}) instance initialized!", this.instanceId); - } - - public void start() { - if (!started.compareAndSet(false, true)) { - return; - } - this.clientMgr.start(); - this.scanThread.start(); if (this.configure.isEnableMetric()) { this.metricWorker.start(); }