Skip to content

Commit b158c77

Browse files
committed
Removing parking op usability in io-thread
1 parent 8bb5149 commit b158c77

File tree

2 files changed

+29
-42
lines changed

2 files changed

+29
-42
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ return nil for unexpected contexts."
5656
[java.util.concurrent ThreadLocalRandom]
5757
[java.util Arrays ArrayList]))
5858

59-
(def ^:private lazy-loading-supported? (dispatch/at-least-clojure-version? [1 12 3]))
59+
(defn- at-least-clojure-version?
60+
[[maj min incr]]
61+
(let [{:keys [major minor incremental]} *clojure-version*]
62+
(not (neg? (compare [major minor incremental] [maj min incr])))))
63+
64+
(def ^:private lazy-loading-supported? (at-least-clojure-version? [1 12 3]))
6065

6166
(when-not lazy-loading-supported?
6267
(require 'clojure.core.async.impl.go))
@@ -142,21 +147,6 @@ return nil for unexpected contexts."
142147
[^long msecs]
143148
(timers/timeout msecs))
144149

145-
(defn- defparkingop* [op doc arglist]
146-
(let [as (mapv #(list 'quote %) arglist)
147-
blockingop (-> op name (str "!") symbol)]
148-
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
149-
(fn [~'& ~'args]
150-
(if (dispatch/in-vthread?)
151-
~(list* apply blockingop '[args])
152-
(assert nil ~(str op " used not in (go ...) block")))))))
153-
154-
(defmacro defparkingop
155-
"Emits a Var with a function that checks if it's running in a virtual thread. If so then
156-
the related blocking op will be called, otherwise the function throws."
157-
[op doc arglist]
158-
(defparkingop* op doc arglist))
159-
160150
(defmacro defblockingop
161151
[op doc arglist & body]
162152
(let [as (mapv #(list 'quote %) arglist)]
@@ -181,11 +171,11 @@ return nil for unexpected contexts."
181171
@ret
182172
(deref p))))
183173

184-
(defparkingop <!
185-
"takes a val from port. Must be called inside a (go ...) block, or on
186-
a virtual thread (no matter how it was started). Will return nil if
187-
closed. Will park if nothing is available."
188-
[port])
174+
(defn <!
175+
"takes a val from port. Must be called inside a (go ...) block. Will
176+
return nil if closed. Will park if nothing is available."
177+
[port]
178+
(assert nil "<! used not in (go ...) block"))
189179

190180
(defn take!
191181
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -220,12 +210,12 @@ return nil for unexpected contexts."
220210
@ret
221211
(deref p))))
222212

223-
(defparkingop >!
213+
(defn >!
224214
"puts a val into port. nil values are not allowed. Must be called
225-
inside a (go ...) block, or on a virtual thread (no matter how it
226-
was started). Will park if no buffer space is available.
215+
inside a (go ...) block. Will park if no buffer space is available.
227216
Returns true unless port is already closed."
228-
[port val])
217+
[port val]
218+
(assert nil ">! used not in (go ...) block"))
229219

230220
(def ^:private nop (on-caller (fn [_])))
231221
(def ^:private fhnop (fn-handler nop))
@@ -363,16 +353,16 @@ return nil for unexpected contexts."
363353
@ret
364354
(deref p))))
365355

366-
(defparkingop alts!
367-
"Completes at most one of several channel operations. Must be called
368-
inside a (go ...) block, or on a virtual thread (no matter how it was
369-
started). ports is a vector of channel endpoints, which can be either
370-
a channel to take from or a vector of [channel-to-put-to val-to-put],
371-
in any combination. Takes will be made as if by <!, and puts will be
372-
made as if by >!. Unless the :priority option is true, if more than one
373-
port operation is ready a non-deterministic choice will be made. If no
374-
operation is ready and a :default value is supplied, [default-val :default]
375-
will be returned, otherwise alts! will park until the first operation to
356+
(defn alts!
357+
"Completes at most one of several channel operations. Must be called
358+
inside a (go ...) block. ports is a vector of channel endpoints,
359+
which can be either a channel to take from or a vector of
360+
[channel-to-put-to val-to-put], in any combination. Takes will be
361+
made as if by <!, and puts will be made as if by >!. Unless
362+
the :priority option is true, if more than one port operation is
363+
ready a non-deterministic choice will be made. If no operation is
364+
ready and a :default value is supplied, [default-val :default] will
365+
be returned, otherwise alts! will park until the first operation to
376366
become ready completes. Returns [val port] of the completed
377367
operation, where val is the value taken for takes, and a
378368
boolean (true unless already closed, as per put!) for puts.
@@ -386,7 +376,8 @@ return nil for unexpected contexts."
386376
used, nor in what order should they be, so they should not be
387377
depended upon for side effects."
388378

389-
[ports & {:as opts}])
379+
[ports & {:as opts}]
380+
(assert nil "alts! used not in (go ...) block"))
390381

391382
(defn do-alt [alts clauses]
392383
(assert (even? (count clauses)) "unbalanced clauses")
@@ -542,7 +533,8 @@ return nil for unexpected contexts."
542533
"Executes the body in a thread, returning immediately to the calling
543534
thread. The body may do blocking I/O but must not do extended computation.
544535
Returns a channel which will receive the result of the body when completed,
545-
then close."
536+
then close. When virtual threads are available in the runtime JVM (>= v21)
537+
then core.async will dispatch body to one."
546538
[& body]
547539
`(thread-call (^:once fn* [] ~@body) :io))
548540

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,6 @@
7272
[workload]
7373
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true)))
7474

75-
(defn at-least-clojure-version?
76-
[[maj min incr]]
77-
(let [{:keys [major minor incremental]} *clojure-version*]
78-
(not (neg? (compare [major minor incremental] [maj min incr])))))
79-
8075
(def virtual-threads-available?
8176
(try
8277
(Class/forName "java.lang.Thread$Builder$OfVirtual")

0 commit comments

Comments
 (0)