diff --git a/src/main/clojure/clojure/core/async/flow/impl.clj b/src/main/clojure/clojure/core/async/flow/impl.clj index b2bdf03..e296c83 100644 --- a/src/main/clojure/clojure/core/async/flow/impl.clj +++ b/src/main/clojure/clojure/core/async/flow/impl.clj @@ -26,15 +26,6 @@ (defn oid [x] (symbol (str (-> x class .getSimpleName) "@" (-> x System/identityHashCode Integer/toHexString)))) -(defn chan->data - [^clojure.core.async.impl.channels.ManyToManyChannel c] - (let [b (.buf c)] - {:buffer (if (some? b) (oid b) :none) - :buffer-count (count b) - :put-count (count (.puts c)) - :take-count (count (.takes c)) - :closed? (clojure.core.async.impl.protocols/closed? c)})) - (defn exec->data [exec] (let [ess (as-> (str exec) ^String es (.substring es (inc (.lastIndexOf es "[")) (.lastIndexOf es "]")) @@ -49,7 +40,6 @@ clojure.lang.Fn (-> x str symbol) ExecutorService (exec->data x) clojure.lang.Var (symbol x) - clojure.core.async.impl.channels.ManyToManyChannel (chan->data x) (datafy/datafy x))) (defn futurize ^Future [f {:keys [exec]}] @@ -242,9 +232,8 @@ (loop [nstatus nstatus, nstate nstate, msgs (seq msgs)] (if (or (nil? msgs) (= nstatus :exit)) [nstatus nstate] - (let [m (if-some [m (first msgs)] m (throw (Exception. "messages must be non-nil"))) - [v c] (async/alts!! - [control [outc m]] + (let [[v c] (async/alts!! + [control [outc (first msgs)]] :priority true)] (if (= c control) (let [nnstatus (handle-command nstatus v)