Skip to content

Commit

Permalink
[INLONG-11687][Agent] Optimize task main thread exception handling to…
Browse files Browse the repository at this point in the history
… prevent exception exits (#11688)
  • Loading branch information
justinwwhuang authored Jan 20, 2025
1 parent fd7190f commit 9abfce4
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ private void deleteInstance(String instanceId) {

private void deleteFromStore(String instanceId) {
InstanceProfile profile = instanceStore.getInstance(taskId, instanceId);
if (profile == null) {
LOGGER.error("try to delete instance from store but not found: taskId {} instanceId {}", taskId,
instanceId);
return;
}
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
instanceStore.deleteInstance(taskId, instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ private Runnable coreThread() {
LOGGER.info("start flush cache {}:{} flush interval {}", inlongGroupId, sourceName, batchFlushInterval);
running = true;
while (!shutdown) {
sendMessageFromCache();
try {
sendMessageFromCache();
} catch (Throwable e) {
LOGGER.error("send message from cache error: ", e);
}
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,32 @@ public void addCallbacks() {
public void run() {
Thread.currentThread().setName("task-core-" + getTaskId());
running = true;
try {
doRun();
} catch (Throwable e) {
LOGGER.error("do run error: ", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
while (!isFinished()) {
try {
doRun();
} catch (Throwable e) {
LOGGER.error("do run error: ", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
}
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
running = false;
}

protected void doRun() {
while (!isFinished()) {
taskPrint();
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
if (!initOK) {
continue;
}
List<InstanceProfile> profileList = getNewInstanceList();
for (InstanceProfile profile : profileList) {
InstanceAction action = new InstanceAction(ActionType.ADD, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}", getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
taskPrint();
if (!initOK) {
return;
}
List<InstanceProfile> profileList = getNewInstanceList();
for (InstanceProfile profile : profileList) {
InstanceAction action = new InstanceAction(ActionType.ADD, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}", getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
taskHeartbeat();
}
taskHeartbeat();
}

protected abstract List<InstanceProfile> getNewInstanceList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.task.logcollection.SQLTask;
import org.apache.inlong.common.enums.TaskStateEnum;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
public class TestSQLTask {

private static final Logger LOGGER = LoggerFactory.getLogger(TestSQLTask.class);
private static final ClassLoader LOADER = TestSQLTask.class.getClassLoader();
private static AgentBaseTestsHelper helper;
private static TaskManager manager;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new AgentThreadFactory("TestSQLTask"));

@BeforeClass
public static void setup() throws Exception {
helper = new AgentBaseTestsHelper(TestSQLTask.class.getName()).setupAgentHome();
manager = new TaskManager();
}

@AfterClass
public static void teardown() throws Exception {
helper.teardownAgentHome();
}

@Test
public void testScan() {
doTest(1, "select * from table where field = YYYYMMDD_[0-9]+;", CycleUnitType.DAY,
Arrays.asList("select * from table where field = 20230928_[0-9]+;",
"select * from table where field = 20230929_[0-9]+;",
"select * from table where field = 20230930_[0-9]+;"),
Arrays.asList("20230928", "20230929", "20230930"),
"20230928",
"20230930");
doTest(2, "select * from table where field = YYYYMMDDHH_[0-9]+;", CycleUnitType.HOUR,
Arrays.asList("select * from table where field = 2023092823_[0-9]+;",
"select * from table where field = 2023092900_[0-9]+;",
"select * from table where field = 2023092901_[0-9]+;"),
Arrays.asList("2023092823", "2023092900", "2023092901"), "2023092823", "2023092901");
doTest(3, "select * from table where field = YYYYMMDDHHmm_[0-9]+;", CycleUnitType.MINUTE,
Arrays.asList("select * from table where field = 202309282359_[0-9]+;",
"select * from table where field = 202309290000_[0-9]+;",
"select * from table where field = 202309290001_[0-9]+;"),
Arrays.asList("202309282359", "202309290000", "202309290001"), "202309282359", "202309290001");
}

@Test
public void testScanLowercase() {
doTest(1, "select * from table where field = yyyyMMdd_[0-9]+;", CycleUnitType.DAY,
Arrays.asList("select * from table where field = 20230928_[0-9]+;",
"select * from table where field = 20230929_[0-9]+;",
"select * from table where field = 20230930_[0-9]+;"),
Arrays.asList("20230928", "20230929", "20230930"),
"20230928",
"20230930");
doTest(2, "select * from table where field = yyyyMMddhh_[0-9]+;", CycleUnitType.HOUR,
Arrays.asList("select * from table where field = 2023092823_[0-9]+;",
"select * from table where field = 2023092900_[0-9]+;",
"select * from table where field = 2023092901_[0-9]+;"),
Arrays.asList("2023092823", "2023092900", "2023092901"), "2023092823", "2023092901");
doTest(3, "select * from table where field = yyyyMMddhhmm_[0-9]+;", CycleUnitType.MINUTE,
Arrays.asList("select * from table where field = 202309282359_[0-9]+;",
"select * from table where field = 202309290000_[0-9]+;",
"select * from table where field = 202309290001_[0-9]+;"),
Arrays.asList("202309282359", "202309290000", "202309290001"), "202309282359", "202309290001");
}

private void doTest(int taskId, String sql, String cycle, List<String> srcSQLs, List<String> srcDataTimes,
String startTime, String endTime) {
TaskProfile taskProfile = helper.getSQLTaskProfile(taskId, sql, "csv", true, startTime, endTime,
TaskStateEnum.RUNNING, cycle, "GMT+8:00");
SQLTask sqlTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
try {
sqlTask = PowerMockito.spy(new SQLTask());
PowerMockito.doAnswer(invocation -> {
fileName.add(invocation.getArgument(0));
dataTime.add(invocation.getArgument(1));
return null;
}).when(sqlTask, "addToEvenMap", Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(),
Mockito.anyString());
Assert.assertTrue(sqlTask.isProfileValid(taskProfile));
manager.getTaskStore().storeTask(taskProfile);
sqlTask.init(manager, taskProfile, manager.getInstanceBasicStore());
EXECUTOR_SERVICE.submit(sqlTask);
} catch (Exception e) {
LOGGER.error("source init error", e);
Assert.assertTrue("source init error", false);
}
await().atMost(10, TimeUnit.SECONDS)
.until(() -> fileName.size() == srcDataTimes.size() && dataTime.size() == srcDataTimes.size());
for (int i = 0; i < fileName.size(); i++) {
Assert.assertEquals(0, fileName.get(i).compareTo(srcSQLs.get(i)));
Assert.assertEquals(0, dataTime.get(i).compareTo(srcDataTimes.get(i)));
}
sqlTask.destroy();
}
}

0 comments on commit 9abfce4

Please sign in to comment.