Skip to content

Latest commit

 

History

History
144 lines (93 loc) · 4.28 KB

1.10.md

File metadata and controls

144 lines (93 loc) · 4.28 KB

1.10 错误处理

在流具象化时可以定义各种异常处理策略, 用在处理流元素时. 错误处理策略受actor监督机制启发, 但是采用了流处理领域的语义.


警告: ZipWith, GraphStage链接, ActorPublisherSourceActorSubscriberSink 组件暂时还不具备监控策略


##1.10.1 监控策略

在应用代码中有三种处理异常的方法:

  • Stop- 重启策略 流以失败终结
  • Resume - 恢复策略 当前元素将被丢弃并且流继续
  • Restart - 重启策略 当前元素被丢弃并且再重启stage后流继续. 重启Stage意味着任何积累的状态将被清空. 一般来说通过重新创建stage实例来实现.

默认情况下对所有异常使用停止策略, 当异常被抛出时流以失败终结.

implicit val materializer = ActorMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// 除以0将会导致流失败并且这里将以一个`Failure(ArithmeticException)`作为完成结果

流的默认的监控策略可通过配置具象器(materializer)来设置:

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}

implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
//导致除零错误的元素将会被丢弃
//流将以`Success(228)`为值的`Future`作为完成结果

在这里可以看到, 所有的ArithmeticException异常都会恢复流处理, 导致除零的元素都会有效的被丢弃


注意: 需要注意到丢弃元素可能导致拥有循环(cycles)的图死锁, 在1.5.9图循环、活跃性以及死锁中有相关解释


也可以为单个flow的所有操作定义监控策略.

implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}
val flow = Flow[Int]
  .fliter(100 / _ < 50).map(elem => 100 / (5 - elem))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)

val result = source.runWith(Sink.fold(0)(_ + _))

//导致除零错误的元素将会被丢弃
//流将以`Success(150)`为值的`Future`作为完成结果

RestartResume类似, 仅是在发生错误时原有的状态会在重置时丢弃

implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Restart
  case _ => Supervision.Stop
}

val flow = Flow[Int]
  .scan(0) { (acc, elem) =>
    if (elem < 0) throw new IllegalArgumentException("negative not allowed")
    else acc + elem
  }
  .withAttributes(ActorAttributes.supervisionStrategy(decider))

val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.grouped(1000).runWith(Sink.head)

//负数元素会导致`scan`这步骤重启,
//即 重新从0开始
//流将以`Success(Vector(0, 1, 4, 0, 5, 12)`为值的`Future`作为完成结果

##1.10.2 mapAsync中的错误

流的监控也可以应用到mapAsync产生的future上.

假设我们使用一个外部服务来查找email地址并且我们会将无法找寻到的结果丢弃掉.

我们从推特作者流开始:

val authors: Source[Author, Unit] =
  tweets
    .filter(_.hashtags.contains(akka))
    .map(_.author)

假设我们可以使用以下方法查询他们的email地址:

def lookupEmail(handle: String): Future[String] =

如果email没有找到那么Future将以Failure完成.

将作者流转化成email地址流我们可以使用mapAsync来调用lookupEmail服务完成, 并且我们可以使用Supervision.resumingDecider来丢弃未知的email地址:

import ActorAttributes.supervisionStrategy
import Supervision.resumingDecider

val emailAddresses: Source[String, Unit] =
  authors.via(
    Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
    .withAttributes(supervisionStrategy(resumingDecider)))

如果我们不使用Resume,那么默认的停止策略将在第一个以Failure为结果的Future完成时以失败结束该流.