Skip to content

Commit

Permalink
support custom protobuf serializer for logGroup to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 committed Oct 31, 2024
1 parent 095d89c commit f13a372
Show file tree
Hide file tree
Showing 10 changed files with 521 additions and 1,497 deletions.
164 changes: 91 additions & 73 deletions core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,10 @@
#include "common/TimeUtil.h"
#include "common/compression/CompressType.h"
#include "plugin/flusher/sls/FlusherSLS.h"

#include "protobuf/sls/LogGroupSerializer.h"

DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024);

const std::string METRIC_RESERVED_KEY_NAME = "__name__";
const std::string METRIC_RESERVED_KEY_LABELS = "__labels__";
const std::string METRIC_RESERVED_KEY_VALUE = "__value__";
const std::string METRIC_RESERVED_KEY_TIME_NANO = "__time_nano__";

const std::string METRIC_LABELS_SEPARATOR = "|";
const std::string METRIC_LABELS_KEY_VALUE_SEPARATOR = "#$#";

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -61,88 +53,114 @@ bool Serializer<vector<CompressedLogGroup>>::DoSerialize(vector<CompressedLogGro
}

bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
sls_logs::LogGroup logGroup;
for (const auto& e : group.mEvents) {
if (e.Is<LogEvent>()) {
const auto& logEvent = e.Cast<LogEvent>();
auto log = logGroup.add_logs();
if (group.mEvents.empty()) {
errorMsg = "empty event group";
return false;
}

PipelineEvent::Type eventType = group.mEvents[0]->GetType();
if (eventType == PipelineEvent::Type::NONE) {
errorMsg = "unsupported event type in event group";
return false;
}

bool enableNs = mFlusher->GetContext().GetGlobalConfig().mEnableTimestampNanosecond;

size_t logGroupSZ = 0;
vector<size_t> logSZ(group.mEvents.size());
vector<pair<string, size_t>> metricEventContentSZ(group.mEvents.size());
switch (eventType) {
case PipelineEvent::Type::LOG:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<LogEvent>();
size_t contentSZ = 0;
for (const auto& kv : e) {
contentSZ += GetLogContentSize(kv.first.size(), kv.second.size());
}
logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
}
break;
case PipelineEvent::Type::METRIC:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<MetricEvent>();
if (e.Is<std::monostate>()) {
continue;
} else if (e.Is<UntypedSingleValue>()) {
metricEventContentSZ[i].first = to_string(e.GetValue<UntypedSingleValue>()->mValue);
}
metricEventContentSZ[i].second = GetMetricLabelSize(e);

size_t contentSZ = 0;
contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_NAME.size(), e.GetName().size());
contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_VALUE.size(), metricEventContentSZ[i].first.size());
contentSZ
+= GetLogContentSize(METRIC_RESERVED_KEY_TIME_NANO.size(), e.GetTimestampNanosecond() ? 19U : 10U);
contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_LABELS.size(), metricEventContentSZ[i].second);
logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
}
break;
case PipelineEvent::Type::SPAN:
break;
default:
break;
}
// loggroup.category is deprecated, no need to set
for (const auto& tag : group.mTags.mInner) {
if (tag.first == LOG_RESERVED_KEY_TOPIC || tag.first == LOG_RESERVED_KEY_SOURCE
|| tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
logGroupSZ += GetStringSize(tag.second.size());
} else {
logGroupSZ += GetLogTagSize(tag.first.size(), tag.second.size());
}
}

if (static_cast<int32_t>(logGroupSZ) > INT32_FLAG(max_send_log_group_size)) {
errorMsg = "log group exceeds size limit\tgroup size: " + ToString(logGroupSZ)
+ "\tsize limit: " + ToString(INT32_FLAG(max_send_log_group_size));
return false;
}

static LogGroupSerializer logGroup;
logGroup.PrepareToSerialize(logGroupSZ);
for (size_t i = 0; i < group.mEvents.size(); ++i) {
if (group.mEvents[i].Is<LogEvent>()) {
const auto& logEvent = group.mEvents[i].Cast<LogEvent>();
logGroup.StartToAddLog(logSZ[i]);
logGroup.AddLogTime(logEvent.GetTimestamp());
for (const auto& kv : logEvent) {
auto contPtr = log->add_contents();
contPtr->set_key(kv.first.to_string());
contPtr->set_value(kv.second.to_string());
logGroup.AddLogContent(kv.first, kv.second);
}
log->set_time(logEvent.GetTimestamp());
if (mFlusher->GetContext().GetGlobalConfig().mEnableTimestampNanosecond
&& logEvent.GetTimestampNanosecond()) {
log->set_time_ns(logEvent.GetTimestampNanosecond().value());
if (enableNs && logEvent.GetTimestampNanosecond()) {
logGroup.AddLogTimeNs(logEvent.GetTimestampNanosecond().value());
}
} else if (e.Is<MetricEvent>()) {
const auto& metricEvent = e.Cast<MetricEvent>();
} else if (group.mEvents[i].Is<MetricEvent>()) {
const auto& metricEvent = group.mEvents[i].Cast<MetricEvent>();
if (metricEvent.Is<std::monostate>()) {
continue;
}
auto log = logGroup.add_logs();
std::ostringstream oss;
// set __labels__
bool hasPrev = false;
for (auto it = metricEvent.TagsBegin(); it != metricEvent.TagsEnd(); ++it) {
if (hasPrev) {
oss << METRIC_LABELS_SEPARATOR;
}
hasPrev = true;
oss << it->first << METRIC_LABELS_KEY_VALUE_SEPARATOR << it->second;
}
auto logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_LABELS);
logPtr->set_value(oss.str());
// set time, no need to set nanosecond for metric
log->set_time(metricEvent.GetTimestamp());
// set __time_nano__
logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_TIME_NANO);
if (metricEvent.GetTimestampNanosecond()) {
logPtr->set_value(std::to_string(metricEvent.GetTimestamp())
+ NumberToDigitString(metricEvent.GetTimestampNanosecond().value(), 9));
} else {
logPtr->set_value(std::to_string(metricEvent.GetTimestamp()));
}
// set __value__
if (metricEvent.Is<UntypedSingleValue>()) {
double value = metricEvent.GetValue<UntypedSingleValue>()->mValue;
logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_VALUE);
logPtr->set_value(std::to_string(value));
}
// set __name__
logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_NAME);
logPtr->set_value(metricEvent.GetName().to_string());
logGroup.StartToAddLog(logSZ[i]);
logGroup.AddLogTime(metricEvent.GetTimestamp());
logGroup.AddLogContentMetricLabel(metricEvent, metricEventContentSZ[i].second);
logGroup.AddLogContentMetricTimeNano(metricEvent);
logGroup.AddLogContent(METRIC_RESERVED_KEY_VALUE, metricEventContentSZ[i].first);
logGroup.AddLogContent(METRIC_RESERVED_KEY_NAME, metricEvent.GetName());
} else {
errorMsg = "unsupported event type in event group";
return false;
}
}
for (const auto& tag : group.mTags.mInner) {
if (tag.first == LOG_RESERVED_KEY_TOPIC) {
logGroup.set_topic(tag.second.to_string());
logGroup.AddTopic(tag.second);
} else if (tag.first == LOG_RESERVED_KEY_SOURCE) {
logGroup.set_source(tag.second.to_string());
logGroup.AddSource(tag.second);
} else if (tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
logGroup.set_machineuuid(tag.second.to_string());
logGroup.AddMachineUUID(tag.second);
} else {
auto logTag = logGroup.add_logtags();
logTag->set_key(tag.first.to_string());
logTag->set_value(tag.second.to_string());
logGroup.AddLogTag(tag.first, tag.second);
}
}
// loggroup.category is deprecated, no need to set
size_t size = logGroup.ByteSizeLong();
if (static_cast<int32_t>(size) > INT32_FLAG(max_send_log_group_size)) {
errorMsg = "log group exceeds size limit\tgroup size: " + ToString(size)
+ "\tsize limit: " + ToString(INT32_FLAG(max_send_log_group_size));
return false;
}
logGroup.SerializeToString(&res);
res = std::move(logGroup.GetResult());
return true;
}

Expand Down
Loading

0 comments on commit f13a372

Please sign in to comment.