diff --git a/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java b/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java index 1aff57e5c50..11ea76f2b81 100644 --- a/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java +++ b/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java @@ -13,10 +13,9 @@ package org.apache.pekko.actor; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertFalse; -import java.util.concurrent.CompletionStage; +import java.time.Duration; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.junit.Before; import org.junit.Rule; @@ -38,9 +37,7 @@ public void beforeEach() { @Test public void testGetWhenTerminated() throws Exception { - system.terminate(); - final CompletionStage cs = system.getWhenTerminated(); - cs.toCompletableFuture().get(2, SECONDS); + system.terminateAndAwait(Duration.ofSeconds(2)); } @Test diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala index 0e9f7e29f76..0da467da6ff 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala @@ -250,19 +250,17 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen val system = ActorSystem().asInstanceOf[ActorSystemImpl] val wt = system.whenTerminated wt.isCompleted should ===(false) - val f = system.terminate() + system.terminate() val terminated = Await.result(wt, 10.seconds) system.whenTerminated.isCompleted should ===(true) terminated.actor should ===(system.provider.rootGuardian) terminated.addressTerminated should ===(true) terminated.existenceConfirmed should ===(true) - (terminated should be).theSameInstanceAs(Await.result(f, 10.seconds)) } "throw RejectedExecutionException when shutdown" in { val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf) - Await.ready(system2.terminate(), 10.seconds) - + system2.terminateAndAwait(10.seconds) intercept[RejectedExecutionException] { system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") } }.getMessage should ===("ActorSystem already terminated.") @@ -280,7 +278,7 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen case _: RejectedExecutionException => count.incrementAndGet() } - Await.ready(system2.whenTerminated, 10.seconds) + system2.terminateAndAwait(10.seconds) count.get() should ===(1) } diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala index d2d75544212..39c3ebbe50d 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala @@ -546,7 +546,7 @@ class CoordinatedShutdownSpec "be run by ActorSystem.terminate" in { val sys = ActorSystem(system.name, system.settings.config) try { - Await.result(sys.terminate(), 10.seconds) + sys.terminateAndAwait(10.seconds) sys.whenTerminated.isCompleted should ===(true) CoordinatedShutdown(sys).shutdownReason() should ===(Some(CoordinatedShutdown.ActorSystemTerminateReason)) } finally { @@ -561,7 +561,7 @@ class CoordinatedShutdownSpec .parseString("pekko.coordinated-shutdown.run-by-actor-system-terminate = off") .withFallback(system.settings.config)) try { - Await.result(sys.terminate(), 10.seconds) + sys.terminateAndAwait(10.seconds) sys.whenTerminated.isCompleted should ===(true) CoordinatedShutdown(sys).shutdownReason() should ===(None) } finally { diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/DynamicAccessSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/DynamicAccessSpec.scala index 2f1bf94099d..cda59cb70c4 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/DynamicAccessSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/DynamicAccessSpec.scala @@ -14,7 +14,6 @@ package org.apache.pekko.actor import scala.collection.immutable -import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } @@ -88,7 +87,7 @@ class DynamicAccessSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll } override def afterAll() = { - Await.result(system.terminate(), 10.seconds) + system.terminateAndAwait(10.seconds) super.afterAll() } } diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala index 465f4041e88..d2bb80cfa38 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala @@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with Matchers { val system = ActorSystem("DispatcherShutdownSpec") threadCount should be > 0 - Await.ready(system.terminate(), 1.second) + system.terminateAndAwait(1.seconds) Await.ready(Future(pekko.Done)(system.dispatcher), 1.second) TestKit.awaitCond(threadCount == 0, 3.second) diff --git a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/GracefulStopDocTest.java b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/GracefulStopDocTest.java index 2352faa2ce4..de47aae14a8 100644 --- a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/GracefulStopDocTest.java +++ b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/GracefulStopDocTest.java @@ -15,6 +15,7 @@ // #imports +import java.time.Duration; import java.util.concurrent.TimeUnit; import org.apache.pekko.actor.typed.ActorSystem; @@ -100,7 +101,7 @@ public static void main(String[] args) throws Exception { system.tell(MasterControlProgram.GracefulShutdown.INSTANCE); - system.getWhenTerminated().toCompletableFuture().get(3, TimeUnit.SECONDS); + system.terminateAndAwait(Duration.ofSeconds(3)); } // #worker-actor diff --git a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/InteractionPatternsTest.java b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/InteractionPatternsTest.java index eb4c63d80a2..68097fe0dd5 100644 --- a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/InteractionPatternsTest.java +++ b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/InteractionPatternsTest.java @@ -21,7 +21,6 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; import org.apache.pekko.Done; import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing; import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource; @@ -897,8 +896,7 @@ public void fireAndForgetSample() throws Exception { ref.tell(new Printer.PrintMe("message 2")); // #fire-and-forget-doit - system.terminate(); - system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + system.terminateAndAwait(Duration.ofSeconds(5)); } @Test diff --git a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java index 84608858816..c33317eda25 100644 --- a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java +++ b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java @@ -13,11 +13,9 @@ package org.apache.pekko.actor.typed; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertFalse; -import java.util.concurrent.CompletionStage; -import org.apache.pekko.Done; +import java.time.Duration; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; @@ -29,8 +27,7 @@ public void testGetWhenTerminated() throws Exception { final ActorSystem system = ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedSystem"); system.terminate(); - final CompletionStage cs = system.getWhenTerminated(); - cs.toCompletableFuture().get(2, SECONDS); + system.terminateAndAwait(Duration.ofSeconds(2)); } @Test diff --git a/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/GracefulStopDocSpec.scala b/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/GracefulStopDocSpec.scala index 3a35285866b..ab786924c07 100644 --- a/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/GracefulStopDocSpec.scala +++ b/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/GracefulStopDocSpec.scala @@ -24,7 +24,6 @@ import pekko.actor.typed.{ ActorSystem, PostStop } import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.typed.ActorRef import scala.concurrent.duration._ -import scala.concurrent.Await import org.scalatest.wordspec.AnyWordSpecLike import pekko.actor.typed.Terminated @@ -157,7 +156,7 @@ class GracefulStopDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike // brutally stop the system system.terminate() - Await.result(system.whenTerminated, 3.seconds) + system.terminateAndAwait(3.seconds) // #start-workers } @@ -176,7 +175,7 @@ class GracefulStopDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike Thread.sleep(100) - Await.result(system.whenTerminated, 3.seconds) + system.terminateAndAwait(3.seconds) } } } diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ExtensionsSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ExtensionsSpec.scala index 17bfa5d4ce2..7dedfc0b405 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ExtensionsSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ExtensionsSpec.scala @@ -215,7 +215,8 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with before shouldEqual beforeCreation + 1 after shouldEqual before } finally { - classicSystem.terminate().futureValue + classicSystem.terminate() + classicSystem.whenTerminated.futureValue } } @@ -229,7 +230,8 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with (ext1 should be).theSameInstanceAs(ext2) } finally { - classicSystem.terminate().futureValue + classicSystem.terminate() + classicSystem.whenTerminated.futureValue } } diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala index d60d91077ee..1a94ce1b592 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala @@ -16,9 +16,11 @@ package org.apache.pekko.actor.typed import java.util.concurrent.{ CompletionStage, ThreadFactory } import scala.concurrent.{ ExecutionContextExecutor, Future } +import scala.concurrent.duration.Duration import com.typesafe.config.{ Config, ConfigFactory } import org.slf4j.Logger + import org.apache.pekko import pekko.{ actor => classic, Done } import pekko.actor.{ Address, BootstrapSetup, ClassicActorSystemProvider } @@ -125,6 +127,35 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA */ def terminate(): Unit + /** + * Terminates this actor system and blocks the current thread until it is terminated, + * waiting at most the given duration. + * + * @param atMost the maximum duration to wait + * @throws java.util.concurrent.TimeoutException if the wait timed out + * @throws InterruptedException if the current thread is interrupted while waiting + */ + def terminateAndAwait(atMost: Duration): Unit = { + import scala.concurrent.Await + terminate() + Await.result(whenTerminated, atMost) + } + + /** + * Terminates this actor system and blocks the current thread until it is terminated, + * waiting at most the given duration. + * + * @param atMost the maximum duration to wait + * @throws java.util.concurrent.TimeoutException if the wait timed out + * @throws InterruptedException if the current thread is interrupted while waiting + */ + def terminateAndAwait(atMost: java.time.Duration): Unit = { + import scala.concurrent.Await + import pekko.util.JavaDurationConverters._ + terminate() + Await.result(whenTerminated, atMost.asScala) + } + /** * Scala API: Returns a Future which will be completed after the ActorSystem has been terminated. * The `ActorSystem` can be stopped with [[ActorSystem.terminate]] diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/actor-system-terminate.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/actor-system-terminate.excludes new file mode 100644 index 00000000000..1190dc45ba8 --- /dev/null +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/actor-system-terminate.excludes @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Change the the ActorSystem.terminate method signature +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.actor.ActorSystem.terminate") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.ActorSystem.terminate") diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala index 7a3d68e9a52..0cc07e4a98e 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala @@ -677,6 +677,32 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid */ def terminate(): Future[Terminated] + /** + * Terminates this actor system and blocks the current thread until it is terminated, + * waiting at most the given duration. + * + * @param atMost the maximum duration to wait + * @throws java.util.concurrent.TimeoutException if the wait timed out + * @throws InterruptedException if the current thread is interrupted while waiting + */ + def terminateAndAwait(atMost: Duration): Unit = { + import scala.concurrent.Await + Await.result(terminate(), atMost) + } + + /** + * Terminates this actor system and blocks the current thread until it is terminated, + * waiting at most the given duration. + * + * @param atMost the maximum duration to wait + * @throws java.util.concurrent.TimeoutException if the wait timed out + * @throws InterruptedException if the current thread is interrupted while waiting + */ + def terminateAndAwait(atMost: java.time.Duration): Unit = { + import JavaDurationConverters._ + terminateAndAwait(atMost.asScala) + } + /** * Returns a Future which will be completed after the ActorSystem has been terminated * and termination hooks have been executed. If you registered any callback with diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala index d6348d83b22..6d4e5580e1e 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import BenchmarkActors._ @@ -101,7 +100,7 @@ class ActorBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala index 05317241e97..cb644316111 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ @@ -48,7 +47,7 @@ class ActorCreationBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala index 16ec75c1fe0..7eb5f2d02cd 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit import scala.annotation.tailrec -import scala.concurrent.Await import scala.concurrent.duration._ import BenchmarkActors._ @@ -79,7 +78,7 @@ class ForkJoinActorBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } // @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala index 097f333b1c3..96a11f09889 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ @@ -42,7 +41,7 @@ class RouterPoolCreationBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala index 3cdba681937..7507e06dd28 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala @@ -54,7 +54,7 @@ class ScheduleBenchmark { @TearDown def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala index 683c6b90ede..7109cc88ba9 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -50,7 +49,7 @@ class StashCreationBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala index 80052e1e8f3..acbb0a47601 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.{ Config, ConfigFactory } @@ -66,7 +65,7 @@ class TellOnlyBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } var actor: ActorRef = _ diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala index 56049bc876d..afdb7b0ff45 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala @@ -101,7 +101,7 @@ class TypedActorBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala index f0b7854ee2d..0aae239dc4c 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala @@ -118,7 +118,7 @@ class TypedForkJoinActorBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala index 0651949efb0..0df93d83a91 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala @@ -238,7 +238,7 @@ class ReliableDeliveryBenchmark { @TearDown(Level.Trial) def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/cluster/ddata/ORSetSerializationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/cluster/ddata/ORSetSerializationBenchmark.scala index 9d8c898f6a2..a47212ae5fa 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/cluster/ddata/ORSetSerializationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/cluster/ddata/ORSetSerializationBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.cluster.ddata import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -72,8 +71,8 @@ class ORSetSerializationBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system1.terminate(), 5.seconds) - Await.result(system2.terminate(), 5.seconds) + system1.terminateAndAwait(5.seconds) + system2.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/dispatch/NodeQueueBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/dispatch/NodeQueueBenchmark.scala index b08ddbe34b5..f2d4ee44603 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/dispatch/NodeQueueBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/dispatch/NodeQueueBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.dispatch import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -61,7 +60,7 @@ mailbox { }).withDispatcher("dispatcher").withMailbox("mailbox"), "receiver") @TearDown - def teardown(): Unit = Await.result(sys.terminate(), 5.seconds) + def teardown(): Unit = sys.terminateAndAwait(5.seconds) @TearDown(Level.Invocation) def waitInBetween(): Unit = { diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala index 31ee2d4fc53..5ca40e80df3 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.commons.io.FileUtils @@ -72,7 +71,7 @@ class PersistentActorDeferBenchmark { @TearDown def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala index b69696fc050..c5f5a985395 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.commons.io.FileUtils @@ -70,7 +69,7 @@ class PersistentActorThroughputBenchmark { @TearDown def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala index 4a8860c5386..58d639630fd 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.commons.io.FileUtils @@ -73,7 +72,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark { @TearDown def shutdown(): Unit = { system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.terminateAndAwait(15.seconds) storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/CodecBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/CodecBenchmark.scala index a1cb3a85619..59941332224 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/CodecBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/CodecBenchmark.scala @@ -197,8 +197,8 @@ class CodecBenchmark { @TearDown(Level.Trial) def tearDownTrial(): Unit = { - Await.result(system.terminate(), 5.seconds) - Await.result(systemB.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) + systemB.terminateAndAwait(5.seconds) } @Setup(Level.Iteration) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/SendQueueBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/SendQueueBenchmark.scala index 9bf95b8350a..c36981fc9dc 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/SendQueueBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/SendQueueBenchmark.scala @@ -17,7 +17,6 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -52,7 +51,7 @@ class SendQueueBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializationBench.scala b/bench-jmh/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializationBench.scala index 939cbad12c3..bafaa37a18f 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializationBench.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializationBench.scala @@ -19,7 +19,6 @@ import java.time.LocalDateTime import java.util import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import scala.annotation.nowarn @@ -233,7 +232,7 @@ class JacksonSerializationBench { @TearDown(Level.Trial) def tearDownTrial(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } private var size = 0L diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/AskBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/AskBenchmark.scala index 0b007953238..17f0f4c6f5b 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/AskBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/AskBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -82,7 +81,7 @@ class AskBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/CollectBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/CollectBenchmark.scala index 4cc2113b64a..cf50c452db7 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/CollectBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/CollectBenchmark.scala @@ -55,7 +55,7 @@ class CollectBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } private val newCollect = Source diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/EmptySourceBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/EmptySourceBenchmark.scala index 4de17fbf734..541afd2177d 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/EmptySourceBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/EmptySourceBenchmark.scala @@ -32,7 +32,7 @@ class EmptySourceBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } val setup = Source.empty[String].toMat(Sink.ignore)(Keep.right) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapConcatBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapConcatBenchmark.scala index f8c0d49d5d6..86853e78366 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapConcatBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapConcatBenchmark.scala @@ -16,7 +16,7 @@ package org.apache.pekko.stream import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import scala.concurrent.{ Await, Future } +import scala.concurrent.Future import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -63,7 +63,7 @@ class FlatMapConcatBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapMergeBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapMergeBenchmark.scala index 074bad5a02b..2f61fe83514 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapMergeBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/FlatMapMergeBenchmark.scala @@ -59,7 +59,7 @@ class FlatMapMergeBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/FlowMapBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/FlowMapBenchmark.scala index 5ab9a14c9f8..c442489cb4a 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/FlowMapBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/FlowMapBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Success @@ -92,7 +91,7 @@ class FlowMapBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/FramingBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/FramingBenchmark.scala index 269b120dbec..0422ce026f3 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/FramingBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/FramingBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Random @@ -87,7 +86,7 @@ class FramingBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/FusedGraphsBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/FusedGraphsBenchmark.scala index 1b0dd9d0bd7..fd2d233d060 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/FusedGraphsBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/FusedGraphsBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -303,7 +302,7 @@ class FusedGraphsBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala index 17c30e5f948..f71825eecc3 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala @@ -63,7 +63,7 @@ class InvokeWithFeedbackBenchmark { def tearDown(): Unit = { sourceQueue.complete() // no way to observe sink completion from the outside - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/LazyFutureSourceBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/LazyFutureSourceBenchmark.scala index 390dfa3bca5..0295c2d9f75 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/LazyFutureSourceBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/LazyFutureSourceBenchmark.scala @@ -54,7 +54,7 @@ class LazyFutureSourceBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } private val newLazyFutureSource = Source.lazyFuture(() => Future.successful("")).toMat(Sink.ignore)(Keep.right) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/MapAsyncBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/MapAsyncBenchmark.scala index 88f59d606d6..c5c5021cc2a 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/MapAsyncBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/MapAsyncBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ @@ -70,7 +69,7 @@ class MapAsyncBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializationBenchmark.scala index c1fd4307104..fdf9a98dde8 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializationBenchmark.scala @@ -129,7 +129,7 @@ class MaterializationBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/PartitionHubBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/PartitionHubBenchmark.scala index 40d25b51cc4..fdf41c5e29e 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/PartitionHubBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/PartitionHubBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -70,7 +69,7 @@ class PartitionHubBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/SourceRefBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/SourceRefBenchmark.scala index 87d69bce58c..5d6529e9f55 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/SourceRefBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/SourceRefBenchmark.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Success @@ -62,7 +61,7 @@ class SourceRefBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala index 061c9759983..4c312e852d2 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala @@ -56,7 +56,7 @@ class ZipWithIndexBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } private val newZipWithIndex = Source.repeat(1) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/impl/OutputStreamSourceStageBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/impl/OutputStreamSourceStageBenchmark.scala index 4b06c731d79..4e85fda10dc 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/impl/OutputStreamSourceStageBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/impl/OutputStreamSourceStageBenchmark.scala @@ -58,7 +58,7 @@ class OutputStreamSourceStageBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), 5.seconds) + system.terminateAndAwait(5.seconds) } } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesBenchmark.scala index 3f558f044c7..1b0e791c47f 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesBenchmark.scala @@ -80,7 +80,7 @@ class FileSourcesBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), Duration.Inf) + system.terminateAndAwait(Duration.Inf) } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesScaleBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesScaleBenchmark.scala index e007ed22b49..d63e8e3e727 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesScaleBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/io/FileSourcesScaleBenchmark.scala @@ -79,7 +79,7 @@ class FileSourcesScaleBenchmark { @TearDown def shutdown(): Unit = { - Await.result(system.terminate(), Duration.Inf) + system.terminateAndAwait(Duration.Inf) } @Benchmark diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala index 2c20e1cf3c7..0be412320ac 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala @@ -22,7 +22,6 @@ import pekko.persistence.PersistentActor import pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe } import com.typesafe.config.{ Config, ConfigFactory } -import scala.concurrent.Await import scala.concurrent.duration._ /** @@ -183,7 +182,7 @@ class PersistentShardingMigrationSpec extends PekkoSpec(PersistentShardingMigrat extractShardId(rememberedEntitiesProbe.ref)) f(system, region, rememberedEntitiesProbe) } finally { - Await.ready(system.terminate(), 20.seconds) + system.terminateAndAwait(20.seconds) } } diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala index 45dbacf1511..1baf3ae5bfb 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala @@ -27,7 +27,6 @@ import pekko.testkit.ImplicitSender import pekko.testkit.TestProbe import com.typesafe.config.ConfigFactory -import scala.concurrent.Await import scala.concurrent.duration._ /** @@ -144,7 +143,7 @@ class RememberEntitiesShardIdExtractorChangeSpec val region = ClusterSharding(system).start(TypeName, Props(new PA()), extractEntityId, extractShardId) f(system, region) } finally { - Await.ready(system.terminate(), 20.seconds) + system.terminateAndAwait(20.seconds) } } diff --git a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala index 6af92d89a7e..98926e21d55 100644 --- a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala +++ b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala @@ -13,7 +13,6 @@ package org.apache.pekko.cluster.client -import scala.concurrent.Await import scala.concurrent.duration._ import scala.annotation.nowarn @@ -117,7 +116,7 @@ class ClusterClientStopSpec extends MultiNodeSpec(ClusterClientStopSpec) with ST runOn(first, second) { enterBarrier("was-in-contact") - Await.ready(system.terminate(), 10.seconds) + system.terminateAndAwait(10.seconds) } diff --git a/cluster-typed/src/test/java/org/apache/pekko/cluster/typed/ClusterApiTest.java b/cluster-typed/src/test/java/org/apache/pekko/cluster/typed/ClusterApiTest.java index 47cb24450f4..094b6bd6e75 100644 --- a/cluster-typed/src/test/java/org/apache/pekko/cluster/typed/ClusterApiTest.java +++ b/cluster-typed/src/test/java/org/apache/pekko/cluster/typed/ClusterApiTest.java @@ -15,7 +15,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.cluster.ClusterEvent; @@ -61,10 +61,8 @@ public void joinLeaveAndObserve() throws Exception { probe2.expectMessageClass(SelfRemoved.class); } finally { - system1.terminate(); - system1.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); - system2.terminate(); - system2.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + system1.terminateAndAwait(Duration.ofSeconds(5)); + system2.terminateAndAwait(Duration.ofSeconds(5)); } } } diff --git a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/ClusterSingletonApiSpec.scala b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/ClusterSingletonApiSpec.scala index 11e60b17d0f..1db1b27a073 100644 --- a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/ClusterSingletonApiSpec.scala +++ b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/ClusterSingletonApiSpec.scala @@ -13,7 +13,6 @@ package org.apache.pekko.cluster.typed -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -121,7 +120,7 @@ class ClusterSingletonApiSpec override def afterAll(): Unit = { super.afterAll() - Await.result(system2.terminate(), 3.seconds) + system2.terminateAndAwait(3.seconds) } } diff --git a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 03f1ab39523..4b54a578fcd 100644 --- a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -15,7 +15,6 @@ package org.apache.pekko.cluster.typed.internal.receptionist import java.util.concurrent.ThreadLocalRandom -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -206,7 +205,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin if (down) { // abrupt termination system2.terminate() - Await.ready(system2.whenTerminated, 10.seconds) + system2.terminateAndAwait(10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) } else { clusterNode1.manager ! Leave(clusterNode2.selfMember.address) @@ -323,7 +322,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin // abrupt termination system2.terminate() - Await.ready(system2.whenTerminated, 10.seconds) + system2.terminateAndAwait(10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) regProbe1.awaitAssert({ @@ -369,7 +368,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) system2.terminate() - Await.ready(system2.whenTerminated, 10.seconds) + system2.terminateAndAwait(10.seconds) val testKit3 = ActorTestKit( system1.name, @@ -484,7 +483,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) system2.terminate() - Await.ready(system2.whenTerminated, 10.seconds) + system2.terminateAndAwait(10.seconds) val testKit3 = ActorTestKit( system1.name, diff --git a/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/QuickRestartSpec.scala b/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/QuickRestartSpec.scala index 0ee7e4bbb0e..fc057b5815b 100644 --- a/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/QuickRestartSpec.scala +++ b/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/QuickRestartSpec.scala @@ -116,7 +116,8 @@ abstract class QuickRestartSpec extends MultiNodeClusterSpec(QuickRestartMultiJv enterBarrier("before-terminate-" + n) runOn(second) { - restartingSystem.terminate().await + restartingSystem.terminate() + restartingSystem.whenTerminated.await } // don't wait for it to be removed, new incarnation will join in next round enterBarrier("terminated-" + n) diff --git a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala index 1bde6f2d8d6..fb505193278 100644 --- a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala +++ b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala @@ -13,7 +13,6 @@ package org.apache.pekko.cluster.ddata -import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -289,7 +288,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) expectTerminated(r) } } finally { - Await.ready(sys1.terminate(), 10.seconds) + sys1.terminateAndAwait(10.seconds) } val sys2 = ActorSystem( @@ -320,7 +319,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) expectMsgType[GetSuccess[GCounter]].dataValue.value.toInt should be(2) } } finally { - Await.ready(sys1.terminate(), 10.seconds) + sys1.terminateAndAwait(10.seconds) } } diff --git a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala index 36edb54f961..f0018cb5fd2 100644 --- a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala +++ b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala @@ -13,7 +13,6 @@ package org.apache.pekko.cluster.ddata -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -149,7 +148,7 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN } enterBarrier("removed") runOn(first) { - Await.ready(sys2.terminate(), 5.seconds) + sys2.terminateAndAwait(5.seconds) } within(15.seconds) { diff --git a/docs/src/test/java/jdocs/cluster/FactorialFrontendMain.java b/docs/src/test/java/jdocs/cluster/FactorialFrontendMain.java index 19298faa752..27dd4cf9927 100644 --- a/docs/src/test/java/jdocs/cluster/FactorialFrontendMain.java +++ b/docs/src/test/java/jdocs/cluster/FactorialFrontendMain.java @@ -15,7 +15,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Props; import org.apache.pekko.cluster.Cluster; @@ -70,7 +70,7 @@ public void run() { @Override public void run() { try { - system.getWhenTerminated().toCompletableFuture().get(10, TimeUnit.SECONDS); + system.terminateAndAwait(Duration.ofSeconds(10)); } catch (Exception e) { System.exit(-1); } diff --git a/docs/src/test/scala/docs/io/ReadBackPressure.scala b/docs/src/test/scala/docs/io/ReadBackPressure.scala index cab26fb9ac1..9bde9c4c14e 100644 --- a/docs/src/test/scala/docs/io/ReadBackPressure.scala +++ b/docs/src/test/scala/docs/io/ReadBackPressure.scala @@ -13,14 +13,13 @@ package docs.io -import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props } +import org.apache.pekko.actor.{ Actor, ActorRef, Props } import org.apache.pekko.io.Tcp._ import org.apache.pekko.io.{ IO, Tcp } import java.net.InetSocketAddress import org.apache.pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe } import org.apache.pekko.util.ByteString -import scala.concurrent.Await import scala.concurrent.duration.Duration object PullReadingExample { @@ -90,6 +89,6 @@ class PullReadingSpec extends PekkoSpec with ImplicitSender { client.send(connection, ResumeReading) client.expectMsg(Received(ByteString("hello"))) - Await.ready(system.terminate(), Duration.Inf) + system.terminateAndAwait(Duration.Inf) } } diff --git a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala index d4f3a3913a4..12012a8c20e 100644 --- a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala +++ b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala @@ -15,7 +15,6 @@ package org.apache.pekko.persistence.query import java.util.concurrent.atomic.AtomicInteger -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.{ Config, ConfigFactory } @@ -101,7 +100,7 @@ class PersistenceQuerySpec extends AnyWordSpecLike with Matchers with BeforeAndA val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config) try block(sys) - finally Await.ready(sys.terminate(), 10.seconds) + finally sys.terminateAndAwait(10.seconds) } } diff --git a/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala b/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala index bbadb24f9d9..738f3d81ffd 100644 --- a/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala +++ b/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala @@ -16,7 +16,6 @@ package org.apache.pekko.persistence.serialization import java.io.NotSerializableException import java.util.UUID -import scala.concurrent.Await import scala.concurrent.duration.Duration import com.typesafe.config._ @@ -348,7 +347,7 @@ class MessageSerializerRemotingSpec extends PekkoSpec(remote.withFallback(custom } override def afterTermination(): Unit = { - Await.ready(remoteSystem.terminate(), Duration.Inf) + remoteSystem.terminateAndAwait(Duration.Inf) } "A message serializer" must { diff --git a/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/PersistentFsmToTypedMigrationSpec.scala b/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/PersistentFsmToTypedMigrationSpec.scala index 3a6e8443796..3631086e732 100644 --- a/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/PersistentFsmToTypedMigrationSpec.scala +++ b/persistence-typed/src/test/scala/docs/org/apache/pekko/persistence/typed/PersistentFsmToTypedMigrationSpec.scala @@ -239,7 +239,8 @@ class PersistentFsmToTypedMigrationSpec extends AnyWordSpec with ScalaFutures wi fsmRef ! PoisonPill classicProbe.expectTerminated(fsmRef) } finally { - classicActorSystem.terminate().futureValue + classicActorSystem.terminate() + classicActorSystem.whenTerminated.futureValue } val typedTestKit = ActorTestKit("System", PersistentFsmToTypedMigrationSpec.config) @@ -277,7 +278,8 @@ class PersistentFsmToTypedMigrationSpec extends AnyWordSpec with ScalaFutures wi fsmRef.tell(GetCurrentCart, classicProbe.ref) classicProbe.expectMsg(NonEmptyShoppingCart(Seq(shirt))) } finally { - classicActorSystem.terminate().futureValue + classicActorSystem.terminate() + classicActorSystem.whenTerminated.futureValue } val typedTestKit = ActorTestKit("TypedSystem", PersistentFsmToTypedMigrationSpec.config) diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala index 0babafe0526..3529d81ab15 100644 --- a/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala +++ b/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala @@ -15,7 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.{ Config, ConfigFactory } @@ -184,7 +183,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before def withActorSystem[T](name: String, config: Config)(block: ActorSystem => T): T = { val system = ActorSystem(name, journalConfig.withFallback(config)) try block(system) - finally Await.ready(system.terminate(), 3.seconds) + finally system.terminateAndAwait(3.seconds) } "EventAdapters in end-to-end scenarios" must { diff --git a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteReDeploymentSpec.scala b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteReDeploymentSpec.scala index e08b6535dc7..9f113a392ac 100644 --- a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteReDeploymentSpec.scala +++ b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteReDeploymentSpec.scala @@ -224,7 +224,7 @@ abstract class RemoteReDeploymentMultiJvmSpec(multiNodeConfig: RemoteReDeploymen enterBarrier("stopping") runOn(second) { - Await.result(sys.terminate(), 10.seconds) + sys.terminateAndAwait(10.seconds) } } diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala index 58a6c222b7d..5ae2d46e1d3 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala @@ -15,7 +15,6 @@ package org.apache.pekko.remote.artery import java.net.InetAddress -import scala.concurrent.Await import scala.concurrent.duration.Duration import com.typesafe.config.ConfigFactory @@ -41,7 +40,7 @@ trait BindCanonicalAddressBehaviors { implicit val sys = ActorSystem("sys", config.withFallback(commonConfig)) getInternal() should contain(getExternal()) - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } "bind to a random port but remoting accepts from a specified port" in { @@ -60,13 +59,13 @@ trait BindCanonicalAddressBehaviors { if (getInternal().collect { case Address(_, _, _, Some(port)) => port }.toSeq.contains(address.getPort)) { val sys2 = ActorSystem("sys", config.withFallback(commonConfig)) val secondInternals = getInternal()(sys2) - Await.result(sys2.terminate(), Duration.Inf) + sys2.terminateAndAwait(Duration.Inf) secondInternals } else { getInternal() } internals should not contain address.toAkkaAddress("pekko") - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } "bind to a specified bind hostname and remoting aspects from canonical hostname" in { @@ -106,7 +105,7 @@ trait BindCanonicalAddressBehaviors { getInternal().flatMap(_.port) should contain(getExternal().port.get) getInternal().map(x => (x.host.get should include).regex("0.0.0.0".r)) // regexp dot is intentional to match IPv4 and 6 addresses - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } } } diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala index 22d520e7e4b..83146e8df20 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala @@ -184,7 +184,7 @@ class SystemMessageDeliverySpec extends AbstractSystemMessageDeliverySpec(System watch(remoteRef) remoteRef ! "hello" expectMsg("hello") - Await.ready(systemC.terminate(), 10.seconds) + systemC.terminateAndAwait(10.seconds) system.log.debug("systemC terminated") // DeathWatchNotification is sent from systemC, failure detection takes longer than 3 seconds expectTerminated(remoteRef, 10.seconds) diff --git a/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala index 436994de582..f2d91292d4a 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala @@ -27,7 +27,6 @@ import org.scalatest.wordspec.AnyWordSpec import java.net.{ InetAddress, InetSocketAddress } import java.nio.channels.ServerSocketChannel -import scala.concurrent.Await import scala.concurrent.duration.Duration object NettyTransportSpec { @@ -69,7 +68,7 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior { getInternal() should contain(getExternal().withProtocol("tcp")) - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } "bind to a random port but remoting accepts from a specified port" in { @@ -89,7 +88,7 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior { getExternal() should ===(address.toAkkaAddress("pekko.tcp")) getInternal() should not contain address.toAkkaAddress("tcp") - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } finally { openSS.close() } @@ -117,7 +116,7 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior { getExternal() should ===(address.toAkkaAddress("pekko.tcp")) getInternal() should contain(address.toAkkaAddress("tcp")) - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } "bind to all interfaces" in { @@ -132,7 +131,7 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior { getInternal().flatMap(_.port) should contain(getExternal().port.get) getInternal().map(x => (x.host.get should include).regex("0.0.0.0".r)) // regexp dot is intentional to match IPv4 and 6 addresses - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } "be able to specify byte buffer allocator" in { @@ -186,7 +185,7 @@ trait BindBehavior { getExternal() should ===(address.toAkkaAddress(s"pekko.tcp")) getInternal() should contain(address.toAkkaAddress("tcp")) - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } s"bind to specified tcp address" in { @@ -218,7 +217,7 @@ trait BindBehavior { getExternal() should ===(address.toAkkaAddress(s"pekko.tcp")) getInternal() should contain(bindAddress.toAkkaAddress("tcp")) - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) } } } diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/ActorSystemLifecycle.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/ActorSystemLifecycle.scala index 135bd376675..0f1314392b6 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/ActorSystemLifecycle.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/ActorSystemLifecycle.scala @@ -15,7 +15,6 @@ package org.apache.pekko.stream.tck import java.util.concurrent.TimeoutException -import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.Config @@ -50,7 +49,7 @@ trait ActorSystemLifecycle { @AfterClass def shutdownActorSystem(): Unit = { try { - Await.ready(system.terminate(), shutdownTimeout) + system.terminateAndAwait(shutdownTimeout) } catch { case _: TimeoutException => val msg = "Failed to stop [%s] within [%s] \n%s".format( diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/ActorMaterializerSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/ActorMaterializerSpec.scala index 2e90a5ef428..39b706405c6 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/ActorMaterializerSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/ActorMaterializerSpec.scala @@ -158,7 +158,7 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender { "report correctly if it has been shut down from the side" in { val sys = ActorSystem() val m = ActorMaterializer.create(sys) - Await.result(sys.terminate(), Duration.Inf) + sys.terminateAndAwait(Duration.Inf) m.isShutdown should ===(true) } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala index 673629f2bf4..895767529a7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala @@ -857,7 +857,8 @@ class TcpSpec extends StreamSpec(""" // and is possible to communicate with Source.single(ByteString(0)).via(Tcp().outgoingConnection(address)).runWith(Sink.ignore).futureValue - sys2.terminate().futureValue + sys2.terminate() + sys2.whenTerminated.futureValue val binding = bindingFuture.futureValue binding.unbind().futureValue