Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@

package org.apache.pekko.actor;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;

import java.util.concurrent.CompletionStage;
import java.time.Duration;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -38,9 +37,7 @@ public void beforeEach() {

@Test
public void testGetWhenTerminated() throws Exception {
system.terminate();
final CompletionStage<Terminated> cs = system.getWhenTerminated();
cs.toCompletableFuture().get(2, SECONDS);
system.terminateAndAwait(Duration.ofSeconds(2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,19 +250,17 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen
val system = ActorSystem().asInstanceOf[ActorSystemImpl]
val wt = system.whenTerminated
wt.isCompleted should ===(false)
val f = system.terminate()
system.terminate()
val terminated = Await.result(wt, 10.seconds)
system.whenTerminated.isCompleted should ===(true)
terminated.actor should ===(system.provider.rootGuardian)
terminated.addressTerminated should ===(true)
terminated.existenceConfirmed should ===(true)
(terminated should be).theSameInstanceAs(Await.result(f, 10.seconds))
}

"throw RejectedExecutionException when shutdown" in {
val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf)
Await.ready(system2.terminate(), 10.seconds)

system2.terminateAndAwait(10.seconds)
intercept[RejectedExecutionException] {
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
}.getMessage should ===("ActorSystem already terminated.")
Expand All @@ -280,7 +278,7 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen
case _: RejectedExecutionException => count.incrementAndGet()
}

Await.ready(system2.whenTerminated, 10.seconds)
system2.terminateAndAwait(10.seconds)
count.get() should ===(1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ class CoordinatedShutdownSpec
"be run by ActorSystem.terminate" in {
val sys = ActorSystem(system.name, system.settings.config)
try {
Await.result(sys.terminate(), 10.seconds)
sys.terminateAndAwait(10.seconds)
sys.whenTerminated.isCompleted should ===(true)
CoordinatedShutdown(sys).shutdownReason() should ===(Some(CoordinatedShutdown.ActorSystemTerminateReason))
} finally {
Expand All @@ -561,7 +561,7 @@ class CoordinatedShutdownSpec
.parseString("pekko.coordinated-shutdown.run-by-actor-system-terminate = off")
.withFallback(system.settings.config))
try {
Await.result(sys.terminate(), 10.seconds)
sys.terminateAndAwait(10.seconds)
sys.whenTerminated.isCompleted should ===(true)
CoordinatedShutdown(sys).shutdownReason() should ===(None)
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.apache.pekko.actor

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

Expand Down Expand Up @@ -88,7 +87,7 @@ class DynamicAccessSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll
}

override def afterAll() = {
Await.result(system.terminate(), 10.seconds)
system.terminateAndAwait(10.seconds)
super.afterAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with Matchers {
val system = ActorSystem("DispatcherShutdownSpec")
threadCount should be > 0

Await.ready(system.terminate(), 1.second)
system.terminateAndAwait(1.seconds)
Await.ready(Future(pekko.Done)(system.dispatcher), 1.second)

TestKit.awaitCond(threadCount == 0, 3.second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

// #imports

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.pekko.actor.typed.ActorSystem;
Expand Down Expand Up @@ -100,7 +101,7 @@ public static void main(String[] args) throws Exception {

system.tell(MasterControlProgram.GracefulShutdown.INSTANCE);

system.getWhenTerminated().toCompletableFuture().get(3, TimeUnit.SECONDS);
system.terminateAndAwait(Duration.ofSeconds(3));
}

// #worker-actor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.pekko.Done;
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
Expand Down Expand Up @@ -897,8 +896,7 @@ public void fireAndForgetSample() throws Exception {
ref.tell(new Printer.PrintMe("message 2"));
// #fire-and-forget-doit

system.terminate();
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
system.terminateAndAwait(Duration.ofSeconds(5));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@

package org.apache.pekko.actor.typed;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;

import java.util.concurrent.CompletionStage;
import org.apache.pekko.Done;
import java.time.Duration;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
Expand All @@ -29,8 +27,7 @@ public void testGetWhenTerminated() throws Exception {
final ActorSystem<Void> system =
ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedSystem");
system.terminate();
final CompletionStage<Done> cs = system.getWhenTerminated();
cs.toCompletableFuture().get(2, SECONDS);
system.terminateAndAwait(Duration.ofSeconds(2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import pekko.actor.typed.{ ActorSystem, PostStop }
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.typed.ActorRef
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.wordspec.AnyWordSpecLike
import pekko.actor.typed.Terminated

Expand Down Expand Up @@ -157,7 +156,7 @@ class GracefulStopDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike
// brutally stop the system
system.terminate()

Await.result(system.whenTerminated, 3.seconds)
system.terminateAndAwait(3.seconds)
// #start-workers
}

Expand All @@ -176,7 +175,7 @@ class GracefulStopDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike

Thread.sleep(100)

Await.result(system.whenTerminated, 3.seconds)
system.terminateAndAwait(3.seconds)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
before shouldEqual beforeCreation + 1
after shouldEqual before
} finally {
classicSystem.terminate().futureValue
classicSystem.terminate()
classicSystem.whenTerminated.futureValue
}
}

Expand All @@ -229,7 +230,8 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
(ext1 should be).theSameInstanceAs(ext2)

} finally {
classicSystem.terminate().futureValue
classicSystem.terminate()
classicSystem.whenTerminated.futureValue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package org.apache.pekko.actor.typed
import java.util.concurrent.{ CompletionStage, ThreadFactory }

import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.duration.Duration

import com.typesafe.config.{ Config, ConfigFactory }
import org.slf4j.Logger

import org.apache.pekko
import pekko.{ actor => classic, Done }
import pekko.actor.{ Address, BootstrapSetup, ClassicActorSystemProvider }
Expand Down Expand Up @@ -125,6 +127,35 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA
*/
def terminate(): Unit

/**
* Terminates this actor system and blocks the current thread until it is terminated,
* waiting at most the given duration.
*
* @param atMost the maximum duration to wait
* @throws java.util.concurrent.TimeoutException if the wait timed out
* @throws InterruptedException if the current thread is interrupted while waiting
*/
def terminateAndAwait(atMost: Duration): Unit = {
import scala.concurrent.Await
terminate()
Await.result(whenTerminated, atMost)
}

/**
* Terminates this actor system and blocks the current thread until it is terminated,
* waiting at most the given duration.
*
* @param atMost the maximum duration to wait
* @throws java.util.concurrent.TimeoutException if the wait timed out
* @throws InterruptedException if the current thread is interrupted while waiting
*/
def terminateAndAwait(atMost: java.time.Duration): Unit = {
import scala.concurrent.Await
import pekko.util.JavaDurationConverters._
terminate()
Await.result(whenTerminated, atMost.asScala)
}

/**
* Scala API: Returns a Future which will be completed after the ActorSystem has been terminated.
* The `ActorSystem` can be stopped with [[ActorSystem.terminate]]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Change the the ActorSystem.terminate method signature
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.actor.ActorSystem.terminate")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.ActorSystem.terminate")
26 changes: 26 additions & 0 deletions actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,32 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid
*/
def terminate(): Future[Terminated]

/**
* Terminates this actor system and blocks the current thread until it is terminated,
* waiting at most the given duration.
*
* @param atMost the maximum duration to wait
* @throws java.util.concurrent.TimeoutException if the wait timed out
* @throws InterruptedException if the current thread is interrupted while waiting
*/
def terminateAndAwait(atMost: Duration): Unit = {
import scala.concurrent.Await
Await.result(terminate(), atMost)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added two methods for both Java and Scala


/**
* Terminates this actor system and blocks the current thread until it is terminated,
* waiting at most the given duration.
*
* @param atMost the maximum duration to wait
* @throws java.util.concurrent.TimeoutException if the wait timed out
* @throws InterruptedException if the current thread is interrupted while waiting
*/
def terminateAndAwait(atMost: java.time.Duration): Unit = {
import JavaDurationConverters._
terminateAndAwait(atMost.asScala)
}

/**
* Returns a Future which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. If you registered any callback with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import BenchmarkActors._
Expand Down Expand Up @@ -101,7 +100,7 @@ class ActorBenchmark {
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
system.terminateAndAwait(15.seconds)
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._
Expand Down Expand Up @@ -48,7 +47,7 @@ class ActorCreationBenchmark {
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
system.terminateAndAwait(15.seconds)
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._

import BenchmarkActors._
Expand Down Expand Up @@ -79,7 +78,7 @@ class ForkJoinActorBenchmark {
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
system.terminateAndAwait(15.seconds)
}

// @Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._
Expand All @@ -42,7 +41,7 @@ class RouterPoolCreationBenchmark {
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
system.terminateAndAwait(15.seconds)
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ScheduleBenchmark {
@TearDown
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
system.terminateAndAwait(15.seconds)
}

def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -50,7 +49,7 @@ class StashCreationBenchmark {
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
system.terminateAndAwait(15.seconds)
}

@Benchmark
Expand Down
Loading
Loading