Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports subpath routing #2026

Merged
merged 12 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 52 additions & 24 deletions core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
bufferMeta.set_shardhashkey(data->mShardHashKey);
bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType()));
bufferMeta.set_telemetrytype(flusher->mTelemetryType);
bufferMeta.set_subpath(flusher->GetSubpath());
#ifdef __ENTERPRISE__
bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode));
#endif
Expand Down Expand Up @@ -866,30 +867,57 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe
} else {
dataType = RawDataType::EVENT_GROUP;
}
if (bufferMeta.has_telemetrytype() && bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
return PostMetricStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
logData,
bufferMeta.rawsize());
} else {
return PostLogStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
dataType,
logData,
bufferMeta.rawsize(),
bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "");

auto telemetryType
= bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS;
switch (telemetryType) {
case sls_logs::SLS_TELEMETRY_TYPE_LOGS:
return PostLogStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
dataType,
logData,
bufferMeta.rawsize(),
bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "");
case sls_logs::SLS_TELEMETRY_TYPE_METRICS:
return PostMetricStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
logData,
bufferMeta.rawsize());
case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS:
case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES:
case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS:
return PostAPMBackendLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
dataType,
logData,
bufferMeta.rawsize(),
bufferMeta.subpath());
default: {
// should not happen
LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen"));
SLSResponse response;
response.mErrorCode = LOGE_REQUEST_ERROR;
response.mErrorMsg = "Unhandled telemetry type";
return response;
}
}
}

Expand Down
155 changes: 101 additions & 54 deletions core/plugin/flusher/sls/FlusherSLS.cpp
KayzzzZ marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,57 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
mContext->GetRegion());
}

// TelemetryType
string telemetryType;
if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
KayzzzZ marked this conversation as resolved.
Show resolved Hide resolved
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (telemetryType == "metrics") {
// TelemetryType set to metrics
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (telemetryType == "arms_agentinfo") {
mSubpath = APM_AGENTINFOS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS;
} else if (telemetryType == "arms_metrics") {
mSubpath = APM_METRICS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS;
} else if (telemetryType == "arms_traces") {
mSubpath = APM_TRACES_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES;
} else if (!telemetryType.empty() && telemetryType != "logs") {
// TelemetryType invalid
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
"string param TelemetryType is not valid",
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

// Logstore
if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
// log and metric
if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}

// Region
Expand Down Expand Up @@ -409,32 +450,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
}
#endif

// TelemetryType
string telemetryType;
if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (telemetryType == "metrics") {
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (!telemetryType.empty() && telemetryType != "logs") {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
"string param TelemetryType is not valid",
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

// Batch
const char* key = "Batch";
Expand Down Expand Up @@ -465,25 +480,17 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
}

// ShardHashKeys
if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (!mShardHashKeys.empty() && mContext->IsExactlyOnceEnabled()) {
mShardHashKeys.clear();
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
"exactly once enabled when ShardHashKeys is not empty",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) {
if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}

DefaultFlushStrategyOptions strategy{
Expand Down Expand Up @@ -667,6 +674,11 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr<HttpSinkRequest>
case sls_logs::SLS_TELEMETRY_TYPE_METRICS:
req = CreatePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, data);
break;
case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS:
case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS:
case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES:
req = CreatePostAPMBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath);
break;
default:
break;
}
Expand Down Expand Up @@ -1245,6 +1257,41 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostMetricStoreLogsRequest(const s
1);
}

unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item,
const std::string& subPath) const {
string query;
map<string, string> header;
PreparePostAPMBackendRequest(accessKeyId,
accessKeySecret,
type,
item->mCurrentHost,
item->mRealIpFlag,
mProject,
item->mLogstore,
CompressTypeToString(mCompressor->GetCompressType()),
item->mType,
item->mData,
item->mRawSize,
mSubpath,
query,
header);
bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion);
return make_unique<HttpSinkRequest>(HTTP_POST,
httpsFlag,
item->mCurrentHost,
httpsFlag ? 443 : 80,
subPath,
"",
header,
item->mData,
item,
INT32_FLAG(default_http_request_timeout_sec),
1);
}

sls_logs::SlsCompressType ConvertCompressType(CompressType type) {
sls_logs::SlsCompressType compressType = sls_logs::SLS_CMP_NONE;
switch (type) {
Expand Down
9 changes: 9 additions & 0 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class FlusherSLS : public HttpFlusher {
// for use of Go pipeline and shennong
bool Send(std::string&& data, const std::string& shardHashKey, const std::string& logstore = "");

std::string GetSubpath() const { return mSubpath; }

std::string mProject;
std::string mLogstore;
std::string mRegion;
Expand Down Expand Up @@ -130,6 +132,13 @@ class FlusherSLS : public HttpFlusher {
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item) const;
std::unique_ptr<HttpSinkRequest> CreatePostAPMBackendRequest(const std::string& accessKeyId,
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item,
const std::string& subPath) const;

std::string mSubpath;

Batcher<SLSEventBatchStatus> mBatcher;
std::unique_ptr<EventGroupSerializer> mGroupSerializer;
Expand Down
79 changes: 79 additions & 0 deletions core/plugin/flusher/sls/SLSClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,50 @@ void PreparePostMetricStoreLogsRequest(const string& accessKeyId,
header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
}

void PreparePostAPMBackendRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
const string& host,
bool isHostIp,
const string& project,
const string& logstore,
const string& compressType,
RawDataType dataType,
const string& body,
size_t rawSize,
const string& path,
string& query,
map<string, string>& header) {
if (isHostIp) {
header[HOST] = project + "." + host;
} else {
header[HOST] = host;
}
header[USER_AGENT] = SLSClientManager::GetInstance()->GetUserAgent();
header[DATE] = GetDateString();
header[CONTENT_TYPE] = TYPE_LOG_PROTOBUF;
header[CONTENT_LENGTH] = to_string(body.size());
header[CONTENT_MD5] = CalcMD5(body);
header[X_LOG_APIVERSION] = LOG_API_VERSION;
header[X_LOG_SIGNATUREMETHOD] = HMAC_SHA1;
if (!compressType.empty()) {
header[X_LOG_COMPRESSTYPE] = compressType;
}
if (dataType == RawDataType::EVENT_GROUP) {
header[X_LOG_BODYRAWSIZE] = to_string(rawSize);
} else {
header[X_LOG_BODYRAWSIZE] = to_string(body.size());
header[X_LOG_MODE] = LOG_MODE_BATCH_GROUP;
}
if (type == SLSClientManager::AuthType::ANONYMOUS) {
header[X_LOG_KEYPROVIDER] = MD5_SHA1_SALT_KEYPROVIDER;
}

map<string, string> parameterList;
string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret);
header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
}

SLSResponse PostLogStoreLogs(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
Expand Down Expand Up @@ -303,6 +347,41 @@ SLSResponse PostMetricStoreLogs(const string& accessKeyId,
return ParseHttpResponse(response);
}

SLSResponse PostAPMBackendLogs(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
const string& host,
bool httpsFlag,
const string& project,
const string& logstore,
const string& compressType,
RawDataType dataType,
const string& body,
size_t rawSize,
const std::string& subpath) {
string query;
map<string, string> header;
PreparePostAPMBackendRequest(accessKeyId,
accessKeySecret,
type,
host,
false, // sync request always uses vip
project,
logstore,
compressType,
dataType,
body,
rawSize,
subpath,
query,
header);
HttpResponse response;
SendHttpRequest(
make_unique<HttpRequest>(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https也需要测试下

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我手动测试过商业版和开源版,上报都是没问题的。HTTPS 这个也是要加到 e2e 测试里面吗

response);
return ParseHttpResponse(response);
}

SLSResponse PutWebTracking(const string& host,
bool httpsFlag,
const string& logstore,
Expand Down
Loading
Loading