do not separate event group when group size exceeds min batch size
henryzhx8 committed Nov 1, 2024
1 parent 3d56d3b commit ebeb0f9
Showing 8 changed files with 218 additions and 149 deletions.
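The core of this commit is the new branch in Batcher::Add() (core/pipeline/batch/Batcher.h below): an incoming event group whose total size already exceeds the configured minimum batch size is no longer merged event-by-event with previously buffered data; any pending batch is flushed first, and the large group is then split only when the hard upper limit (the new max_send_log_group_size flag, 10 MiB by default) or the SLS time condition requires it. Smaller groups keep the old per-event accumulation. The stand-alone sketch below restates that policy with simplified, hypothetical types and thresholds (Event, EventGroup, Batch, kMinBatchSizeBytes, kMaxBatchSizeBytes are illustrative names, and the 512 KiB minimum is an arbitrary example, not the real default); the actual code additionally handles the count limit, the optional group-batch queue, timeout bookkeeping, and metrics.

#include <cstddef>
#include <utility>
#include <vector>

struct Event { std::size_t dataSize = 0; };
struct EventGroup { std::vector<Event> events; std::size_t dataSize = 0; };
struct Batch { std::vector<Event> events; std::size_t dataSize = 0; };

// Illustrative thresholds only: the max mirrors the new max_send_log_group_size
// flag (10 MiB); the min stands in for the configured MinSizeBytes.
constexpr std::size_t kMinBatchSizeBytes = 512 * 1024;
constexpr std::size_t kMaxBatchSizeBytes = 10 * 1024 * 1024;

void FlushIfNotEmpty(Batch& current, std::vector<Batch>& out) {
    if (!current.events.empty()) {
        out.push_back(std::move(current));
        current = Batch();
    }
}

void AddGroup(EventGroup&& g, Batch& current, std::vector<Batch>& out) {
    if (g.dataSize > kMinBatchSizeBytes) {
        // Large group: flush whatever is already buffered so the group is not
        // interleaved with earlier events, then split it only at the hard cap.
        FlushIfNotEmpty(current, out);
        for (Event& e : g.events) {
            current.dataSize += e.dataSize;
            current.events.push_back(std::move(e));
            if (current.dataSize >= kMaxBatchSizeBytes) {
                FlushIfNotEmpty(current, out);
            }
        }
        FlushIfNotEmpty(current, out); // the large group is sent on its own
    } else {
        // Small group: keep the previous per-event accumulation and flush once
        // the minimum batch size is reached (count/timeout checks omitted).
        for (Event& e : g.events) {
            current.dataSize += e.dataSize;
            current.events.push_back(std::move(e));
            if (current.dataSize >= kMinBatchSizeBytes) {
                FlushIfNotEmpty(current, out);
            }
        }
    }
}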
8 changes: 4 additions & 4 deletions core/pipeline/batch/BatchItem.h
@@ -34,7 +34,7 @@ class GroupBatchItem {

void Add(BatchedEvents&& g, int64_t totalEnqueTimeMs) {
mEventsCnt += g.mEvents.size();
mTotalEnqueTimeMs += totalEnqueTimeMs;
// mTotalEnqueTimeMs += totalEnqueTimeMs;
mGroups.emplace_back(std::move(g));
mStatus.Update(mGroups.back());
}
@@ -94,9 +94,9 @@ class EventBatchItem {
void Add(PipelineEventPtr&& e) {
mBatch.mEvents.emplace_back(std::move(e));
mStatus.Update(mBatch.mEvents.back());
mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count();
// mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
// .time_since_epoch()
// .count();
}

void Flush(GroupBatchItem& res) {
159 changes: 94 additions & 65 deletions core/pipeline/batch/Batcher.h
@@ -47,25 +47,25 @@ class Batcher {
std::string errorMsg;
PipelineContext& ctx = flusher->GetContext();

uint32_t maxSizeBytes = strategy.mMaxSizeBytes;
if (!GetOptionalUIntParam(config, "MaxSizeBytes", maxSizeBytes, errorMsg)) {
uint32_t minSizeBytes = strategy.mMinSizeBytes;
if (!GetOptionalUIntParam(config, "MinSizeBytes", minSizeBytes, errorMsg)) {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
errorMsg,
maxSizeBytes,
minSizeBytes,
flusher->Name(),
ctx.GetConfigName(),
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
}

uint32_t maxCnt = strategy.mMaxCnt;
if (!GetOptionalUIntParam(config, "MaxCnt", maxCnt, errorMsg)) {
uint32_t minCnt = strategy.mMinCnt;
if (!GetOptionalUIntParam(config, "MinCnt", minCnt, errorMsg)) {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
errorMsg,
maxCnt,
minCnt,
flusher->Name(),
ctx.GetConfigName(),
ctx.GetProjectName(),
@@ -88,14 +88,15 @@

if (enableGroupBatch) {
uint32_t groupTimeout = timeoutSecs / 2;
mGroupFlushStrategy = GroupFlushStrategy(maxSizeBytes, groupTimeout);
mGroupFlushStrategy = GroupFlushStrategy(minSizeBytes, groupTimeout);
mGroupQueue = GroupBatchItem();
mEventFlushStrategy.SetTimeoutSecs(timeoutSecs - groupTimeout);
} else {
mEventFlushStrategy.SetTimeoutSecs(timeoutSecs);
}
mEventFlushStrategy.SetMaxSizeBytes(maxSizeBytes);
mEventFlushStrategy.SetMaxCnt(maxCnt);
mEventFlushStrategy.SetMaxSizeBytes(strategy.mMaxSizeBytes);
mEventFlushStrategy.SetMinSizeBytes(minSizeBytes);
mEventFlushStrategy.SetMinCnt(minCnt);

mFlusher = flusher;

@@ -114,7 +115,7 @@
mInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_TOTAL);
mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
mOutEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_EVENTS_TOTAL);
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
// mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
mEventBatchItemsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_EVENT_BATCHES_TOTAL);
mBufferedGroupsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_TOTAL);
mBufferedEventsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_TOTAL);
@@ -134,51 +135,79 @@
mInGroupDataSizeBytes->Add(g.DataSize());
mEventBatchItemsTotal->Set(mEventQueueMap.size());

size_t eventsSize = g.GetEvents().size();
for (size_t i = 0; i < eventsSize; ++i) {
PipelineEventPtr& e = g.MutableEvents()[i];
if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) {
if (!mGroupQueue) {
UpdateMetricsOnFlushingEventQueue(item);
if (g.DataSize() > mEventFlushStrategy.GetMinSizeBytes()) {
// for group size larger than min batch size, separate group only if size is larger than max batch size
if (!item.IsEmpty()) {
UpdateMetricsOnFlushingEventQueue(item);
item.Flush(res);
}
for (auto& e : g.MutableEvents()) {
// should still consider the time condition here because SLS requires it
if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) {
mOutEventsTotal->Add(item.EventSize());
item.Flush(res);
}
if (item.IsEmpty()) {
item.Reset(g.GetSizedTags(),
g.GetSourceBuffer(),
g.GetExactlyOnceCheckpoint(),
g.GetMetadata(EventGroupMetaKey::SOURCE_ID));
}
item.Add(std::move(e));
if (mEventFlushStrategy.SizeReachingUpperLimit(item.GetStatus())) {
mOutEventsTotal->Add(item.EventSize());
item.Flush(res);
} else {
if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) {
UpdateMetricsOnFlushingGroupQueue();
mGroupQueue->Flush(res);
}
if (mGroupQueue->IsEmpty()) {
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
0,
0,
mGroupFlushStrategy->GetTimeoutSecs(),
mFlusher);
}
item.Flush(mGroupQueue.value());
if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) {
UpdateMetricsOnFlushingGroupQueue();
mGroupQueue->Flush(res);
}
}
}
if (item.IsEmpty()) {
item.Reset(g.GetSizedTags(),
g.GetSourceBuffer(),
g.GetExactlyOnceCheckpoint(),
g.GetMetadata(EventGroupMetaKey::SOURCE_ID));
TimeoutFlushManager::GetInstance()->UpdateRecord(
mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher);
mBufferedGroupsTotal->Add(1);
mBufferedDataSizeByte->Add(item.DataSize());
} else if (i == 0) {
item.AddSourceBuffer(g.GetSourceBuffer());
}
mBufferedEventsTotal->Add(1);
mBufferedDataSizeByte->Add(e->DataSize());
item.Add(std::move(e));
if (mEventFlushStrategy.NeedFlushBySize(item.GetStatus())
|| mEventFlushStrategy.NeedFlushByCnt(item.GetStatus())) {
UpdateMetricsOnFlushingEventQueue(item);
item.Flush(res);
mOutEventsTotal->Add(item.EventSize());
item.Flush(res);
} else {
size_t eventsSize = g.GetEvents().size();
for (size_t i = 0; i < eventsSize; ++i) {
PipelineEventPtr& e = g.MutableEvents()[i];
if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) {
if (!mGroupQueue) {
UpdateMetricsOnFlushingEventQueue(item);
item.Flush(res);
} else {
if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) {
UpdateMetricsOnFlushingGroupQueue();
mGroupQueue->Flush(res);
}
if (mGroupQueue->IsEmpty()) {
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
0,
0,
mGroupFlushStrategy->GetTimeoutSecs(),
mFlusher);
}
item.Flush(mGroupQueue.value());
if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) {
UpdateMetricsOnFlushingGroupQueue();
mGroupQueue->Flush(res);
}
}
}
if (item.IsEmpty()) {
item.Reset(g.GetSizedTags(),
g.GetSourceBuffer(),
g.GetExactlyOnceCheckpoint(),
g.GetMetadata(EventGroupMetaKey::SOURCE_ID));
TimeoutFlushManager::GetInstance()->UpdateRecord(
mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher);
mBufferedGroupsTotal->Add(1);
mBufferedDataSizeByte->Add(item.DataSize());
} else if (i == 0) {
item.AddSourceBuffer(g.GetSourceBuffer());
}
mBufferedEventsTotal->Add(1);
mBufferedDataSizeByte->Add(e->DataSize());
item.Add(std::move(e));
if (mEventFlushStrategy.NeedFlushBySize(item.GetStatus())
|| mEventFlushStrategy.NeedFlushByCnt(item.GetStatus())) {
UpdateMetricsOnFlushingEventQueue(item);
item.Flush(res);
}
}
}
mTotalAddTimeMs->Add(std::chrono::system_clock::now() - before);
@@ -260,25 +289,25 @@
private:
void UpdateMetricsOnFlushingEventQueue(const EventBatchItem<T>& item) {
mOutEventsTotal->Add(item.EventSize());
mTotalDelayMs->Add(
item.EventSize()
* std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count()
- item.TotalEnqueTimeMs());
// mTotalDelayMs->Add(
// item.EventSize()
// * std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
// .time_since_epoch()
// .count()
// - item.TotalEnqueTimeMs());
mBufferedGroupsTotal->Sub(1);
mBufferedEventsTotal->Sub(item.EventSize());
mBufferedDataSizeByte->Sub(item.DataSize());
}

void UpdateMetricsOnFlushingGroupQueue() {
mOutEventsTotal->Add(mGroupQueue->EventSize());
mTotalDelayMs->Add(
mGroupQueue->EventSize()
* std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count()
- mGroupQueue->TotalEnqueTimeMs());
// mTotalDelayMs->Add(
// mGroupQueue->EventSize()
// * std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
// .time_since_epoch()
// .count()
// - mGroupQueue->TotalEnqueTimeMs());
mBufferedGroupsTotal->Sub(mGroupQueue->GroupSize());
mBufferedEventsTotal->Sub(mGroupQueue->EventSize());
mBufferedDataSizeByte->Sub(mGroupQueue->DataSize());
Expand All @@ -297,7 +326,7 @@ class Batcher {
CounterPtr mInEventsTotal;
CounterPtr mInGroupDataSizeBytes;
CounterPtr mOutEventsTotal;
CounterPtr mTotalDelayMs;
// CounterPtr mTotalDelayMs;
IntGaugePtr mEventBatchItemsTotal;
IntGaugePtr mBufferedGroupsTotal;
IntGaugePtr mBufferedEventsTotal;
34 changes: 20 additions & 14 deletions core/pipeline/batch/FlushStrategy.h
@@ -20,65 +20,71 @@

#include <cstdint>
#include <ctime>
#include <limits>

#include "pipeline/batch/BatchStatus.h"
#include "models/PipelineEventPtr.h"
#include "pipeline/batch/BatchStatus.h"

namespace logtail {

struct DefaultFlushStrategyOptions {
uint32_t mMaxSizeBytes = 0;
uint32_t mMaxCnt = 0;
uint32_t mMaxSizeBytes = std::numeric_limits<uint32_t>::max();
uint32_t mMinSizeBytes = 0;
uint32_t mMinCnt = 0;
uint32_t mTimeoutSecs = 0;
};

template <class T = EventBatchStatus>
class EventFlushStrategy {
public:
void SetMaxSizeBytes(uint32_t size) { mMaxSizeBytes = size; }
void SetMaxCnt(uint32_t cnt) { mMaxCnt = cnt; }
void SetMinSizeBytes(uint32_t size) { mMinSizeBytes = size; }
void SetMinCnt(uint32_t cnt) { mMinCnt = cnt; }
void SetTimeoutSecs(uint32_t secs) { mTimeoutSecs = secs; }
uint32_t GetMaxSizeBytes() const { return mMaxSizeBytes; }
uint32_t GetMaxCnt() const { return mMaxCnt; }
uint32_t GetMinSizeBytes() const { return mMinSizeBytes; }
uint32_t GetMinCnt() const { return mMinCnt; }
uint32_t GetTimeoutSecs() const { return mTimeoutSecs; }

// should be called after event is added
bool NeedFlushBySize(const T& status) { return status.GetSize() >= mMaxSizeBytes; }
bool NeedFlushByCnt(const T& status) { return status.GetCnt() == mMaxCnt; }
bool NeedFlushBySize(const T& status) { return status.GetSize() >= mMinSizeBytes; }
bool NeedFlushByCnt(const T& status) { return status.GetCnt() == mMinCnt; }
// should be called before event is added
bool NeedFlushByTime(const T& status, const PipelineEventPtr& e) {
return time(nullptr) - status.GetCreateTime() >= mTimeoutSecs;
}
bool SizeReachingUpperLimit(const T& status) { return status.GetSize() >= mMaxSizeBytes; }

private:
uint32_t mMaxSizeBytes = 0;
uint32_t mMaxCnt = 0;
uint32_t mMinSizeBytes = 0;
uint32_t mMinCnt = 0;
uint32_t mTimeoutSecs = 0;
};

class GroupFlushStrategy {
public:
GroupFlushStrategy(uint32_t size, uint32_t timeout) : mMaxSizeBytes(size), mTimeoutSecs(timeout) {}
GroupFlushStrategy(uint32_t size, uint32_t timeout) : mMinSizeBytes(size), mTimeoutSecs(timeout) {}

void SetMaxSizeBytes(uint32_t size) { mMaxSizeBytes = size; }
void SetMinSizeBytes(uint32_t size) { mMinSizeBytes = size; }
void SetTimeoutSecs(uint32_t secs) { mTimeoutSecs = secs; }
uint32_t GetMaxSizeBytes() const { return mMaxSizeBytes; }
uint32_t GetMinSizeBytes() const { return mMinSizeBytes; }
uint32_t GetTimeoutSecs() const { return mTimeoutSecs; }

// should be called after event is added
bool NeedFlushBySize(const GroupBatchStatus& status) { return status.GetSize() >= mMaxSizeBytes; }
bool NeedFlushBySize(const GroupBatchStatus& status) { return status.GetSize() >= mMinSizeBytes; }
// should be called before event is added
bool NeedFlushByTime(const GroupBatchStatus& status) {
return time(nullptr) - status.GetCreateTime() >= mTimeoutSecs;
}

private:
uint32_t mMaxSizeBytes = 0;
uint32_t mMinSizeBytes = 0;
uint32_t mTimeoutSecs = 0;
};

template <>
bool EventFlushStrategy<SLSEventBatchStatus>::NeedFlushByTime(const SLSEventBatchStatus& status,
const PipelineEventPtr& e);
const PipelineEventPtr& e);

} // namespace logtail
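One point worth spelling out about the renaming above: NeedFlushBySize() and NeedFlushByCnt() now compare against the minimum thresholds that normally trigger a flush, while the new SizeReachingUpperLimit() checks the separate hard cap that is the only condition allowed to split an oversized group. A simplified stand-in illustrating those semantics (BatchStatusStub and ThresholdsStub are hypothetical types, not logtail's, and the count check is written with >= rather than the exact == used above):

#include <cstdint>

struct BatchStatusStub {
    uint32_t sizeBytes = 0; // bytes buffered in the current batch
    uint32_t eventCnt = 0;  // events buffered in the current batch
};

struct ThresholdsStub {
    uint32_t minSizeBytes; // MinSizeBytes: normal size-based flush trigger
    uint32_t minCnt;       // MinCnt: normal count-based flush trigger
    uint32_t maxSizeBytes; // hard cap; the only thing that splits a large group

    // counterpart of NeedFlushBySize() / NeedFlushByCnt()
    bool NeedFlush(const BatchStatusStub& s) const {
        return s.sizeBytes >= minSizeBytes || s.eventCnt >= minCnt;
    }
    // counterpart of SizeReachingUpperLimit()
    bool ReachedUpperLimit(const BatchStatusStub& s) const { return s.sizeBytes >= maxSizeBytes; }
};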
23 changes: 7 additions & 16 deletions core/plugin/flusher/sls/FlusherSLS.cpp
@@ -59,6 +59,7 @@ DEFINE_FLAG_INT32(profile_data_send_retrytimes, "how many times should retry if
DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5);
DEFINE_FLAG_BOOL(global_network_success, "global network success flag, default false", false);
DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true);
DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024);

DECLARE_FLAG_BOOL(send_prefer_real_ip);

@@ -374,8 +375,10 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
#endif
mEndpoint = TrimString(mEndpoint);
if (!mEndpoint.empty()) {
SLSClientManager::GetInstance()->AddEndpointEntry(
mRegion, StandardizeEndpoint(mEndpoint, mEndpoint), false, SLSClientManager::EndpointSourceType::LOCAL);
SLSClientManager::GetInstance()->AddEndpointEntry(mRegion,
StandardizeEndpoint(mEndpoint, mEndpoint),
false,
SLSClientManager::EndpointSourceType::LOCAL);
}
}
#ifdef __ENTERPRISE__
@@ -472,7 +475,8 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
mContext->GetRegion());
}

DefaultFlushStrategyOptions strategy{static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
DefaultFlushStrategyOptions strategy{static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size)),
static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
static_cast<uint32_t>(INT32_FLAG(batch_send_interval))};
if (!mBatcher.Init(
@@ -516,19 +520,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
mMaxSendRate);
}

// (Deprecated) FlowControlExpireTime
if (!GetOptionalUIntParam(config, "FlowControlExpireTime", mFlowControlExpireTime, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mFlowControlExpireTime,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

GenerateGoPlugin(config, optionalGoPipeline);

mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
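For readers mapping the new positional initializer above onto its fields, the order follows DefaultFlushStrategyOptions as declared in FlushStrategy.h in this commit. The repetition below only adds per-field annotations for this write-up; the comments are not part of the committed code:

DefaultFlushStrategyOptions strategy{
    static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size)), // mMaxSizeBytes: hard cap per batch (10 MiB default)
    static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),  // mMinSizeBytes: size that normally triggers a flush
    static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),   // mMinCnt: event count that normally triggers a flush
    static_cast<uint32_t>(INT32_FLAG(batch_send_interval))};    // mTimeoutSecs: time-based flush interval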
1 change: 0 additions & 1 deletion core/plugin/flusher/sls/FlusherSLS.h
@@ -81,7 +81,6 @@ class FlusherSLS : public HttpFlusher {
sls_logs::SlsTelemetryType mTelemetryType = sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS;
std::vector<std::string> mShardHashKeys;
uint32_t mMaxSendRate = 0; // preserved only for exactly once
uint32_t mFlowControlExpireTime = 0;

// TODO: temporarily public for profile
std::unique_ptr<Compressor> mCompressor;