-
Notifications
You must be signed in to change notification settings - Fork 157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add overridden duration timeout to StreamTestKit #1468
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
# StreamTestKit.assertNoChildren is an internal API and can be changed | ||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.testkit.scaladsl.StreamTestKit.assertNoChildren") | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,12 +35,30 @@ object StreamTestKit { | |
* This assertion is useful to check that all of the stages have | ||
* terminated successfully. | ||
*/ | ||
def assertAllStagesStopped[T](block: => T, overrideTimeout: FiniteDuration)(implicit materializer: Materializer): T = | ||
materializer match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is code duplication here, I can clean it up if the premise of this PR is acceptable. |
||
case impl: PhasedFusingActorMaterializer => | ||
stopAllChildren(impl.system, impl.supervisor) | ||
val result = block | ||
assertNoChildren(impl.system, impl.supervisor, Some(overrideTimeout)) | ||
result | ||
case _ => block | ||
} | ||
|
||
/** | ||
* Asserts that after the given code block is ran, no stages are left over | ||
* that were created by the given materializer with an overridden duration | ||
* that ignores `stream.testkit.all-stages-stopped-timeout`. | ||
* | ||
* This assertion is useful to check that all of the stages have | ||
* terminated successfully. | ||
*/ | ||
def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T = | ||
materializer match { | ||
case impl: PhasedFusingActorMaterializer => | ||
stopAllChildren(impl.system, impl.supervisor) | ||
val result = block | ||
assertNoChildren(impl.system, impl.supervisor) | ||
assertNoChildren(impl.system, impl.supervisor, None) | ||
result | ||
case _ => block | ||
} | ||
|
@@ -53,10 +71,15 @@ object StreamTestKit { | |
} | ||
|
||
/** INTERNAL API */ | ||
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = { | ||
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add an overloaded method here to avoid the mima issue? |
||
overrideTimeout: Option[FiniteDuration]): Unit = { | ||
val probe = TestProbe()(sys) | ||
val c = sys.settings.config.getConfig("pekko.stream.testkit") | ||
val timeout = c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis | ||
|
||
val timeout = overrideTimeout.getOrElse { | ||
val c = sys.settings.config.getConfig("pekko.stream.testkit") | ||
c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis | ||
} | ||
|
||
probe.within(timeout) { | ||
try probe.awaitAssert { | ||
supervisor.tell(StreamSupervisor.GetChildren, probe.ref) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might look a bit un-orthodox by Pekko standards, but it is the idiomatic way to override a configuration when in ScalaTest (which is what The defaault behaviour is still the same (i.e. getting the value from |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.pekko.stream.testkit | ||
|
||
import org.apache.pekko.testkit.TestKitBase | ||
import org.scalatest.time.{ Millis, Span } | ||
|
||
import java.util.concurrent.TimeUnit | ||
|
||
trait StreamConfiguration extends TestKitBase { | ||
case class StreamConfig(allStagesStoppedTimeout: Span = Span({ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The premise here is that by default we get the timeout value from Pekko typesafe config (as is expected when using Pekko) however if a user wants to override this then its done the ScalaTest idiomatic way (which is overriding a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is possible to omit units and support more flexibility, but the code is slightly complex and for reference only.
I just saw this |
||
val c = system.settings.config.getConfig("pekko.stream.testkit") | ||
c.getDuration("all-stages-stopped-timeout", TimeUnit.MILLISECONDS) | ||
}, Millis)) | ||
|
||
private val defaultStreamConfig = StreamConfig() | ||
|
||
/** | ||
* The default `StreamConfig` which is derived from the Actor System's `pekko.stream.testkit.all-stages-stopped-timeout` | ||
* configuration value. If you want to provide a different StreamConfig for specific tests without having to re-specify | ||
* `pekko.stream.testkit.all-stages-stopped-timeout` then you can override this value. | ||
*/ | ||
implicit def streamConfig: StreamConfig = defaultStreamConfig | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When adding mima excludes, please add the reason why it is acceptable. In this case, the reason might be we don't promise binary compatiblity for -testkit artifacts (https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html), and that this method is
private[testkit]