Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10035][Agent] Report audit data using the new SDK interface #10038

Merged
merged 3 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;

/**
* Handles a list of proxy messages that belong to the same stream id.
Expand Down Expand Up @@ -77,6 +78,7 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
extraMap.put(AUDIT_VERSION, taskId);
}

public void generateExtraMap(String dataKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;

/**
* AuditUtils
Expand All @@ -42,6 +44,7 @@ public class AuditUtils {
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
public static final int AUDIT_ID_AGENT_READ_FAILED = 10003;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
Expand All @@ -54,6 +57,7 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
public static final int AUDIT_ID_AGENT_READ_FAILED_REAL_TIME = 30012;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
Expand Down Expand Up @@ -93,11 +97,17 @@ public static void initAudit() {
* Add audit metric
*/
public static void add(int auditID, String inlongGroupId, String inlongStreamId,
long logTime, int count, long size) {
long logTime, int count, long size, long version) {
if (!IS_AUDIT) {
return;
}
AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
AuditOperator.getInstance()
.add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId, inlongStreamId, logTime, count, size, version);
}

/**
* Add audit metric without an explicit audit version.
* Forwards every argument unchanged to the overload that takes a version,
* passing DEFAULT_AUDIT_VERSION as the version.
*
* @param auditID audit id, forwarded as-is
* @param inlongGroupId inlong group id, forwarded as-is
* @param inlongStreamId inlong stream id, forwarded as-is
* @param logTime log time, forwarded as-is
* @param count count, forwarded as-is
* @param size size, forwarded as-is
*/
public static void add(int auditID, String inlongGroupId, String inlongStreamId,
long logTime, int count, long size) {
add(auditID, inlongGroupId, inlongStreamId, logTime, count, size, DEFAULT_AUDIT_VERSION);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class InstanceManager extends AbstractDaemon {
private final int instanceLimit;
private final AgentConfiguration agentConf;
private final String taskId;
private long auditVersion;
private volatile boolean runAtLeastOneTime = false;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
Expand Down Expand Up @@ -122,6 +123,7 @@ public String toString() {
*/
public InstanceManager(String taskId, int instanceLimit, Db basicDb, TaskProfileDb taskProfileDb) {
this.taskId = taskId;
this.auditVersion = Long.parseLong(taskId);
instanceDb = new InstanceDb(basicDb);
this.taskProfileDb = taskProfileDb;
this.agentConf = AgentConfiguration.getAgentConf();
Expand Down Expand Up @@ -171,7 +173,7 @@ private Runnable coreThread() {
String inlongGroupId = taskFromDb.getInlongGroupId();
String inlongStreamId = taskFromDb.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
AgentUtils.getCurrentTime(), 1, 1, auditVersion);
} catch (Throwable ex) {
LOGGER.error("coreThread error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
Expand Down Expand Up @@ -387,7 +389,7 @@ private void deleteFromDb(String instanceId) {
LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId,
instanceId, instanceDb.getInstance(taskId, instanceId));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1);
profile.getSinkDataTime(), 1, 1, auditVersion);
}

private void deleteFromMemory(String instanceId) {
Expand All @@ -403,7 +405,7 @@ private void deleteFromMemory(String instanceId) {
instanceMap.remove(instanceId);
LOGGER.info("delete instance from memory: taskId {} instanceId {}", taskId, instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, inlongGroupId, inlongStreamId,
instance.getProfile().getSinkDataTime(), 1, 1);
instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}

private void addToDb(InstanceProfile profile, boolean addNew) {
Expand All @@ -413,7 +415,7 @@ private void addToDb(InstanceProfile profile, boolean addNew) {
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1);
profile.getSinkDataTime(), 1, 1, auditVersion);
}
}

Expand All @@ -430,7 +432,7 @@ private void addToMemory(InstanceProfile instanceProfile) {
LOGGER.error("old instance {} should not exist, try stop it first",
instanceProfile.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
Expand All @@ -445,12 +447,12 @@ private void addToMemory(InstanceProfile instanceProfile) {
instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(),
EXECUTOR_SERVICE.getActiveCount());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
} else {
LOGGER.error(
"add instance to memory init failed instanceId {}", instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ public abstract class CommonInstance extends Instance {
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();
protected long auditVersion;

@Override
public boolean init(Object srcManager, InstanceProfile srcProfile) {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
auditVersion = Long.parseLong(getTaskId());
setInodeInfo(profile);
LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(),
profile.getInstanceId(), profile.toJsonStr());
Expand Down Expand Up @@ -153,7 +155,7 @@ private void doRun() {
private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(),
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1);
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);
heartbeatcheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ public class SenderManager {
private volatile boolean resendRunning = false;
private volatile boolean started = false;
private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
private long auditVersion;

public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) {
this.profile = profile;
auditVersion = Long.parseLong(profile.getTaskId());
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
proxySend = profile.getBoolean(TASK_PROXY_SEND, DEFAULT_TASK_PROXY_SEND);
totalAsyncBufSize = profile
Expand Down Expand Up @@ -233,10 +235,10 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
AgentSenderCallback cb = new AgentSenderCallback(message, retry);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, message.getGroupId(),
message.getStreamId(), message.getDataTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(),
message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend);
Expand All @@ -246,10 +248,10 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
} catch (Exception exception) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, message.getGroupId(),
message.getStreamId(), message.getDataTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
suc = false;
if (retry > maxSenderRetry) {
if (retry % 10 == 0) {
Expand Down Expand Up @@ -291,10 +293,10 @@ private Runnable flushResendQueue() {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(),
message.getStreamId(), message.getDataTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
sendBatchWithRetryCount(callback.message, callback.retry + 1);
}
} catch (Exception ex) {
Expand Down Expand Up @@ -353,18 +355,18 @@ public void onMessageAck(SendResult result) {
message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
dataTime, message.getMsgCnt(), message.getTotalSize());
dataTime, message.getMsgCnt(), message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId, streamId,
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize());
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize(), auditVersion);
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);
getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt);
putInResendQueue(new AgentSenderCallback(message, retry));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId, streamId,
dataTime, message.getMsgCnt(), message.getTotalSize());
dataTime, message.getMsgCnt(), message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId, streamId,
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize());
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize(), auditVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,16 @@ private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, in
if (overLen) {
LOGGER.warn("readLines over len finally string len {}",
new String(baos.toByteArray()).length());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize);
long auditTime = 0;
if (isRealTime) {
auditTime = AgentUtils.getCurrentTime();
} else {
auditTime = profile.getSinkDataTime();
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId, inlongStreamId,
auditTime, 1, maxPackSize, auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId,
inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion);
}
baos.reset();
overLen = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ protected class SourceData {
protected volatile boolean runnable = true;
protected volatile boolean running = false;
protected String taskId;
protected long auditVersion;
protected String instanceId;
protected InstanceProfile profile;
private ExtendedHandler extendedHandler;
private boolean isRealTime = false;
protected boolean isRealTime = false;
protected volatile long emptyCount = 0;
protected int maxPackSize;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
Expand All @@ -118,6 +119,7 @@ protected class SourceData {
public void init(InstanceProfile profile) {
this.profile = profile;
taskId = profile.getTaskId();
auditVersion = Long.parseLong(taskId);
instanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
Expand Down Expand Up @@ -333,9 +335,9 @@ private Message createMessage(SourceData sourceData) {
auditTime = profile.getSinkDataTime();
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
auditTime, 1, sourceData.getData().length);
auditTime, 1, sourceData.getData().length, auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
AgentUtils.getCurrentTime(), 1, sourceData.getData().length);
AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion);
Message finalMsg = new DefaultMessage(sourceData.getData(), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ public abstract class AbstractTask extends Task {
protected volatile boolean running = false;
protected boolean initOK = false;
protected long lastPrintTime = 0;
protected long auditVersion;

@Override
public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException {
taskManager = (TaskManager) srcManager;
this.taskProfile = taskProfile;
this.basicDb = basicDb;
auditVersion = Long.parseLong(taskProfile.getTaskId());
instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
basicDb, taskManager.getTaskDb());
try {
Expand Down Expand Up @@ -132,7 +134,7 @@ protected void doRun() {

protected void taskHeartbeat() {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, taskProfile.getInlongGroupId(),
taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1);
taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);

}

Expand Down
Loading