Skip to content

Commit

Permalink
only enable control
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 8, 2024
1 parent 5029d4c commit 25224a0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,16 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
labels: Map[String, String],
gauge: Gauge[T],
isAppMetrics: Boolean): Unit = {
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) {
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
}
}
}

Expand Down Expand Up @@ -199,14 +201,16 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])

def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
namedCounters.putIfAbsent(
metricNameWithLabel,
NamedCounter(
name,
metricRegistry.counter(metricNameWithLabel),
labels ++ staticLabels,
isAppMetrics))
if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
namedCounters.putIfAbsent(
metricNameWithLabel,
NamedCounter(
name,
metricRegistry.counter(metricNameWithLabel),
labels ++ staticLabels,
isAppMetrics))
}
}

def counters(): List[NamedCounter] = {
Expand Down Expand Up @@ -483,19 +487,12 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
override def getMetrics(): String = {
var leftMetricsNum = metricsCapacity
val sb = new mutable.StringBuilder
val appMetricsSnapshot = ArrayBuffer[String]()
leftMetricsNum =
fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb, appMetricsSnapshot)
if (leftMetricsNum > 0 && metricsAppEnabled) {
appMetricsSnapshot.toList.take(leftMetricsNum).foreach { appMetrics =>
sb.append(appMetrics)
}
}
leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb)
if (leftMetricsNum <= 0) {
logWarning(
s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum")
Expand All @@ -506,62 +503,30 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
private def fillInnerMetricsSnapshot(
metricList: List[AnyRef],
leftNum: Int,
sb: mutable.StringBuilder,
appMetricsSnapshot: ArrayBuffer[String]): Int = {
sb: mutable.StringBuilder): Int = {
if (leftNum <= 0) {
return 0
}
var nonAppMetricsAddNum = 0
val appCount0Metrics = ArrayBuffer[String]()
for (m <- metricList if nonAppMetricsAddNum < leftNum) {
var strMetrics = ""
var isApp = false
var isCount0 = false
m match {
case c: NamedCounter =>
strMetrics = getCounterMetrics(c)
if (c.isApp) {
isApp = true
if (c.counter.getCount <= 0) {
isCount0 = true
}
}
case g: NamedGauge[_] =>
strMetrics = getGaugeMetrics(g)
if (g.isApp) {
isApp = true
}
case m: NamedMeter =>
strMetrics = getMeterMetrics(m)
case h: NamedHistogram =>
strMetrics = getHistogramMetrics(h)
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case t: NamedTimer =>
strMetrics = getTimerMetrics(t)
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case s =>
strMetrics = s.toString
}
if (!isApp) {
sb.append(strMetrics)
nonAppMetricsAddNum = nonAppMetricsAddNum + 1
} else {
if (leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size > 0) {
if (isCount0) {
appCount0Metrics += strMetrics
} else {
appMetricsSnapshot += strMetrics
}
}
}
}
val leftAppMetricsNum = leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size
if (appCount0Metrics.nonEmpty && leftAppMetricsNum > 0) {
appMetricsSnapshot ++= appCount0Metrics.toList.take(leftAppMetricsNum)
val addList = metricList.take(leftNum)
addList.foreach {
case c: NamedCounter =>
sb.append(getCounterMetrics(c))
case g: NamedGauge[_] =>
sb.append(getGaugeMetrics(g))
case m: NamedMeter =>
sb.append(getMeterMetrics(m))
case h: NamedHistogram =>
sb.append(getHistogramMetrics(h))
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case t: NamedTimer =>
sb.append(getTimerMetrics(t))
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case s =>
sb.append(s.toString)
}
leftNum - nonAppMetricsAddNum
leftNum - addList.size
}

override def destroy(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CelebornSourceSuite extends CelebornFunSuite {

// metrics won't contain appMetrics
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false")
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
conf.set(CelebornConf.METRICS_CAPACITY.key, "7")
val (res1, exps1) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 6).foreach { i =>
assert(res1.contains(exps1(i)))
Expand All @@ -121,27 +121,11 @@ class CelebornSourceSuite extends CelebornFunSuite {
assert(!res1.contains(exps1(i)))
}

// app metrics will fall behind when it reaches capacity
// metrics contain appMetrics
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
conf.set(CelebornConf.METRICS_CAPACITY.key, "4")
conf.set(CelebornConf.METRICS_CAPACITY.key, "7")
val (res2, exps2) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 6).foreach { i =>
assert(res2.contains(exps2(i)))
}
List[Int](1, 2, 3).foreach { i =>
assert(!res2.contains(exps2(i)))
}

// app metrics count0 will fall behind
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
val (res3, exps3) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 6, 1, 2).foreach { i =>
assert(res3.contains(exps3(i)))
}
List[Int](3).foreach { i =>
assert(!res3.contains(exps3(i)))
}
checkMetricsRes(res2, exps2)
}

test("test getAndClearTimerMetrics in timerMetrics") {
Expand Down

0 comments on commit 25224a0

Please sign in to comment.