Skip to content

Commit aec1ef6

Browse files
committed
Rebasing to new v1 code.
1 parent f0cb33c commit aec1ef6

File tree

1 file changed

+76
-45
lines changed

1 file changed

+76
-45
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 76 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ go blocks are dispatched over an internal thread pool, which
1313
defaults to 8 threads. The size of this pool can be modified using
1414
the Java system property `clojure.core.async.pool-size`.
1515
16-
Set Java system property `clojure.core.async.go-checking` to true
17-
to validate go blocks do not invoke core.async blocking operations.
18-
Property is read once, at namespace load time. Recommended for use
19-
primarily during development. Invalid blocking calls will throw in
20-
go block threads - use Thread.setDefaultUncaughtExceptionHandler()
16+
Set Java system property `clojure.core.async.vthread-checking` to true
17+
to validate that core.async parking operations are not called outside of
18+
task blocks. Property is read once, at namespace load time. Recommended
19+
for use primarily during development. Invalid parking calls will throw in
20+
virtual threads - use Thread.setDefaultUncaughtExceptionHandler()
2121
to catch and handle."
2222
(:refer-clojure :exclude [reduce transduce into merge map take partition
2323
partition-by bounded-count])
@@ -117,6 +117,23 @@ to catch and handle."
117117
[^long msecs]
118118
(timers/timeout msecs))
119119

120+
;; task
121+
122+
(defn- check-not-in-vthread [op]
123+
(when (not (Thread/.isVirtual (Thread/currentThread)))
124+
(assert nil (str op " used not in virtual thread"))))
125+
126+
(defmacro defvthreadcheckingop
127+
[op doc arglist & body]
128+
(let [as (mapv #(list 'quote %) arglist)]
129+
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
130+
(if (Boolean/getBoolean "clojure.core.async.vthread-checking")
131+
(fn ~arglist
132+
(check-not-in-vthread ~op)
133+
~@body)
134+
(fn ~arglist
135+
~@body)))))
136+
120137
(defmacro defblockingop
121138
[op doc arglist & body]
122139
(let [as (mapv #(list 'quote %) arglist)]
@@ -130,22 +147,21 @@ to catch and handle."
130147

131148
(defblockingop <!!
132149
"takes a val from port. Will return nil if closed. Will block
133-
if nothing is available.
134-
Not intended for use in direct or transitive calls from (go ...) blocks.
135-
Use the clojure.core.async.go-checking flag to detect invalid use (see
136-
namespace docs)."
150+
if nothing is available."
137151
[port]
138152
(let [p (promise)
139153
ret (impl/take! port (fn-handler (on-caller #(deliver p %))))]
140154
(if ret
141155
@ret
142156
(deref p))))
143157

144-
(defn <!
145-
"takes a val from port. Must be called inside a (go ...) block. Will
146-
return nil if closed. Will park if nothing is available."
158+
(defvthreadcheckingop <!
159+
"takes a val from port. Must be called inside a (task ...) block. Will
160+
return nil if closed. Will park if nothing is available. Should be used
161+
inside of a (task ...) block. Use the clojure.core.async.vthread-checking
162+
flag to detect invalid use (see namespace docs)."
147163
[port]
148-
(assert nil "<! used not in (go ...) block"))
164+
(<!! port))
149165

150166
(defn take!
151167
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -169,23 +185,22 @@ to catch and handle."
169185

170186
(defblockingop >!!
171187
"puts a val into port. nil values are not allowed. Will block if no
172-
buffer space is available. Returns true unless port is already closed.
173-
Not intended for use in direct or transitive calls from (go ...) blocks.
174-
Use the clojure.core.async.go-checking flag to detect invalid use (see
175-
namespace docs)."
188+
buffer space is available. Returns true unless port is already closed."
176189
[port val]
177190
(let [p (promise)
178191
ret (impl/put! port val (fn-handler (on-caller #(deliver p %))))]
179192
(if ret
180193
@ret
181194
(deref p))))
182195

183-
(defn >!
196+
(defvthreadcheckingop >!
184197
"puts a val into port. nil values are not allowed. Must be called
185-
inside a (go ...) block. Will park if no buffer space is available.
186-
Returns true unless port is already closed."
198+
inside a (task ...) block. Will park if no buffer space is available.
199+
Returns true unless port is already closed. Should be used
200+
inside of a (task ...) block. Use the clojure.core.async.vthread-checking
201+
flag to detect invalid use (see namespace docs)."
187202
[port val]
188-
(assert nil ">! used not in (go ...) block"))
203+
(>!! port val))
189204

190205
(defn- nop [_])
191206
(def ^:private fhnop (fn-handler nop))
@@ -306,20 +321,17 @@ to catch and handle."
306321

307322
(defblockingop alts!!
308323
"Like alts!, except takes will be made as if by <!!, and puts will
309-
be made as if by >!!, will block until completed.
310-
Not intended for use in direct or transitive calls from (go ...) blocks.
311-
Use the clojure.core.async.go-checking flag to detect invalid use (see
312-
namespace docs)."
324+
be made as if by >!!, will block until completed."
313325
[ports & opts]
314326
(let [p (promise)
315327
ret (do-alts (on-caller #(deliver p %)) ports (apply hash-map opts))]
316328
(if ret
317329
@ret
318330
(deref p))))
319331

320-
(defn alts!
321-
"Completes at most one of several channel operations. Must be called
322-
inside a (go ...) block. ports is a vector of channel endpoints,
332+
(defvthreadcheckingop alts!
333+
"Completes at most one of several channel operations. Should be called
334+
inside a (task ...) block. ports is a vector of channel endpoints,
323335
which can be either a channel to take from or a vector of
324336
[channel-to-put-to val-to-put], in any combination. Takes will be
325337
made as if by <!, and puts will be made as if by >!. Unless
@@ -341,7 +353,7 @@ to catch and handle."
341353
depended upon for side effects."
342354

343355
[ports & {:as opts}]
344-
(assert nil "alts! used not in (go ...) block"))
356+
(alts!! ports opts))
345357

346358
(defn do-alt [alts clauses]
347359
(assert (even? (count clauses)) "unbalanced clauses")
@@ -382,16 +394,17 @@ to catch and handle."
382394
(= ~gch :default) val#))))
383395

384396
(defmacro alt!!
385-
"Like alt!, except as if by alts!!, will block until completed, and
386-
not intended for use in (go ...) blocks."
397+
"Like alt!, except as if by alts!!, will block until completed."
387398

388399
[& clauses]
389400
(do-alt `alts!! clauses))
390401

391402
(defmacro alt!
392403
"Makes a single choice between one of several channel operations,
393404
as if by alts!, returning the value of the result expr corresponding
394-
to the operation completed. Must be called inside a (go ...) block.
405+
to the operation completed. Should be used inside of a (task ...)
406+
block. Use the clojure.core.async.vthread-checking flag to detect
407+
invalid use (see namespace docs).
395408
396409
Each clause takes the form of:
397410
@@ -444,23 +457,41 @@ to catch and handle."
444457
(let [ret (impl/take! port (fn-handler nop false))]
445458
(when ret @ret)))
446459

447-
(defmacro go
448-
"Asynchronously executes the body, returning immediately to the
449-
calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
450-
channel operations within the body will block (if necessary) by
451-
'parking' the calling thread rather than tying up an OS thread (or
452-
the only JS thread when in ClojureScript). Upon completion of the
453-
operation, the body will be resumed.
454-
455-
go blocks should not (either directly or indirectly) perform operations
456-
that may block indefinitely. Doing so risks depleting the fixed pool of
457-
go block threads, causing all go block processing to stop. This includes
458-
core.async blocking ops (those ending in !!) and other blocking IO.
460+
;; task
461+
462+
(def task-factory
463+
(-> (Thread/ofVirtual)
464+
(Thread$Builder/.name "task-" 0)
465+
.factory))
466+
467+
(defmacro task
468+
"Asynchronously executes the body in a virtual thread, returning immediately
469+
to the calling thread.
470+
471+
task blocks should not (either directly or indirectly) perform operations
472+
that may block indefinitely. Doing so risks pinning the virtual thread
473+
to its carrier thread.
459474
460475
Returns a channel which will receive the result of the body when
461476
completed"
462477
[& body]
463-
(#'clojure.core.async.impl.go/go-impl &env body))
478+
`(let [c# (chan 1)
479+
captured-bindings# (Var/getThreadBindingFrame)]
480+
(.execute
481+
(Executors/newThreadPerTaskExecutor task-factory)
482+
(^:once fn* []
483+
(Var/resetThreadBindingFrame captured-bindings#)
484+
(try
485+
(let [result# (do ~@body)]
486+
(>!! c# result#))
487+
(finally
488+
(close! c#)))))
489+
c#))
490+
491+
(defmacro go
492+
"Dispatches to task macro."
493+
[& body]
494+
`(task ~body))
464495

465496
(defonce ^:private ^Executor thread-macro-executor
466497
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))

0 commit comments

Comments
 (0)