Skip to content

Commit 1da15f2

Browse files
committed
+ brought back Terminable feature
1 parent 31bef54 commit 1da15f2

File tree

7 files changed

+290
-87
lines changed

7 files changed

+290
-87
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<modelVersion>4.0.0</modelVersion>
2828
<groupId>io.github.q3769</groupId>
2929
<artifactId>conseq4j</artifactId>
30-
<version>20250317.0.0</version>
30+
<version>20250317.2025.0</version>
3131
<packaging>jar</packaging>
3232
<name>conseq4j</name>
3333
<description>A Java concurrent API to sequence related tasks while concurring unrelated ones</description>
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2023 Qingtian Wang
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package conseq4j;
26+
27+
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
29+
30+
/**
31+
* An interface representing a terminable executor that can be shut down and awaited for
32+
* termination.
33+
*/
34+
public interface Terminable extends AutoCloseable {
35+
36+
/**
37+
* Initiates an orderly shutdown in which previously submitted tasks are executed, but no new
38+
* tasks will be accepted.
39+
*/
40+
void shutdown();
41+
42+
/**
43+
* Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and
44+
* returns a list of the tasks that were awaiting execution.
45+
*
46+
* @return a list of tasks that never commenced execution
47+
*/
48+
List<Runnable> shutdownNow();
49+
50+
/**
51+
* Returns true if this executor has been shut down.
52+
*
53+
* @return true if this executor has been shut down
54+
*/
55+
boolean isShutdown();
56+
57+
/**
58+
* Returns true if all tasks have completed following shut down.
59+
*
60+
* @return true if all tasks have completed following shut down
61+
*/
62+
boolean isTerminated();
63+
64+
/**
65+
* Blocks until all tasks have completed execution after a shutdown request, or the timeout
66+
* occurs, or the current thread is interrupted, whichever happens first.
67+
*
68+
* @param timeout the maximum time to wait
69+
* @param unit the time unit of the timeout argument
70+
* @return true if this executor terminated and false if the timeout elapsed before termination
71+
* @throws InterruptedException if interrupted while waiting
72+
*/
73+
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
74+
75+
/** Closes this resource, shutting it down if necessary and awaiting its termination. */
76+
@Override
77+
default void close() {
78+
boolean terminated = isTerminated();
79+
if (!terminated) {
80+
shutdown();
81+
boolean interrupted = false;
82+
while (!terminated) {
83+
try {
84+
terminated = awaitTermination(1L, TimeUnit.DAYS);
85+
} catch (InterruptedException e) {
86+
if (!interrupted) {
87+
shutdownNow();
88+
interrupted = true;
89+
}
90+
}
91+
}
92+
if (interrupted) {
93+
Thread.currentThread().interrupt();
94+
}
95+
}
96+
}
97+
}

src/main/java/conseq4j/execute/ConseqExecutor.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import coco4j.MinimalisticFuture;
3030
import coco4j.MoreExecutors;
31+
import conseq4j.Terminable;
32+
import java.util.List;
3133
import java.util.Map;
3234
import java.util.concurrent.*;
3335
import javax.annotation.Nonnull;
@@ -43,7 +45,7 @@
4345
*/
4446
@ThreadSafe
4547
@ToString
46-
public final class ConseqExecutor implements TaskExecutor {
48+
public final class ConseqExecutor implements TaskExecutor, Terminable {
4749
/**
4850
* A concurrent hash map whose entries represent execution queues of sequential tasks. Each key in
4951
* the map is a sequence key, and the value is a CompletableFuture. Each completion stage of the
@@ -171,4 +173,29 @@ private ConseqExecutor(ExecutorService workerExecutorService) {
171173
boolean noTaskPending() {
172174
return executionQueues.isEmpty();
173175
}
176+
177+
@Override
178+
public void shutdown() {
179+
workerExecutorService.shutdown();
180+
}
181+
182+
@Override
183+
public @NonNull List<Runnable> shutdownNow() {
184+
return workerExecutorService.shutdownNow();
185+
}
186+
187+
@Override
188+
public boolean isShutdown() {
189+
return workerExecutorService.isShutdown();
190+
}
191+
192+
@Override
193+
public boolean isTerminated() {
194+
return workerExecutorService.isTerminated();
195+
}
196+
197+
@Override
198+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
199+
return workerExecutorService.awaitTermination(timeout, unit);
200+
}
174201
}

src/main/java/conseq4j/summon/ConseqFactory.java

+49-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import static java.lang.Math.floorMod;
2727

2828
import coco4j.MoreExecutors;
29+
import conseq4j.Terminable;
30+
import java.util.ArrayList;
2931
import java.util.Collection;
3032
import java.util.List;
3133
import java.util.Objects;
@@ -44,7 +46,7 @@
4446
*/
4547
@ThreadSafe
4648
@ToString
47-
public final class ConseqFactory implements ExecutorServiceFactory {
49+
public final class ConseqFactory implements ExecutorServiceFactory, Terminable {
4850
private static final int DEFAULT_CONCURRENCY = Runtime.getRuntime().availableProcessors();
4951
private final int concurrency;
5052
private final ConcurrentMap<Object, ShutdownDisabledExecutorService> sequentialExecutors;
@@ -101,6 +103,44 @@ private int bucketOf(Object sequenceKey) {
101103
return floorMod(Objects.hash(sequenceKey), this.concurrency);
102104
}
103105

106+
@Override
107+
public void shutdown() {
108+
sequentialExecutors.values().forEach(ShutdownDisabledExecutorService::shutdownDelegate);
109+
}
110+
111+
@Override
112+
public @NonNull List<Runnable> shutdownNow() {
113+
List<Runnable> remaining = new ArrayList<>();
114+
sequentialExecutors
115+
.values()
116+
.forEach(executor -> remaining.addAll(executor.shutdownDelegateNow()));
117+
return remaining;
118+
}
119+
120+
@Override
121+
public boolean isShutdown() {
122+
return sequentialExecutors.values().stream()
123+
.allMatch(ShutdownDisabledExecutorService::isShutdown);
124+
}
125+
126+
@Override
127+
public boolean isTerminated() {
128+
return sequentialExecutors.values().stream()
129+
.allMatch(ShutdownDisabledExecutorService::isTerminated);
130+
}
131+
132+
@Override
133+
public boolean awaitTermination(long timeout, TimeUnit unit) {
134+
return sequentialExecutors.values().stream().parallel().allMatch(executorService -> {
135+
try {
136+
return executorService.awaitTermination(timeout, unit);
137+
} catch (InterruptedException ex) {
138+
Thread.currentThread().interrupt();
139+
throw new CompletionException(ex);
140+
}
141+
});
142+
}
143+
104144
/**
105145
* An {@link ExecutorService} that doesn't support shut down.
106146
*
@@ -132,12 +172,20 @@ public void shutdown() {
132172
throw new UnsupportedOperationException(SHUTDOWN_UNSUPPORTED_MESSAGE);
133173
}
134174

175+
void shutdownDelegate() {
176+
delegate.shutdown();
177+
}
178+
135179
/** @see #shutdown() */
136180
@Override
137181
public @Nonnull List<Runnable> shutdownNow() {
138182
throw new UnsupportedOperationException(SHUTDOWN_UNSUPPORTED_MESSAGE);
139183
}
140184

185+
@NonNull List<Runnable> shutdownDelegateNow() {
186+
return delegate.shutdownNow();
187+
}
188+
141189
public boolean isShutdown() {
142190
return this.delegate.isShutdown();
143191
}

src/test/java/conseq4j/execute/ConseqExecutorTest.java

+38-32
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,19 @@ class ConseqExecutorTest {
4545
@Test
4646
void exceptionallyCompletedSubmitShouldNotStopOtherTaskExecution() {
4747
List<Future<SpyingTask>> resultFutures;
48-
ConseqExecutor conseqExecutor = ConseqExecutor.instance();
4948
List<SpyingTask> tasks = TestUtils.createRunnableSpyingTasks(TASK_COUNT);
5049
UUID sameSequenceKey = UUID.randomUUID();
5150
resultFutures = new ArrayList<>();
5251
int cancelTaskIdx = 1;
5352
for (int i = 0; i < TASK_COUNT; i++) {
54-
Future<SpyingTask> taskFuture =
55-
conseqExecutor.submit(tasks.get(i).toCallable(), sameSequenceKey);
56-
if (i == cancelTaskIdx) {
57-
taskFuture.cancel(true);
53+
try (ConseqExecutor conseqExecutor = ConseqExecutor.instance()) {
54+
Future<SpyingTask> taskFuture =
55+
conseqExecutor.submit(tasks.get(i).toCallable(), sameSequenceKey);
56+
if (i == cancelTaskIdx) {
57+
taskFuture.cancel(true);
58+
}
59+
resultFutures.add(taskFuture);
5860
}
59-
resultFutures.add(taskFuture);
6061
}
6162
int cancelledCount = TestUtils.cancellationCount(resultFutures);
6263
int normalCompleteCount = TestUtils.normalCompletionCount(resultFutures);
@@ -79,21 +80,23 @@ void executeRunsAllTasksOfSameSequenceKeyInSequence() {
7980

8081
@Test
8182
void noExecutorLingersOnRandomSequenceKeys() {
82-
ConseqExecutor sut = ConseqExecutor.instance();
83-
List<SpyingTask> tasks = TestUtils.createRunnableSpyingTasks(100);
84-
tasks.parallelStream().forEach(t -> sut.execute(t, UUID.randomUUID()));
85-
TestUtils.awaitAllComplete(tasks);
86-
await().until(sut::noTaskPending);
83+
try (ConseqExecutor sut = ConseqExecutor.instance()) {
84+
List<SpyingTask> tasks = TestUtils.createRunnableSpyingTasks(100);
85+
tasks.parallelStream().forEach(t -> sut.execute(t, UUID.randomUUID()));
86+
TestUtils.awaitAllComplete(tasks);
87+
await().until(sut::noTaskPending);
88+
}
8789
}
8890

8991
@Test
9092
void noExecutorLingersOnSameSequenceKey() {
91-
ConseqExecutor sut = ConseqExecutor.instance();
92-
UUID sameSequenceKey = UUID.randomUUID();
93-
List<SpyingTask> tasks = TestUtils.createRunnableSpyingTasks(TASK_COUNT);
94-
tasks.parallelStream().forEach(t -> sut.execute(t, sameSequenceKey));
95-
TestUtils.awaitAllComplete(tasks);
96-
await().until(sut::noTaskPending);
93+
try (ConseqExecutor sut = ConseqExecutor.instance()) {
94+
UUID sameSequenceKey = UUID.randomUUID();
95+
List<SpyingTask> tasks = TestUtils.createRunnableSpyingTasks(TASK_COUNT);
96+
tasks.parallelStream().forEach(t -> sut.execute(t, sameSequenceKey));
97+
TestUtils.awaitAllComplete(tasks);
98+
await().until(sut::noTaskPending);
99+
}
97100
}
98101

99102
@Test
@@ -103,13 +106,14 @@ void provideConcurrencyAmongDifferentSequenceKeys() {
103106
long sameKeyStartTimeMillis;
104107
long sameKeyEndTimeMillis;
105108
long differentKeysStartTimeMillis;
106-
ConseqExecutor sut = ConseqExecutor.instance();
107-
sameKeyStartTimeMillis = System.currentTimeMillis();
108-
sameTasks.forEach(t -> sut.execute(t, sameSequenceKey));
109-
awaitAllComplete(sameTasks);
110-
sameKeyEndTimeMillis = System.currentTimeMillis();
111-
differentKeysStartTimeMillis = System.currentTimeMillis();
112-
sameTasks.forEach(t -> sut.execute(t, UUID.randomUUID()));
109+
try (ConseqExecutor sut = ConseqExecutor.instance()) {
110+
sameKeyStartTimeMillis = System.currentTimeMillis();
111+
sameTasks.forEach(t -> sut.execute(t, sameSequenceKey));
112+
awaitAllComplete(sameTasks);
113+
sameKeyEndTimeMillis = System.currentTimeMillis();
114+
differentKeysStartTimeMillis = System.currentTimeMillis();
115+
sameTasks.forEach(t -> sut.execute(t, UUID.randomUUID()));
116+
}
113117

114118
awaitAllComplete(sameTasks);
115119
long differentKeysEndTimeMillis = System.currentTimeMillis();
@@ -123,10 +127,11 @@ void submitConcurrencyBoundedByMaxConcurrency() {
123127
int maxConcurrency = 10;
124128
int taskCount = maxConcurrency * 2;
125129
List<Future<SpyingTask>> futures;
126-
ConseqExecutor conseqExecutor = ConseqExecutor.instance(maxConcurrency);
127-
futures = TestUtils.createRunnableSpyingTasks(taskCount).stream()
128-
.map(task -> conseqExecutor.submit(task.toCallable(), UUID.randomUUID()))
129-
.toList();
130+
try (ConseqExecutor conseqExecutor = ConseqExecutor.instance(maxConcurrency)) {
131+
futures = TestUtils.createRunnableSpyingTasks(taskCount).stream()
132+
.map(task -> conseqExecutor.submit(task.toCallable(), UUID.randomUUID()))
133+
.toList();
134+
}
130135
final long actualThreadCount = TestUtils.actualExecutionThreadCountIfAllCompleteNormal(futures);
131136
assertEquals(taskCount, actualThreadCount);
132137
assertEquals(0, actualThreadCount % maxConcurrency);
@@ -135,10 +140,11 @@ void submitConcurrencyBoundedByMaxConcurrency() {
135140
@Test
136141
void submitConcurrencyBoundedByTotalTaskCount() {
137142
List<Future<SpyingTask>> futures;
138-
ConseqExecutor conseqExecutor = ConseqExecutor.instance(TASK_COUNT * 10);
139-
futures = TestUtils.createRunnableSpyingTasks(TASK_COUNT).stream()
140-
.map(task -> conseqExecutor.submit(task.toCallable(), UUID.randomUUID()))
141-
.toList();
143+
try (ConseqExecutor conseqExecutor = ConseqExecutor.instance(TASK_COUNT * 10)) {
144+
futures = TestUtils.createRunnableSpyingTasks(TASK_COUNT).stream()
145+
.map(task -> conseqExecutor.submit(task.toCallable(), UUID.randomUUID()))
146+
.toList();
147+
}
142148
final long actualThreadCount = TestUtils.actualExecutionThreadCountIfAllCompleteNormal(futures);
143149
assertTrue(actualThreadCount <= TASK_COUNT);
144150
}

0 commit comments

Comments
 (0)