From bc4cbf5872f26256f7fd1f8e94c7670d8299e443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 2 Dec 2023 14:41:51 +0800 Subject: [PATCH] =str Fix maybe throw for `MinimalStage`. --- .../stream/javadsl/FlowUnfoldAsyncTest.java | 57 +++++++++++++++++++ .../org/apache/pekko/stream/impl/Unfold.scala | 2 +- 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 stream-tests/src/test/java-jdk9-only/org/apache/pekko/stream/javadsl/FlowUnfoldAsyncTest.java diff --git a/stream-tests/src/test/java-jdk9-only/org/apache/pekko/stream/javadsl/FlowUnfoldAsyncTest.java b/stream-tests/src/test/java-jdk9-only/org/apache/pekko/stream/javadsl/FlowUnfoldAsyncTest.java new file mode 100644 index 00000000000..88219f372bc --- /dev/null +++ b/stream-tests/src/test/java-jdk9-only/org/apache/pekko/stream/javadsl/FlowUnfoldAsyncTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.javadsl; + +import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.StreamTest; +import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; +import org.apache.pekko.testkit.PekkoSpec; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class FlowUnfoldAsyncTest extends StreamTest { + @ClassRule + public static PekkoJUnitActorSystemResource actorSystemResource = + new PekkoJUnitActorSystemResource("SourceTest", PekkoSpec.testConf()); + + public FlowUnfoldAsyncTest() { + super(actorSystemResource); + } + + @Test + public void testFoldAsync() throws Exception { + final Integer result = Source.unfoldAsync( + 0, + idx -> { + if (idx >= 10) { + return CompletableFuture.completedStage(Optional.empty()); + } else { + return CompletableFuture.completedStage(Optional.of(Pair.create(idx + 1, idx))); + } + }) + .runFold(0, Integer::sum, system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + Assert.assertEquals(45, result.intValue()); + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala index bd12267f8e7..c0f17189167 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala @@ -124,7 +124,7 @@ import scala.util.{ Failure, Success, Try } } def onPull(): Unit = { - f.apply(state) match { + f.apply(state).toCompletableFuture match { case cf: CompletableFuture[Optional[Pair[S, E]] @unchecked] if cf.isDone && !cf.isCompletedExceptionally => handle(cf.join()) case future =>