Skip to content

Commit

Permalink
[INLONG-11620][Audit] Provide an open API for module reconciliation (#…
Browse files Browse the repository at this point in the history
…11640)

Co-authored-by: doleyzi <[email protected]>
  • Loading branch information
doleyzi and doleyzi authored Jan 5, 2025
1 parent 8945a3c commit 303d0c6
Show file tree
Hide file tree
Showing 11 changed files with 653 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.Exception;

public class InvalidRequestException extends Exception {

public InvalidRequestException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class OpenApiConstants {
public static final String DEFAULT_API_GET_IDS_PATH = "/audit/query/getIds";
public static final String KEY_API_GET_AUDIT_PROXY_PATH = "api.get.audit.proxy";
public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH = "/audit/query/getAuditProxy";
public static final String KEY_API_RECONCILIATION_PATH = "api.reconciliation.path";
public static final String DEFAULT_API_RECONCILIATION_PATH = "/audit/query/reconciliation";
public static final String KEY_API_THREAD_POOL_SIZE = "api.thread.pool.size";
public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
Expand All @@ -60,7 +62,7 @@ public class OpenApiConstants {
public static final String PARAMS_AUDIT_CYCLE = "auditCycle";
public static final String KEY_HTTP_BODY_SUCCESS = "success";
public static final String KEY_HTTP_BODY_ERR_MSG = "errMsg";
public static final String KEY_HTTP_BODY_ERR_DATA = "data";
public static final String KEY_HTTP_BODY_DATA = "data";
public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
public static final String VALUE_HTTP_HEADER_CONTENT_TYPE = "application/json;charset=utf-8";
public static final String KEY_HTTP_SERVER_BIND_PORT = "api.http.server.bind.port";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.service.auditor;

import org.apache.inlong.audit.Exception.InvalidRequestException;
import org.apache.inlong.audit.service.cache.AbstractCache;
import org.apache.inlong.audit.service.cache.HalfHourCache;
import org.apache.inlong.audit.service.cache.HourCache;
import org.apache.inlong.audit.service.cache.RealTimeQuery;
import org.apache.inlong.audit.service.cache.TenMinutesCache;
import org.apache.inlong.audit.service.entities.AuditCycle;
import org.apache.inlong.audit.service.entities.StatData;
import org.apache.inlong.audit.service.metric.MetricsManager;
import org.apache.inlong.audit.service.utils.AuditUtils;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_DATA;
import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
import static org.apache.inlong.audit.service.entities.AuditCycle.DAY;
import static org.apache.inlong.audit.service.entities.AuditCycle.HOUR;
import static org.apache.inlong.audit.service.entities.AuditCycle.MINUTE_10;
import static org.apache.inlong.audit.service.entities.AuditCycle.MINUTE_30;

public class Audit {

private static final Logger LOGGER = LoggerFactory.getLogger(Audit.class);
private static final Gson GSON = new Gson();
private static volatile Audit instance = null;

private Audit() {
}

public static Audit getInstance() {
if (instance == null) {
synchronized (Audit.class) {
if (instance == null) {
instance = new Audit();
}
}
}
return instance;
}

public JsonObject getData(String requestInfo) {
try {
RequestInfo request = GSON.fromJson(requestInfo, RequestInfo.class);
AuditCycle auditCycle = AuditUtils.getAuditCycleTime(request.getStartTime(), request.getEndTime());
validateRequest(request, auditCycle);

ReconciliationData data = getAuditData(request, auditCycle);
return createResponseJson(true, data, null);

} catch (InvalidRequestException e) {
LOGGER.error("Invalid request parameters: {}", e.getMessage());
return createResponseJson(false, null, e.getMessage());
} catch (Exception e) {
LOGGER.error("Failed to process reconciliation request", e);
return createResponseJson(false, null,
"Internal server error: " + e.getMessage());
}
}

private ReconciliationData getAuditData(RequestInfo request, AuditCycle auditCycle) {
// First get the data from the cache
ReconciliationData data = getDataFromCache(request, auditCycle);
if (data != null && data.isNotEmpty() && data.getDiffRatio() <= request.getDiffRatio()) {
return data;
}

long statTimeMillis = System.currentTimeMillis();

// Second, query the data from the data storage (without deduplication)
data = getDataFromStorage(request, false);
if (data.getDiffRatio() <= request.getDiffRatio()) {
return data;
}

// Finally, query the data from the data storage (to deduplicate the data)
data = getDataFromStorage(request, true);
MetricsManager.getInstance().addApiMetricNoCache(auditCycle,
System.currentTimeMillis() - statTimeMillis);
LOGGER.info("Get audit data from data storage by distinct. Request info: {}", request);
return data;
}

private void validateRequest(RequestInfo request, AuditCycle auditCycle) throws InvalidRequestException {
if (!areIdsValid(request) || !isAuditCycleValid(auditCycle)) {
throw new InvalidRequestException("Invalid parameters: " + request);
}
setDefaultAuditTagIfBlank(request);
}

private void setDefaultAuditTagIfBlank(RequestInfo request) {
if (StringUtils.isBlank(request.getSrcAuditTag())) {
request.setSrcAuditTag(DEFAULT_AUDIT_TAG);
}
if (StringUtils.isBlank(request.getDestAuditTag())) {
request.setDestAuditTag(DEFAULT_AUDIT_TAG);
}
}

private boolean areIdsValid(RequestInfo request) {
return Objects.nonNull(request.getInlongGroupId()) &&
Objects.nonNull(request.getInlongStreamId()) &&
Objects.nonNull(request.getSrcAuditId()) &&
Objects.nonNull(request.getDestAuditId());
}

private boolean isAuditCycleValid(AuditCycle auditCycle) {
return Arrays.asList(
MINUTE_10,
MINUTE_30,
HOUR,
DAY).contains(auditCycle);
}

private JsonObject createResponseJson(boolean isSuccess, ReconciliationData auditData, String errorMessage) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(KEY_HTTP_BODY_SUCCESS, isSuccess);
jsonObject.addProperty(KEY_HTTP_BODY_ERR_MSG, errorMessage);
jsonObject.add(KEY_HTTP_BODY_DATA,
GSON.toJsonTree(auditData != null ? auditData.getCombinedData() : new LinkedList<>()));
return jsonObject;
}

private ReconciliationData getDataFromCache(RequestInfo request, AuditCycle auditCycle) {
AbstractCache auditCache = getAuditCache(auditCycle);
if (auditCache == null) {
return null;
}
List<StatData> srcData = auditCache.getData(request.getStartTime(), request.getEndTime(),
request.getInlongGroupId(), request.getInlongStreamId(), request.getSrcAuditId(),
request.getSrcAuditTag(), false);
List<StatData> destData = auditCache.getData(request.getStartTime(), request.getEndTime(),
request.getInlongGroupId(), request.getInlongStreamId(), request.getDestAuditId(),
request.getDestAuditTag(), false);
return new ReconciliationData(AuditUtils.mergeStatDataList(srcData), AuditUtils.mergeStatDataList(destData));
}

private ReconciliationData getDataFromStorage(RequestInfo request, boolean needDistinct) {
List<StatData> srcData = RealTimeQuery.getInstance().queryAuditData(request.getStartTime(),
request.getEndTime(), request.getInlongGroupId(),
request.getInlongStreamId(), request.getSrcAuditId(), request.getSrcAuditTag(), needDistinct);
List<StatData> destData = RealTimeQuery.getInstance().queryAuditData(request.getStartTime(),
request.getEndTime(), request.getInlongGroupId(),
request.getInlongStreamId(), request.getDestAuditId(), request.getDestAuditTag(), needDistinct);
return new ReconciliationData(AuditUtils.mergeStatDataList(srcData), AuditUtils.mergeStatDataList(destData));
}

private AbstractCache getAuditCache(AuditCycle auditCycle) {
switch (auditCycle) {
case MINUTE_10:
return TenMinutesCache.getInstance();
case MINUTE_30:
return HalfHourCache.getInstance();
case HOUR:
case DAY:
return HourCache.getInstance();
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.service.auditor;

import org.apache.inlong.audit.service.entities.StatData;
import org.apache.inlong.audit.service.utils.AuditUtils;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;

@Data
@AllArgsConstructor
public class ReconciliationData {

public StatData srcData;
public StatData destData;

public double getDiffRatio() {
if (srcData == null && destData == null) {
return 0;
}
if (srcData == null || destData == null) {
return 1;
}
return AuditUtils.calculateDiffRatio(srcData.getCount(), destData.getCount());
}

public List<StatData> getCombinedData() {
List<StatData> result = new ArrayList<>();
if (srcData != null) {
result.add(srcData);
}
if (destData != null) {
result.add(destData);
}
return result;
}

public boolean isNotEmpty() {
return srcData != null || destData != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.service.auditor;

import lombok.Data;

@Data
public class RequestInfo {

String startTime;
String endTime;
String inlongGroupId;
String inlongStreamId;
String srcAuditId;
String srcAuditTag;
String destAuditId;
String destAuditTag;
double diffRatio;
}
Loading

0 comments on commit 303d0c6

Please sign in to comment.