Skip to content

Commit

Permalink
[split] finagle-core: fix reporting bug for pending reqs
Browse files Browse the repository at this point in the history
RB_ID=133531
  • Loading branch information
sprsquish authored and CI committed Mar 20, 2013
1 parent cf8a063 commit 450001b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,32 @@ class StatsFilter[Req, Rep](statsReceiver: StatsReceiver)
val elapsed = Stopwatch.start()

outstandingRequestCount.incrementAndGet()
val result = service(request)

result respond {
case Throw(BackupRequestLost) =>
// We blackhole this request. It doesn't count for anything.
// After the Failure() patch, this should no longer need to
// be a special case.
outstandingRequestCount.decrementAndGet()
case Throw(e) =>
dispatchCount.incr()
latencyStat.add(elapsed().inMilliseconds)
def flatten(ex: Throwable): Seq[String] =
if (ex eq null) Seq[String]() else ex.getClass.getName +: flatten(ex.getCause)
statsReceiver.scope("failures").counter(flatten(e): _*).incr()
e match {
case sourced: SourcedException if sourced.serviceName != "unspecified" =>
statsReceiver
.scope("sourcedfailures")
.counter(sourced.serviceName +: flatten(sourced): _*)
.incr()
case _ =>
}
case Return(_) =>
dispatchCount.incr()
successCount.incr()
latencyStat.add(elapsed().inMilliseconds)
service(request) respond { response =>
outstandingRequestCount.decrementAndGet()
response match {
case Throw(BackupRequestLost) =>
// We blackhole this request. It doesn't count for anything.
// After the Failure() patch, this should no longer need to
// be a special case.
case Throw(e) =>
dispatchCount.incr()
latencyStat.add(elapsed().inMilliseconds)
def flatten(ex: Throwable): Seq[String] =
if (ex eq null) Seq[String]() else ex.getClass.getName +: flatten(ex.getCause)
statsReceiver.scope("failures").counter(flatten(e): _*).incr()
e match {
case sourced: SourcedException if sourced.serviceName != "unspecified" =>
statsReceiver
.scope("sourcedfailures")
.counter(sourced.serviceName +: flatten(sourced): _*)
.incr()
case _ =>
}
case Return(_) =>
dispatchCount.incr()
successCount.incr()
latencyStat.add(elapsed().inMilliseconds)
}
}

result
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.twitter.finagle.service

import org.specs.SpecificationWithJUnit
import org.specs.mock.Mockito
import com.twitter.util.Promise
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.{RequestException, WriteException, Service}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class StatsFilterTest extends FunSuite {
def getService: (Promise[String], InMemoryStatsReceiver, Service[String, String]) = {
val receiver = new InMemoryStatsReceiver()
val statsFilter = new StatsFilter[String, String](receiver)
val promise = new Promise[String]
val service = new Service[String, String] {
def apply(request: String) = promise
}

(promise, receiver, statsFilter andThen service)
}

test("report exceptions") {
val (promise, receiver, statsService) = getService

val e1 = new Exception("e1")
val e2 = new RequestException(e1)
val e3 = WriteException(e2)
e3.serviceName = "bogus"
promise.setException(e3)
val res = statsService("foo")
assert(res.isDefined)
assert(res.isThrow)
val sourced = receiver.counters.keys.filter { _.exists(_ == "sourcedfailures") }
assert(sourced.size == 1)
assert(sourced.toSeq(0).exists(_.indexOf("bogus") >=0))
val unsourced = receiver.counters.keys.filter { _.exists(_ == "failures") }
assert(unsourced.size == 1)
assert(unsourced.toSeq(0).exists { s => s.indexOf("RequestException") >= 0 })
assert(unsourced.toSeq(0).exists { s => s.indexOf("WriteException") >= 0 })
}

test("report pending requests on success") {
val (promise, receiver, statsService) = getService
assert(receiver.gauges(Seq("pending"))() == 0.0)
statsService("foo")
assert(receiver.gauges(Seq("pending"))() == 1.0)
promise.setValue("")
assert(receiver.gauges(Seq("pending"))() == 0.0)
}

test("report pending requests on failure") {
val (promise, receiver, statsService) = getService
assert(receiver.gauges(Seq("pending"))() == 0.0)
statsService("foo")
assert(receiver.gauges(Seq("pending"))() == 1.0)
promise.setException(new Exception)
assert(receiver.gauges(Seq("pending"))() == 0.0)
}
}

0 comments on commit 450001b

Please sign in to comment.