Skip to content

Latest commit

 

History

History
140 lines (126 loc) · 8.69 KB

1.7.md

File metadata and controls

140 lines (126 loc) · 8.69 KB

#1.7缓冲和处理速率 (Buffers and working with rate) akka streams处理阶段是异步的和默认管线的(这里指的是是类似流水线处理),这意味着,一个处理阶段将一个元素移交给下游消费者后能立即处理下一条消息。为了证明我们所说的,让我们看一下下面的例子:

Source(1 to 3)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)

运行上述例子,一种可能的输出是这样的:

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

注意到顺序并不是A:1, B:1, C:1, A:2, B:2, C:2,那是符合同步执行模型的,在一个元素完全的通过处理管线(被处理)之前下一个元素是不会进入的流的。下一个元素就和被发射的前一个元素一样被一个阶段处理。 而管线一般情况下增加吞吐量,在实践过程中通过异步边界(因此线程交叉thread crossing)传递元素的成本是显著的。为了分摊这个开销,akka stream在内部使用了一个窗口化、配套backpressure策略。这是由于窗口化不同于Stop-And-Wait协议,多个元素可能是"in-fight"的同时请求元素。这也是批处理因为新的元素没有在一个元素被从窗口缓存window-buffer排除后立即被请求(排出一个元素而立即请求另一个元素),而是多个元素在一批元素被排除后同时被请求。批处理策略降低了通过异步边界传播backpressure信号的成本。 虽然这种内部协议多数情况下对用户不可见(除了逐渐增大的吞吐量的影响),然而有些情况下,这些细节会暴露。在我们之前所有的例子中,我们总是假设 处理链路的速率通过backpressure信号导致每个处理阶段都不超过连接链路的吞吐量而形成严格的一致状态。然而akka stream中的工具允许一个链路中不同段的速率是"分离的"或者通过外部定时source定义流的最大吞吐量。这些情况与内部批处理缓冲策略突然变得不透明一致。

##1.7.1 akka streams中的缓存 (Buffers in Akka Streams) ###内部缓存和它们的影响 正如我们解释的那样,出于性能原因akka streams在每个处理阶段引入了一个缓存。这些缓冲器的目的纯粹是优化,实际上如果没有必要对吞吐量改进,那么(buffer)大小为1将是最自然的选择。因此,建议以小的尺寸保持这些缓冲区,并且仅仅增加到与应用的吞吐量适应的水平。默认的缓冲区大小可通过配置设置:

akka.stream.materializer.max-input-buffer-size = 16

另一种选择是通过ActorMaterializerSettingsmaterializer上设置:

val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withInputBuffer(
initialSize = 64,

如果缓冲区大小仅仅在一个Flow段内被设置,这可以通过定义有这个属性的单独的Flow实现:

val section = Flow[Int].map(_ * 2)
.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
val flow = section.via(Flow[Int].map(_ / 2)) // the buffer size of this map is the default

下面代码演示了由内部缓冲引起的一些问题:

import scala.concurrent.duration._
case class Tick()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count))
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
.conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
zipper.out ~> Sink.foreach(println)
ClosedShape
})

运行上述例子,人们希望数字3每3秒打印一次(这里cUndefinedSourceonflate步骤被配置以至于在下游zipWith消费之前统计元素的接收数),但是被打印的并不是那样,我们将看到数字1.原因是内置缓冲的默认大小是16个元素,在zipWith消耗元素之前预先提取元素。通过改变zipWith的缓冲区大小为1(或者整个图形)可以解决这个问题。我们仍然会看到由ZipWith初始预读取元素而产生的第一个1leading 1(这里就是1会先出来)


注意:一般情况下,当时间或者速率驱动的处理阶段出现奇怪的行为,首要的尝试方案是将输入缓冲的大小减小到1


###明确定义的缓冲 (Explicit user defined buffers) 前一节介绍akka stream使用内部缓冲来降低通过异步边界交叉元素的成本。这些内部缓冲在未来的版本中很可能会自动的调整(大小)。在本节,我们将讨论用户明确定义的作为应用中流处理管线中领域逻辑部分的缓存。下面的示例将确保1000个作业(但不超过)(假设)从外部系统弹出存储在本地内存-缓解外部系统

// Getting a stream of jobs from an imaginary external system as a Source
val jobs: Source[Job, Unit] = inboundJobsConnector()
jobs.buffer(1000, OverflowStrategy.backpressure)

下面的例子也是1000个任务本地排队等待,但如果有更多的任务在虚拟外部系统中等待,它将通过丢弃缓冲区队尾元素来给新元素腾出空间。丢弃尾部是非常普遍的策略。但必须指出的是,这将会丢弃更新的等待任务。如果需要一些"公平",因为我们需要善待nice那些已经等了很久的任务,那么这个选项将是有用的。

jobs.buffer(1000, OverflowStrategy.dropTail)

取代从缓冲器的尾部丢弃最新的元素的方式是完全不把新的元素进入缓冲来丢弃新的元素

jobs.buffer(1000, OverflowStrategy.dropNew)

下面是另一个有关1000个任务的队列的例子,但是塔式通过删除缓冲区的头部来给新的元素腾出空间。这是最早的等待任务。如果任务在一定期限内没被处理而会重新发送,那么这是首选的策略。最早的元素将很快被重传(实际上重复发送的元素可能已经在队列中了),所以首先删除它是有意义的。

jobs.buffer(1000, OverflowStrategy.dropHead)

相比于以上的策略,一旦缓冲区满了,dropBuffer丢弃了所有进入缓冲的1000个任务。这个激进的策略在倾向于删除推迟任务时是有用的

jobs.buffer(1000, OverflowStrategy.dropBuffer)

如果我们想象外部任务发布者是一个使用我们API的客户端,我们可能会强制要求客户端不得有超过1000的排队任务,否则我们任务这是flooding(洪水攻击)然后终止连接。通过简单废弃一个满了的缓冲的流时的错误策略是很容易实现这一点。

jobs.buffer(1000, OverflowStrategy.fail)

##1.7.2 转化速率(Rate transformation) ###理解合并 (Understanding conflate) 如果(生产)速率很快的生产者无法通过backpressure或者其他信号来通知它减缓速率,conflate将在合并来自生产者的元素直到有来自消费者需求信号时是有用的。下面的例子片段汇总快速流生产的元素到一个非标准偏差standart deviation,统计已经计算的元素时平均和计算那些到达的元素

val statsFlow = Flow[Double]
.conflate(Seq(_))(_ :+ _)
.map { s =>
val 𝜇 = s.sum / s.size
val se = s.map(x => pow(x - 𝜇, 2))
val 𝜎 = sqrt(se.sum / se.size)
(𝜎, 𝜇, s.size)
}

这个示例证明,这样流的速录是解耦的。在flow开始时元素的速率可以比在flow结束时元素的速率高的多。 另一种可能使用conflate的情况是在生产者快速生产时不考虑选中所有的元素。下面的示例说明在消费者(的消费能力)跟不上生产者(的生产能力)时如何使用conflate来随机丢弃元素:

val p = 0.01
val sampleFlow = Flow[Double]
.conflate(Seq(_)) {
case (acc, elem) if Random.nextDouble < p => acc :+ elem
case (acc, _) => acc
}
.mapConcat(identity)

###理解展开(Understanding expand) 展开有助于处理那些无法跟上来自消费者需求的缓慢生产者。展开允许推断作为元素被发送到消费者的值。这里简单的使用展开expand,当生产者不发送任何新的元素时发射相同的元素到消费者。

val lastFlow = Flow[Double]
.expand(identity)(s => (s, s))

展开也允许保持一些来自下游请求的状态。这里跟踪和报告来自快速消费者和缓慢生产者之间偏移就是利用这一点:

val driftFlow = Flow[Double]
.expand((_, 0)) {
case (lastElement, drift) => ((lastElement, drift), (lastElement, drift + 1))
}

注意,所有来自上游的元素至少通过扩展一次。这意味着,如果生产者足够快,那么flow的输出将报告0偏移,否则是一个较大的偏移。