Akka Stream
默认以并发的方式处理步骤(通常是在 Flow
和Source
或者Graph
链接 处简单的操作). 这是由在内部把各个处理步骤映射到一个专用的Actor
上实现的. 我们将通过烤煎饼的例子来展示Stream
如何利用现代计算机的并行性到各个处理过程中. 以下是设定: Patrik和Roland都喜欢制作煎饼, 但是他们需要在一次制作过程中制作足够量的煎饼, 来使得孩子高兴. 为了提高他们煎饼的单位时间产量, 他们使用了两个煎锅. 他们两者安排如何来烤过程步骤大相径庭.
Roland以不同的方式使用两个煎锅. 第一个煎锅只用来烤煎饼的一面, 然后半面完成的煎饼被倒置到第二个煎锅中来煎烤另一面来完成整个过程. 一旦第一个煎锅变得可用, 它就得到一勺面糊. 这种方式的效果, 大部分的时间两个煎锅同时工作, 一个饼在烤一面同时第二个饼烤另一面然后完成. 以下是这个过程用流来实现大概的样子:
// 获取一勺面糊接着创造出一个半面煎饼
val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] =
Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }
// 完成一个半面煎饼
val fryingPan2: Flow[HalfCookedPancake, Pancake, Unit] =
Flow[HalfCookedPancake].map { halfCooked => Pancake() }
//通过两个半面煎饼我们就完成了烘烤煎饼
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] =
Flow[ScoopOfBatter].via(fryingPan1).via(fryingPan2)
在序列中的这两个map
步骤(封装在frying pan
的Flow
中)将以流水线方式执行, 基本和Roland用煎锅的方式相似:
- 一个
ScoopOfBatter
进入fryingPan1
- 一旦
fryingPan2
变得可用fryingPan1
输出一个HalfCookedPancake
fryingPan2
取走这个HalfCookedPancake
- 同时
fryingPan1
已经获取了下一勺,而不必等待fryingPan2
完成
使用流水线的优点是它可以应用到任何无法并行化的处理步骤序列上(举个例子, 因为一个处理步骤的结果需要前一个步骤处理的所有信息). 一个缺点是, 如果步骤的处理时间差异很大, 那么一些步骤不能在一个全通量下处理, 因为它们大部分时间需要等待前序的或者子步骤. 在烤煎饼例子中烤第二面的时间通常会比烤第一面快, 所以fryingPan2
无法全力处理煎饼.
流处理步骤有内部的缓存来保证它们之间的通讯更有效率. 关于这种行为的细节内容和如何添加额外的缓存, 请看Buffers and working with rate
Patrik以相同的方式使用煎锅. 他使用两个锅都完全烤完煎饼的两个面, 然后把结果放置到一个共享的盘子上. 每当一个煎锅空了, 他从一碗共享的面糊中拿出下一勺. 本质上他在多个煎锅上并行了同样的过程. 以下是这个过程用流来实现大概的样子:
val fryingPan: Flow[ScoopOfBatter, Pancake, Unit] =
Flow[ScoopOfBatter].map { batter => Pancake() }
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))
//并行的使用两个煎锅, 两个都从面糊完全烤完成煎饼
//我们总是把下一勺面糊放置到第一个变得空闲的煎锅上
dispatchBatter.out(0) ~> fryingPan ~> mergePancakes.in(0)
//注意我们在没有使用`builder.add()`引用的方式下, 使用`fryingPan`的`Flow`
//这种使用方式下的`Flow`是自动引入的, 在这个例子中意味着
//两个地方使用的`fryingPan`实际上在`graph`是两个不同的步骤
dispatchBatter.out(1) ~> fryingPan ~> mergePancakes.in(1)
FlowShape(dispatchBatter.in, mergePancakes.out)
并行的优点是它很容易扩展. 在烤煎饼例子中使用Patrik的方法很容易添加第三个煎锅, 但是Roland
的方法无法添加, 这是由于这需要多第三个步骤, 但是由于烤煎饼的例子中实践上是不可能的.
上述样例代码的一个缺点是它没有保留煎饼的顺序. 如果孩子们想要跟踪自己的煎饼那么这将是一个问题. 这种情况下Balance
和Merge
步骤需要用strict-round robing balancing
和merging
步骤来替换, 使得放入和拿出煎饼的过程在一个严格的次序下.
一个创建Work Pool
的更详细的例子可以在CookBook:Balancing jobs to a fixed pool
找到.
我们展示的两种为了提高通量的并发模式并不是排他的. 事实上把两者融合起来是相当简单的, Stream提供了一个优美统一的语言来表示和组合他们.
首先, 让我们来看一下我们如何把流水线处理步骤并行化. 在烤煎饼的例子中意味着我们将雇佣两个厨师, 每一个厨师使用Roland的流水线方法, 但是我们并行的使用两个厨师, 就想Patric使用煎锅那样. 以下是使用流来表示的样子:
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))
// 使用两条流水线, 每一条拥有两个煎锅, 合计使用
// 四个煎锅
dispatchBatter.out(0) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(0)
dispatchBatter.out(1) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(1)
FlowShape(dispatchBatter.in, mergePancakes.out)
})
如果有许多独立的不依赖各自结果的工作, 但是工作本身需要多个处理步骤并且每个步骤需要依赖前序步骤的情况下, 上面的例子将工作的非常好. 在我们的例子中, 独立的煎饼互相没有依赖关系, 它们可以并行的烘烤, 但是在同一时间内无法同时烘烤煎饼的两边, 所以煎饼的两边将依次序烘烤.
同样可以把并行的步骤组织成流水线. 这意味着将要雇佣4位厨师:
- 前两名厨师需要并行的准备从面糊到烤完半面的煎饼, 然后把它们放置到一个够大的平面上.
- 后两名厨师拿走半面煎饼并在各自的煎锅中烘烤另一边, 然后他们把煎饼放置到一个共享的盘子中.
再次很直接就可以用流API实现这个场景:
val pancakeChefs1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergeHalfPancakes = builder.add(Merge[HalfCookedPancake](2))
// 两个厨师各自用一个煎锅工作, 半烤煎饼, 然后放置
// 它们到一个公用池
dispatchBatter.out(0) ~> fryingPan1 ~> mergeHalfPancakes.in(0)
dispatchBatter.out(1) ~> fryingPan1 ~> mergeHalfPancakes.in(1)
FlowShape(dispatchBatter.in, mergeHalfPancakes.out)
})
val pancakeChefs2: Flow[HalfCookedPancake, Pancake, Unit] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchHalfPancakes = builder.add(Balance[HalfCookedPancake](2))
val mergePancakes = builder.add(Merge[Pancake](2))
// 两个厨师各自用一个煎锅工作, 完成煎饼烘烤, 然后放置
// 它们到一个公用池
dispatchHalfPancakes.out(0) ~> fryingPan2 ~> mergePancakes.in(0)
dispatchHalfPancakes.out(1) ~> fryingPan2 ~> mergePancakes.in(1)
FlowShape(dispatchHalfPancakes.in, mergePancakes.out)
})
val kitchen: Flow[ScoopOfBatter, Pancake, Unit] = pancakeChefs1.via(pancakeChefs2)
这个使用模式比较少见但是当某一个流水线步骤完成不同工作的处理时间上有明显差异时, 这个模式将有用. 原因在于相比较并行模式, 这种模式下有更加balance-merge
的步骤. 这个模式下每一个步骤后都进行了重新平衡, 而上一个模式只是在流水线的入口处进行了平衡. 这只有当处理时间的分布有一个比较大的方差时才有影响.