diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 80cc596c265..74cfcffa21d 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -31,7 +31,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; @Slf4j @Builder @@ -75,7 +74,11 @@ public void init() throws Exception { } public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException { - dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS); + dirtyDataQueue.put(messageWrapper); + } + + public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) { + return dirtyDataQueue.offer(messageWrapper); } private void doSendDirtyMessage() { diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index 8e692a4c105..daec1c0694e 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -74,7 +74,7 @@ public void invoke(DirtyData dirtyData) throws Exception { .data(dirtyMessage) .build(); - dirtySender.sendDirtyMessage(wrapper); + dirtySender.sendDirtyMessageAsync(wrapper); } catch (Throwable t) { log.error("failed to send dirty message to inlong sdk", t); if (!options.isIgnoreSideOutputErrors()) {