Skip to content

Commit

Permalink
Apply scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Mar 5, 2019
1 parent 3971fae commit d002791
Show file tree
Hide file tree
Showing 47 changed files with 1,052 additions and 880 deletions.
11 changes: 11 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version = 1.4.0

style = defaultWithAlign

align.tokens = [off]
danglingParentheses = true
docstrings = JavaDoc
indentOperator = spray
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
unindentTopLevelOperators = true
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
lazy val `akka-stream-contrib` = (project in file(".")).
aggregate(contrib)
lazy val `akka-stream-contrib` = (project in file(".")).aggregate(contrib)

lazy val contrib = project

Expand Down
9 changes: 4 additions & 5 deletions contrib/build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
lazy val contrib = (project in file(".")).
enablePlugins(AutomateHeaderPlugin)
lazy val contrib = (project in file(".")).enablePlugins(AutomateHeaderPlugin)

name := "akka-stream-contrib"

Expand All @@ -10,7 +9,7 @@ testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-F", "4")

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-testkit" % Common.AkkaVersion % "provided",
"junit" % "junit" % "4.12" % Test, // Common Public License 1.0
"com.novocode" % "junit-interface" % "0.11" % Test, // BSD-like
"com.google.jimfs" % "jimfs" % "1.1" % Test // ApacheV2
"junit" % "junit" % "4.12" % Test, // Common Public License 1.0
"com.novocode" % "junit-interface" % "0.11" % Test, // BSD-like
"com.google.jimfs" % "jimfs" % "1.1" % Test // ApacheV2
)
4 changes: 2 additions & 2 deletions contrib/src/main/scala/akka/stream/contrib/Accumulate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.stream.contrib

import akka.japi.function
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

/**
* This companion defines a factory for [[Accumulate]] instances, see [[Accumulate.apply]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.stream.contrib

import akka.japi.function
import akka.stream.stage.{ GraphStage, TimerGraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{GraphStage, InHandler, OutHandler, TimerGraphStageLogic}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
Expand All @@ -23,10 +23,9 @@ object AccumulateWhileUnchanged {
* @tparam Property type of the observed property
* @return [[AccumulateWhileUnchanged]] instance
*/
def apply[Element, Property](
propertyExtractor: Element => Property,
maxElements: Option[Int] = None,
maxDuration: Option[FiniteDuration] = None) =
def apply[Element, Property](propertyExtractor: Element => Property,
maxElements: Option[Int] = None,
maxDuration: Option[FiniteDuration] = None) =
new AccumulateWhileUnchanged(propertyExtractor, maxElements, maxDuration)

/**
Expand All @@ -39,10 +38,9 @@ object AccumulateWhileUnchanged {
* @tparam Property type of the observed property
* @return [[AccumulateWhileUnchanged]] instance
*/
def create[Element, Property](
propertyExtractor: function.Function[Element, Property],
maxElements: Option[Int] = None,
maxDuration: Option[FiniteDuration] = None) =
def create[Element, Property](propertyExtractor: function.Function[Element, Property],
maxElements: Option[Int] = None,
maxDuration: Option[FiniteDuration] = None) =
new AccumulateWhileUnchanged(propertyExtractor.apply, maxElements, maxDuration)
}

Expand All @@ -56,11 +54,10 @@ object AccumulateWhileUnchanged {
* @tparam Element type of accumulated elements
* @tparam Property type of the observed property
*/
final class AccumulateWhileUnchanged[Element, Property](
propertyExtractor: Element => Property,
maxElements: Option[Int] = None,
maxDuration: Option[FiniteDuration] = None)
extends GraphStage[FlowShape[Element, immutable.Seq[Element]]] {
final class AccumulateWhileUnchanged[Element, Property](propertyExtractor: Element => Property,
maxElements: Option[Int] = None,
maxDuration: Option[FiniteDuration] = None)
extends GraphStage[FlowShape[Element, immutable.Seq[Element]]] {

val in = Inlet[Element]("AccumulateWhileUnchanged.in")
val out = Outlet[immutable.Seq[Element]]("AccumulateWhileUnchanged.out")
Expand All @@ -73,60 +70,59 @@ final class AccumulateWhileUnchanged[Element, Property](
private var nbElements: Int = 0
private val buffer = Vector.newBuilder[Element]

setHandlers(in, out, new InHandler with OutHandler {
setHandlers(
in,
out,
new InHandler with OutHandler {

override def onPush(): Unit = {
val nextElement = grab(in)
val nextState = propertyExtractor(nextElement)
override def onPush(): Unit = {
val nextElement = grab(in)
val nextState = propertyExtractor(nextElement)

if (currentState.isEmpty) currentState = Some(nextState)
if (currentState.isEmpty) currentState = Some(nextState)

(currentState, maxElements) match {
case (Some(`nextState`), None) => stash(nextElement)
case (Some(`nextState`), Some(max)) if nbElements < max => stash(nextElement)
case _ => pushResults(Some(nextElement), Some(nextState))
(currentState, maxElements) match {
case (Some(`nextState`), None) => stash(nextElement)
case (Some(`nextState`), Some(max)) if nbElements < max => stash(nextElement)
case _ => pushResults(Some(nextElement), Some(nextState))
}
}
}

override def onPull(): Unit = {
if (!hasBeenPulled(in)) {
pull(in)
override def onPull(): Unit =
if (!hasBeenPulled(in)) {
pull(in)
}

override def onUpstreamFinish(): Unit = {
val result = buffer.result()
if (result.nonEmpty) {
emit(out, result)
}
completeStage()
}
}

override def onUpstreamFinish(): Unit = {
val result = buffer.result()
if (result.nonEmpty) {
emit(out, result)
private def stash(nextElement: Element) = {
buffer += nextElement
nbElements += 1
pull(in)
}
completeStage()
}

private def stash(nextElement: Element) = {
buffer += nextElement
nbElements += 1
pull(in)
}
})
)

override def preStart(): Unit = {
super.preStart()
maxDuration match {
case Some(max) => schedulePeriodically(None, max)
case None => Unit
case None => Unit
}
}
override def postStop(): Unit = {
override def postStop(): Unit =
buffer.clear()
}

override protected def onTimer(timerKey: Any): Unit = {
override protected def onTimer(timerKey: Any): Unit =
pushResults(None, None)
}

private def pushResults(
nextElement: Option[Element],
nextState: Option[Property]): Unit = {
private def pushResults(nextElement: Option[Element], nextState: Option[Property]): Unit = {
if (!isAvailable(out)) { return }

val result = buffer.result()
Expand Down
54 changes: 28 additions & 26 deletions contrib/src/main/scala/akka/stream/contrib/Coroner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
package akka.stream.contrib

import java.io.PrintStream
import java.lang.management.{ ManagementFactory, ThreadInfo }
import java.lang.management.{ManagementFactory, ThreadInfo}
import java.util.Date
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import scala.concurrent.{ Promise, Awaitable, CanAwait, Await }
import java.util.concurrent.{CountDownLatch, TimeoutException}
import scala.concurrent.{Await, Awaitable, CanAwait, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.testkit.{ TestKit => AkkaTestKit, TestDuration }
import akka.testkit.{TestKit => AkkaTestKit, TestDuration}

/**
* The Coroner can be used to print a diagnostic report of the JVM state,
Expand All @@ -35,21 +35,20 @@ object Coroner { // FIXME: remove once going back to project dependencies
* The result of this Awaitable will be `true` if it has been canceled.
*/
trait WatchHandle extends Awaitable[Boolean] {

/**
* Will try to ensure that the Coroner has finished reporting.
*/
def cancel(): Unit
}

private class WatchHandleImpl(startAndStopDuration: FiniteDuration)
extends WatchHandle {
private class WatchHandleImpl(startAndStopDuration: FiniteDuration) extends WatchHandle {
val cancelPromise = Promise[Boolean]
val startedLatch = new CountDownLatch(1)
val finishedLatch = new CountDownLatch(1)

def waitForStart(): Unit = {
def waitForStart(): Unit =
startedLatch.await(startAndStopDuration.length, startAndStopDuration.unit)
}

def started(): Unit = startedLatch.countDown()

Expand Down Expand Up @@ -81,9 +80,11 @@ object Coroner { // FIXME: remove once going back to project dependencies
* If displayThreadCounts is set to true, then the Coroner will print thread counts during start
* and stop.
*/
def watch(duration: FiniteDuration, reportTitle: String, out: PrintStream,
def watch(duration: FiniteDuration,
reportTitle: String,
out: PrintStream,
startAndStopDuration: FiniteDuration = defaultStartAndStopDuration,
displayThreadCounts: Boolean = false): WatchHandle = {
displayThreadCounts: Boolean = false): WatchHandle = {

val watchedHandle = new WatchHandleImpl(startAndStopDuration)

Expand All @@ -99,7 +100,8 @@ object Coroner { // FIXME: remove once going back to project dependencies
if (!Await.result(watchedHandle, duration)) {
watchedHandle.expired()
out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...")
try printReport(reportTitle, out) catch {
try printReport(reportTitle, out)
catch {
case NonFatal(ex) {
out.println("Error displaying Coroner's Report")
ex.printStackTrace(out)
Expand All @@ -109,7 +111,9 @@ object Coroner { // FIXME: remove once going back to project dependencies
} finally {
if (displayThreadCounts) {
val endThreads = threadMx.getThreadCount
out.println(s"Coroner Thread Count started at $startThreads, ended at $endThreads, peaked at ${threadMx.getPeakThreadCount} in $reportTitle")
out.println(
s"Coroner Thread Count started at $startThreads, ended at $endThreads, peaked at ${threadMx.getPeakThreadCount} in $reportTitle"
)
}
out.flush()
watchedHandle.finished()
Expand Down Expand Up @@ -140,11 +144,8 @@ object Coroner { // FIXME: remove once going back to project dependencies
#Heap usage: ${memMx.getHeapMemoryUsage()}
#Non-heap usage: ${memMx.getNonHeapMemoryUsage()}""".stripMargin('#'))

def dumpAllThreads: Seq[ThreadInfo] = {
threadMx.dumpAllThreads(
threadMx.isObjectMonitorUsageSupported,
threadMx.isSynchronizerUsageSupported)
}
def dumpAllThreads: Seq[ThreadInfo] =
threadMx.dumpAllThreads(threadMx.isObjectMonitorUsageSupported, threadMx.isSynchronizerUsageSupported)

def findDeadlockedThreads: (Seq[ThreadInfo], String) = {
val (ids, desc) = if (threadMx.isSynchronizerUsageSupported()) {
Expand All @@ -160,13 +161,12 @@ object Coroner { // FIXME: remove once going back to project dependencies
}
}

def printThreadInfos(threadInfos: Seq[ThreadInfo]) = {
def printThreadInfos(threadInfos: Seq[ThreadInfo]) =
if (threadInfos.isEmpty) {
println("None")
} else {
for (ti threadInfos.sortBy(_.getThreadName)) { println(threadInfoToString(ti)) }
}
}

def threadInfoToString(ti: ThreadInfo): String = {
val sb = new java.lang.StringBuilder
Expand Down Expand Up @@ -211,10 +211,10 @@ object Coroner { // FIXME: remove once going back to project dependencies
if (i == 0 && ti.getLockInfo != null) {
import java.lang.Thread.State._
ti.getThreadState match {
case BLOCKED appendMsg("\t- blocked on ", ti.getLockInfo)
case WAITING appendMsg("\t- waiting on ", ti.getLockInfo)
case BLOCKED appendMsg("\t- blocked on ", ti.getLockInfo)
case WAITING appendMsg("\t- waiting on ", ti.getLockInfo)
case TIMED_WAITING appendMsg("\t- waiting on ", ti.getLockInfo)
case _
case _
}
}

Expand Down Expand Up @@ -255,10 +255,12 @@ trait WatchedByCoroner {

@volatile private var coronerWatch: Coroner.WatchHandle = _

final def startCoroner(): Unit = {
coronerWatch = Coroner.watch(expectedTestDuration.dilated, getClass.getName, System.err,
startAndStopDuration.dilated, displayThreadCounts)
}
final def startCoroner(): Unit =
coronerWatch = Coroner.watch(expectedTestDuration.dilated,
getClass.getName,
System.err,
startAndStopDuration.dilated,
displayThreadCounts)

final def stopCoroner(): Unit = {
coronerWatch.cancel()
Expand Down
Loading

0 comments on commit d002791

Please sign in to comment.