#1.8 定制流处理 (Custom stream processing)
虽然akka stream处理词汇非常丰富(例子参照Streams Cookbook
),因为在已存在的操作中某些功能的缺失,或者出于性能的考虑,有些时候也有必要定义新的转换阶段。在这个部分我们将展示如何创建自定义处理阶段以及各种图节点。
注意:自定义图阶段不应该成为你的首选,在定制GraphStage
时使用flows
以及图DSLgraph DSL
一般情况下更容易且更不容易出错。
##1.8.1 GraphStage的定制处理 (Custom processing with GraphStage)
GraphStage
可以被抽象为用于创建有着任意数量的输入和输出端口的任意图形处理阶段。它对应通过组合其他而创建新的流处理阶段的GraphDSL.create()
方法。而GraphStage
不同的是它本身不可以被分割的成更小的,也允许状态以一种安全的方式在内部保持。
作为第一个推动案例,我们将创建一个从数字1开始简单发射数字的新的源Source
,直到被取消。首先我们需要定义阶段的"接口",这在akka stream的术语中被叫做形状shape
(更详细的解释在Modularity, Composition and Hierarchy
部分)。它看起来像:
import akka.stream.SourceShape
import akka.stream.stage.GraphStage
class NumbersSource extends GraphStage[SourceShape[Int]] {
// Define the (sole) output port of this stage
val out: Outlet[Int] = Outlet("NumbersSource")
// Define the shape of this stage, which is SourceShape with the port we defined above
override val shape: SourceShape[Int] = SourceShape(out)
// This is where the actual (possibly stateful) logic will live
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}
正如你所看到的,GraphStage
自身制定一了这个阶段的端口和包含端口的图形。它还包含一个尚未实现的方法叫createLogic
。如果你还记得,在多物化中阶段将可复用,每次生成不同执行实体。在GraphStage
的实际运行中,逻辑作为那些被调用createLogic
方法由物化创建的GraphStageLogic
实例。换句话说,我们需要创建将发射我们想要的数字的合适逻辑。
为了在backpressured
流中从源Source
发射元素,首先需要来自下游的需求。为了接收必要的行为我们需要通过输出端口(Outlet
)注册OutHandler
的子集。这个接收器将会接收与端口生命周期有关的行为。在我们的例子中我们需要复写onPull()
方法,这表明我们可以自由的发生一个元素,另外还有一个onDownstreamFinish()
的回调,这在下游取消时被调用。由于该回调是停止阶段的默认行为,我们不需要复写它,在onPull
回调,我们将简单的发射另一个元素,最后他看起来像这样:
import akka.stream.SourceShape
import akka.stream.Graph
import akka.stream.stage.GraphStage
import akka.stream.stage.OutHandler
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// All state MUST be inside the GraphStageLogic,
// never inside the enclosing GraphStage.
// This state is safe to access and modify from all the
// callbacks that are provided by GraphStageLogic and the
// registered handlers.
private var counter = 1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, counter)
counter += 1
}
})
}
}
上述GraphStage
的实例是Graph[SourceShape[Int],Unit]
的子集,这意味着在很多情况已经可以使用了,但是不像我们通常使用的Source s
中提供DSL方法。为了转化这个图到恰到的Source
,我们需要使用Source.fromGraph
包装它(更多关于图和DSL的细节请查阅Modularity, Composition and Hierarchy
)。现在我们能像内建的source一样使用它:
// A GraphStage is a proper Graph, just like what GraphDSL.create would return
val sourceGraph: Graph[SourceShape[Int], Unit] = new NumbersSource
// Create a Source from the Graph to access the DSL
val mySource: Source[Int, Unit] = Source.fromGraph(new NumbersSource)
// Returns 55
val result1: Future[Int] = mySource.take(10).runFold(0)(_ + _)
// The source is reusable. This returns 5050
val result2: Future[Int] = mySource.take(100).runFold(0)(_ + _)
###端口状态 , InHandler和OutHandler(Port states, InHandler and OutHandler)
为了与阶段的端口相互作用(入口或出口),我们需要能接收事件并生成属于该端口的新事件。以下来子GraphStageLogic
的操作可以在输出端口上:
- 推(输出,元素)
push(out,elem)
把一个元素推到输出端口。只有是端口由下游拉取pulled
之后发生。 complete(out)
正常关闭输出端口fail(out,exception)
用一个故障信号关闭端口。
关联到接收被注册到输出端口的OutHandler
实例的事件使用setHandler(out,handler)
。这个处理器有两个回调
onPull()
在输出端口准备好发射下一个元素,push(out,elem)
此时被允许在该端口上调用时被调用onDownstreamFinish()
在下游取消也不在允许消息被推送到它时被调用。在这个事件被调用后没有更多的onPull()
可达,如果不复写,将默认采用停止阶段。
另外,还为输出端口提供了两种查询方法:
isAvailable(out)
如果端口能被pushed
将返回true
isClosed(out)
如果端口关闭将返回true
。这种情况下端口不能再被pushed
也不会pulled
。 上述操作,事件以及查询的关系可以通过下面的状态机总结。绿色标出了初始状态,而橘色标出了结束状态。如果一个操作没有被一个状态列出,那么当端口处于该状态下调用它是无效的。如果事件没被在状态下列出,那么该状态下将不会发生这个事件。
下面操作在输出端口有效:
pull(in)
需要一个来自输出端口的元素。只有在端口被上游推送pushed
后发生。grab(in)
通过获取一个onPush()
接收到的元素。在端口被上游重新推送之前不能被再次调用cancel(in)
结束输入端口
关联到被注册的输入端口的InHandler
实例使用setHandler(in,handler)
。这个处理器有三个回调:
onPush()
在输出端口有一个新的元素时被调用。现在有可能通过在端口上使用grab(in)
或者调用pull(in)
来请求下一个元素获取元素。这不是强制去抢夺grab
元素,但是如果元素没有被抓住而已经拉取的情况下将丢弃缓冲的元素。onUpstreamFinish()
一旦上游完成而不在拉取新元素时候被调用。在这事件之后没有更多的onPush()
可达。如果不复写,将默认采用停止阶段。onUpstreamFailure()
在上游由异常造成失败且不再拉取新元素时被调用。在这个事件之后没有更多的onPush()
可达。如果没有复写,默认采用失败阶段。
另外,还为输入端口提供了三个查询方法:
isAvailable(in)
如果端口可以被抓住时返回true
hasBeenPulled(in)
如果端口已经被拉取时返回true
。在这个阶段调用pull(in)
是非法的。isClosed(in)
如果端口关闭将返回true
。这种情况下端口不能再被pushed
也不会pulled
。
上述操作、事件以及查询的关系可以通过下面的状态机总结。绿色标出了初始状态,而橘色标出了结束状态。如果一个操作没有被一个状态列出,那么当端口处于该状态下调用它是无效的。如果事件没被在状态下列出,那么该状态下将不会发生这个事件。 最后,为了方便完成阶段的端口,提供了两个方法:
completeStage()
相当于关闭所有的输出端口和取消所有的输入端口failStage(exception)
相当于使所有输出端口失败和取消所有的输入端口 在某些情况下用基于上述API回应常规状态机的事件的信号是不便的和易出错的。在那些情况下,有一个 允许动作的说明性排序declarative sequencing
的API将大大简化额外分配的开销的用例。两个API之间的区别能被描述为第一个API是外部信号驱动的,而这个API是更积极以及驱动周边的drives its surroundings.
。:class:GraphStage
这部分操作的API是:emit(out,elem)
和emitMultiple(out,Iterable(elem1,elem2))
通过一个当需要时,发射一个或者多个元素然更再次导入当下的处理器reinstalls the current ha更dlers
的处理器handler
来代替OutHandler
。read(in)(andThen)
和readN(in,n)(andThen)
通过当一个或多个元素更推送,然后允许处理器在一旦请求数目的元素被更出后做出反映的处理器handler
代替InHandler
abortEmitting()
和abortReading()
在不间断的发射和读取时的取消操作。 注意,由于上述方法是通更短暂替换阶段的处理器实现的,你不能在他们运行发射或者读取时调用setHandler
方法,否则会干扰到它们的实现。下面是一种安全的方法在初始化发射和读取后调用(这将会导致在操作实际是运行时变成完成):complete(out),completeStage(),emit,emitMultiple,abortEmitting(),abortReading()
在第二版的:class:Duplicator
中键展示如何简化这个API的例子。
###通过GraphStage定制线性处理阶段 (Custom linear processing stages using GraphStage)
Graph
阶段通过让它们有一个输入和输出,以及用FlowShape
作为它们的形状而允许自定义线性处理阶段。这个阶段能被下面有着一个有着两个flows
的盒子box
的简图表示。
为了说明这些概念,我们创建一个小的实现了map
转化的GraphStage
。
Map从o更Push()
处理器调用push(out)
以及从o更Pull()
处理器调用pull()
结果是在概念上和上述一致,并用下面代码充分表达:
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
val in = Inlet[A]("Map.in")
val out = Outlet[B]("Map.out")
override val shape = FlowShape.of(in, out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, f(grab(in)))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Map
是在需求通过上游元素传递下游时典型的一对一转化的例子。
我们将实现filter
表现出多对一阶段。联系过滤器的概念是类似的:
就像我们看到的那样,如果给定断言和目前的元素匹配,我们将向下游传递元素,否则返回"ball"到上游来获取一个新的元素。通过在更onPush处理器增加一个条件以及决定选择
pull(in)或
push(out)来修改
map`例子实现(当然没有映射函数f)
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
下一步为了完善简图我们定义了一对多转化。我们选择一个直接的例子,例子中每一个上游的元素被两次发送到下游。这个阶段的概念上的联系类似:
这是一个有状态的阶段:有没有重复最后一个元素可以被看见,我们还需要确保在上游完成时发送额外的元素。
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// Again: note that all mutable state
// MUST be inside the GraphStageLogic
var lastElem: Option[A] = None
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
lastElem = Some(elem)
push(out, elem)
}
override def onUpstreamFinish(): Unit = {
if (lastElem.isDefined) emit(out, lastElem.get)
complete(out)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (lastElem.isDefined) {
push(out, lastElem.get)
lastElem = None
} else {
pull(in)
}
}
})
}
}
在这种情况下,来自下游的拉取可能会被自身消耗而不是通过包含将要发送的元素的上游阶段。需要注意的是我们需要处理当上游关闭而这个阶段还有元素想要推送到下游的情况。这可以通过在InHandler
复写onUpstreamFinish
以及提供当上游完成时需要执行的自定义逻辑来实现的。
这个例子可以通过调用emitMultiple
变更可变状态来代替处理器来简化,每次发射多个元素,然后恢复原来的处理程序:
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
// this will temporarily suspend this handler until the two elems
// are emitted and then reinstates it
emitMultiple(out, Iterable(elem, elem))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
最后,为了演示上面的所有阶段,我们将它们连成一个处理链路,这和下面结构在概念上是对应的:
在流中用自定义的阶段仅仅几行代码:
val resultFuture = Source(1 to 5)
.via(new Filter(_ % 2 == 0))
.via(new Duplicator())
.via(new Map(_ / 2))
.runWith(sink)
如果我们视图绘制事件的顺序,它展示阶段有一个"事件标记""event token"
在潜在的链路,概念上就和我们的铁轨railroad tracks
一样来表示预测。
###完成(Completion)
完成处理通常(但不完全)在上游完成后处理阶段需要发射更多的元素而进入图中。我们已经看到第一个复写实现的例子,即使上游临近阶段已经完成,最后一个元素仍然需要被(发送)两次。可以通过在InHandler
中复写onUpstreamFinish
方法来实现。默认情况下一旦内部或者外部的所有端口被关闭(输入和输出),阶段将自动停止。可能去从这种行为中通过调用setKeepGoing(true)
来挑选输出(在阶段操作中不被支持,而通常在预启动preStart
中进行)。在这种情况下,阶段必须通过调用completeStage()
或者failStage(exception)
来明确的结束。这个特性携带泄漏stream
和actor
的风险,因此需要谨慎使用。
###使用定时器 (Using timers)
通过使用TimerGraphStageLogic
作为返回逻辑的基类可以在GraphStage
中使用定时器。定时器可以通过调用scheduleOnce(key,delay), schedulePeriodically(key,period)
或者schedulePeriodicallyWithInitialDelay(key,delay,period)
中的一个以及传递一个对象作为定时器的键值(可以使用任意键值,比如字符串)来调用定时器。onTimer(key)
方法需要被复写,而一旦定时器的键值被触发时将被调用。通过cancelTimer(key)
可以取消定时器,通过isTimerActive(key)
检测定时器的状态。在阶段完成时定时器被自动的清理。
定时器在构造逻辑中不能被调用,但是这个可以通过自身的钩子方法preStart()
调度它们。
在这个阶段简单的在打开和关闭之间切换,其中打开意味着不再元素通过,这个阶段开始的时候是关闭的,但是随着元素被推送到下游,持续一段时间后开关会变成打开,这将消耗和丢弃上游的消息。
// each time an event is pushed through it will trigger a period of silence
class TimedGate[A](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("TimedGate.in")
val out = Outlet[A]("TimedGate.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) {
var open = false
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (open) pull(in)
else {
push(out, elem)
open = true
scheduleOnce(None, silencePeriod)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = { pull(in) }
})
override protected def onTimer(timerKey: Any): Unit = {
open = false
}
}
}
###使用异步 side-channels (Using asynchronous side-channels)
为了接收流元素尚未到达的异步行为(你比如completion
或future
或第三方API的回调)必须通过从阶段逻辑调用getAsyncCallback()
来获取AsyncCallback
。getAsyncCallback
接收回调作为参数,一旦异步事件被触发,回调将被调用。很重要的一点是不要直接调用回调,相反,外部API必须在返回的AsyncCallback
上调用invoke(event)
方法。执行引擎会注意以线程安全的方式调用被提供的回调函数。回调可以在GraphStageLogic
实现中安全的访问状态。
从构造中共享AsyncCallback
是有风险的,因此,建议使用自身的钩子方法preStart()
代替。这个例子展示在future完成时开始废弃元素的异步side channel
图阶段:
// will close upstream when the future completes
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("KillSwitch.in")
val out = Outlet[A]("KillSwitch.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
override def preStart(): Unit = {
val callback = getAsyncCallback[Unit] { (_) =>
completeStage()
}
switch.foreach(callback.invoke)
}
setHandler(in, new InHandler {
override def onPush(): Unit = { push(out, grab(in)) }
})
setHandler(out, new OutHandler {
override def onPull(): Unit = { pull(in) }
})
}
}
###与actor集成 (Integration with actors)
这是残缺的部分,在将来的版本中将扩展,这是实验性的功能*
从阶段外部发表的ActorRef
是可能被获取的,类似AsyncCallback
允许注入异步事件到阶段逻辑中。这个引用可被通过一个接受了发送的ActorRef
和接收的消息的对Pair
的函数调用 getStageActorRef(receive)
获取。这个引用可以通过调用watch(ref)
和unwatch(ref)
方法来监视watch
其他的actor
。这个引用也能被外部的actor
监视。当前限制ActorRef
的是:
- 它们不是位置透明的,不能通过路由访问
- 它们不能作为物化值被返回
- 它们不能通过
GraphStageLogic
的构造翻个温暖,不过可以通过preStart
访问。
###定制物化值 (Custom materialized values)
定制阶段能通过引入GraphStageWithMaterializedValue
而不是简单的GraphStage
返回物化值而不是Unit
。这种情况下这两者的差异是除了阶段逻辑必须提供物化值外createLogicAndMaterializedValue(inheritedAttributes)
方法需要被复写。
警告:不管是逻辑运行还是获取物化值的线程,都没有提供内建的该值的同步访问。添加必要((non-blocking)同步以及保证用户能看到这个共享对象是程序员的责任。
在这个实例中物化值是必须经过流的包含第一个元素的future
:
class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {
val in = Inlet[A]("FirstValue.in")
val out = Outlet[A]("FirstValue.out")
val shape = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = {
val promise = Promise[A]()
val logic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
promise.success(elem)
push(out, elem)
// replace handler with one just forwarding
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, grab(in))
}
})
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
(logic, promise.future)
}
}
###使用属性影响阶段的行为 (Using attributes to affect the behavior of a stage)
这是残缺的部分,在将来的版本中将扩展
阶段可以访问由物化值创建的属性对象,这包含了所有作用于阶段的应用(继承)属性,不具体(最外层)到最具体(最里层)属性的排序。这是阶段决定如何协调这个继承链路最终有效决断的责任。属性的如何工作的解释查看Modularity, Composition and Hierarchy
。
###图阶段去耦速率 (Rate decoupled graph stages)
有些时候期望解除上游和下游速率的耦合,而使上游一个阶段,下游一个阶段,仅仅在需要的时候同步。在模型中用有着两个区域之间边界的GraphStage
来实现的,其中上游的发送需求和下游的到达需求是解耦的。这种差异的一个之间后果是onPush
不再总导致push
调用而onPull
不在总导致pull
调用。
一个重要用例是建立buffer-like
实体,在缓冲没有满或者空的时候将允许上下游阶段独立处理,在缓冲变满或者空的时候减缓相应的一侧。
下面的图说明有着两个元素容量的缓冲被设置而下游请求是缓慢开始而缓冲将在任何来自下游请求被发现之前被上游的元素充满时的事件序列:
我们可以注意到的第一点区别是,缓冲阶段是在初始化过程中自动拉取上游的元素。缓冲在没有任何下游元素的情况下最多被请求两个元素。下面的代码演示了对应上述消息时序图的缓冲区类:
class TwoBuffer[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("TwoBuffer.in")
val out = Outlet[A]("TwoBuffer.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
val buffer = mutable.Queue[A]()
def bufferFull = buffer.size == 2
var downstreamWaiting = false
override def preStart(): Unit = {
// a detached stage needs to start upstream demand
// itself as it is not triggered by downstream demand
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer.enqueue(elem)
if (downstreamWaiting) {
downstreamWaiting = false
val bufferedElem = buffer.dequeue()
push(out, bufferedElem)
}
if (!bufferFull) {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.nonEmpty) {
// emit the rest if possible
emitMultiple(out, buffer.toIterator)
}
completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (buffer.isEmpty) {
downstreamWaiting = true
} else {
val elem = buffer.dequeue
push(out, elem)
}
if (!bufferFull && !hasBeenPulled(in)) {
pull(in)
}
}
})
}
}
##1.8.2线程安全的定制处理阶段 (Thread safety of custom processing stages) 以上所有的自定义阶段(线性或者图形)提供了一些实现者可以依赖的简单保证
callbacks
通过所有的类不会同时调用而公开- 通过这些类封装的状态可以通过提供的回调方法在没有任何其他同步的情况下安全的被修改。
在本质上说,如果认为自定义阶段的状态类似
actor
的状态而回调类似actor
的接收代码块receive block
,那么上述保证类似actor
提供的那样。
警告:任何定制阶的外部使用提供的回调访问状态并不是安全的,就类似从actor外部访问状态也不是安全的一样。这意味着Future
回调不应该在定制阶段内部关闭,因为这种访问通过提供的回调能造成并发,而导致不确定的行为。