|
| 1 | +import java.io.IOException; |
| 2 | +import java.nio.file.Files; |
| 3 | +import java.nio.file.Paths; |
| 4 | +import java.util.Queue; |
| 5 | +import java.util.concurrent.ConcurrentLinkedQueue; |
| 6 | +import java.util.concurrent.ExecutorService; |
| 7 | +import java.util.concurrent.Executors; |
| 8 | +import java.util.concurrent.atomic.AtomicInteger; |
| 9 | +import java.util.function.Consumer; |
| 10 | + |
| 11 | +import java.util.Timer; |
| 12 | +import java.util.TimerTask; |
| 13 | + |
| 14 | +import jdk.internal.vm.Continuation; |
| 15 | +import jdk.internal.vm.ContinuationScope; |
| 16 | + |
| 17 | +/** |
| 18 | + * Examples from "Continuations: The magic behind virtual threads in Java by Balkrishna Rawool @ Spring I/O 2024" |
| 19 | + * |
| 20 | + * Run: |
| 21 | + * ```bash |
| 22 | + * javac --enable-preview --source 21 --target 21 --add-exports java.base/jdk.internal.vm=ALL-UNNAMED VirtualThreadsContinuations.java |
| 23 | + * java --enable-preview --add-exports java.base/jdk.internal.vm=ALL-UNNAMED VirtualThreadsContinuations |
| 24 | + * ``` |
| 25 | + */ |
| 26 | +public class VirtualThreadsContinuations { |
| 27 | + public static void main(String[] args) { |
| 28 | + System.out.println("=== Continuation Example ==="); |
| 29 | + basicExample(); |
| 30 | + |
| 31 | + System.out.println("\n=== Generator Example ==="); |
| 32 | + generatorExample(); |
| 33 | + |
| 34 | + System.out.println("\n=== Custom VirtualThread Example ==="); |
| 35 | + virtualThreadExample(); |
| 36 | + } |
| 37 | + |
| 38 | + static void basicExample() { |
| 39 | + var scope = new ContinuationScope("my-scope"); |
| 40 | + // continuation needs a scope and a task |
| 41 | + var continuation = new Continuation(scope, () -> { |
| 42 | + System.out.println("A"); |
| 43 | + // `yield` pauses the scope execution and returns the execution to another scope |
| 44 | + // the execution will be paused at the point where `Continuation.yield` is called |
| 45 | + // the stacktrace is copied to heap |
| 46 | + // will be paused until `run` is called again |
| 47 | + Continuation.yield(scope); |
| 48 | + System.out.println("B"); |
| 49 | + Continuation.yield(scope); |
| 50 | + System.out.println("C"); |
| 51 | + }); |
| 52 | + |
| 53 | + System.out.println("Starting continuation"); |
| 54 | + continuation.run(); |
| 55 | + System.out.println("Resuming continuation"); |
| 56 | + continuation.run(); |
| 57 | + continuation.run(); |
| 58 | + // if we try to run a running continuation or terminated continuation, a IllegalStateException is thrown |
| 59 | + // continuation.run(); |
| 60 | + if (continuation.isDone()) { |
| 61 | + System.out.println("Continuation terminated"); |
| 62 | + } else { |
| 63 | + System.out.println("Continuation is pending"); |
| 64 | + } |
| 65 | + } |
| 66 | + |
| 67 | + static void generatorExample() { |
| 68 | + var generator = new Generator<String>(source -> { |
| 69 | + try { |
| 70 | + Files.list(Paths.get("/")) |
| 71 | + .forEach(path -> source.yield(path.toString())); |
| 72 | + } catch (IOException ex) { |
| 73 | + source.yield("Error: " + ex.getMessage()); |
| 74 | + } |
| 75 | + }); |
| 76 | + |
| 77 | + while (generator.hasNext()) { |
| 78 | + System.out.println("Generated: " + generator.next()); |
| 79 | + } |
| 80 | + } |
| 81 | + |
| 82 | + static void virtualThreadExample() { |
| 83 | + // start the scheduler in a new thread just for it |
| 84 | + new Thread(VirtualThreadScheduler.SCHEDULER::start).start(); |
| 85 | + |
| 86 | + var vt1 = new VirtualThread(() -> { |
| 87 | + System.out.println("1.1"); |
| 88 | + System.out.println("1.2"); |
| 89 | + WaitingOperation.perform("Network", 2); |
| 90 | + System.out.println("1.3"); |
| 91 | + System.out.println("1.4"); |
| 92 | + }); |
| 93 | + var vt2 = new VirtualThread(() -> { |
| 94 | + System.out.println("2.1"); |
| 95 | + System.out.println("2.2"); |
| 96 | + WaitingOperation.perform("Disk", 2); |
| 97 | + System.out.println("2.3"); |
| 98 | + System.out.println("2.4"); |
| 99 | + }); |
| 100 | + |
| 101 | + VirtualThreadScheduler.SCHEDULER.schedule(vt1); |
| 102 | + VirtualThreadScheduler.SCHEDULER.schedule(vt2); |
| 103 | + } |
| 104 | +} |
| 105 | + |
| 106 | +class Generator<T> { |
| 107 | + private ContinuationScope scope; |
| 108 | + private Continuation continuation; |
| 109 | + private Source source; |
| 110 | + |
| 111 | + public class Source { |
| 112 | + private T value; |
| 113 | + |
| 114 | + public T getValue() { |
| 115 | + var v = this.value; |
| 116 | + continuation.run(); |
| 117 | + return v; |
| 118 | + } |
| 119 | + |
| 120 | + public void yield(T value) { |
| 121 | + this.value = value; |
| 122 | + Continuation.yield(scope); |
| 123 | + } |
| 124 | + } |
| 125 | + |
| 126 | + public Generator(Consumer<Source> consumer) { |
| 127 | + this.scope = new ContinuationScope("generator"); |
| 128 | + this.source = new Source(); |
| 129 | + this.continuation = new Continuation(this.scope, () -> consumer.accept(source)); |
| 130 | + this.continuation.run(); |
| 131 | + } |
| 132 | + |
| 133 | + public boolean hasNext() { |
| 134 | + return !continuation.isDone(); |
| 135 | + } |
| 136 | + |
| 137 | + public T next() { |
| 138 | + return this.source.getValue(); |
| 139 | + } |
| 140 | +} |
| 141 | + |
| 142 | +class VirtualThread { |
| 143 | + private static final AtomicInteger COUNTER = new AtomicInteger(1); |
| 144 | + public static final ContinuationScope SCOPE = new ContinuationScope("virtual-threads"); |
| 145 | + |
| 146 | + private int id; |
| 147 | + private Continuation continuation; |
| 148 | + |
| 149 | + public VirtualThread(Runnable runnable) { |
| 150 | + this.id = COUNTER.getAndIncrement(); |
| 151 | + this.continuation = new Continuation(SCOPE, runnable); |
| 152 | + } |
| 153 | + |
| 154 | + public void run() { |
| 155 | + System.out.println("VirtualThread " + id + " is running on " + Thread.currentThread()); |
| 156 | + continuation.run(); |
| 157 | + } |
| 158 | +} |
| 159 | + |
| 160 | +class VirtualThreadScheduler { |
| 161 | + public static final VirtualThreadScheduler SCHEDULER = new VirtualThreadScheduler(); |
| 162 | + public static final ScopedValue<VirtualThread> CURRENT_VIRTUAL_THREAD = ScopedValue.newInstance(); |
| 163 | + |
| 164 | + private Queue<VirtualThread> queue = new ConcurrentLinkedQueue<>(); |
| 165 | + private ExecutorService executor = Executors.newFixedThreadPool(3); |
| 166 | + |
| 167 | + public void start() { |
| 168 | + while (true) { |
| 169 | + if (!queue.isEmpty()) { |
| 170 | + var vt = queue.remove(); |
| 171 | + executor.submit(() -> ScopedValue.where(CURRENT_VIRTUAL_THREAD, vt).run(vt::run)); |
| 172 | + } |
| 173 | + } |
| 174 | + } |
| 175 | + |
| 176 | + public void schedule(VirtualThread virtualThread) { |
| 177 | + queue.add(virtualThread); |
| 178 | + } |
| 179 | +} |
| 180 | + |
| 181 | +class WaitingOperation { |
| 182 | + public static void perform(String task, int durationInSeconds) { |
| 183 | + System.out.printf("Waiting for %s for %d seconds\n", task, durationInSeconds); |
| 184 | + var virtualThread = VirtualThreadScheduler.CURRENT_VIRTUAL_THREAD.get(); |
| 185 | + |
| 186 | + var timer = new Timer(); |
| 187 | + timer.schedule(new TimerTask() { |
| 188 | + public void run() { |
| 189 | + VirtualThreadScheduler.SCHEDULER.schedule(virtualThread); |
| 190 | + timer.cancel(); |
| 191 | + } |
| 192 | + }, durationInSeconds * 1000L); |
| 193 | + |
| 194 | + Continuation.yield(VirtualThread.SCOPE); |
| 195 | + } |
| 196 | +} |
0 commit comments