It is often said one should not offer criticism without suggesting an alternative.
In a previous post which dealt with the idiomatic ways of mixing side effects and iteration in Clojure, I mentioned pmap is a bad option for performing side effects in parallel. Since I'm not paid highly enough to just offer my criticism and let others figure it out by themselves, I'll explain in this post why pmap should be avoided for side effects, and what other alternatives are out there for effective (and Effective) multi-threaded programming in Clojure.
Before We Begin
This time just waving hands with api-call! wouldn't suffice, so we'll fake one to use in our examples:
(defn api-call! [x]
  (Thread/sleep 2000)
  (println x)
  x)
It performs side effects, blocks, and returns a value: everything we need to make an interesting function.
Requirements:
'{:deps {org.clojure/clojure    {:mvn/version "RELEASE"}
         org.clojure/core.async {:mvn/version "RELEASE"}
         manifold               {:mvn/version "RELEASE"}
         funcool/promesa        {:mvn/version "RELEASE"}
         tolitius/lasync        {:mvn/version "RELEASE"}
         com.climate/claypoole  {:mvn/version "RELEASE"}}}
But Why Not pmap?
pmap is easy, accessible, and understandable. If it's so easy, why shouldn't we use it? Skip to the TL;DR if you don't want to read the analysis.
To understand, try to answer the following questions:
- What happens when you evaluate the following expression?
- When will it happen?
- On which threads?
- What is the value of xs?
(def xs (pmap api-call! (range 128)))

(time (reduce + 0 (pmap api-call! (range 128))))
(/ 128 8) ;; => 16

(time (run! println (pmap api-call! (range 512))))
(/ 512 32) ;; => 16
The answers to these questions are fundamental to understanding why pmap is a bad choice for performing side effects: we lack control over important parameters (when, where) which are critical when performing side effects.
What happens when you pmap?
This is what gets printed out:
1 2019 27 14 5 16 10 80 6 4 13 7 11 29 12 31 15 9 17 2 28 18 21 3 25 24 23 26 22 30
What are we seeing?
First, it's important to understand an implementation detail concerning lazy sequences in Clojure: they are realized in chunks of 32 elements at a time.
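A quick REPL sketch makes the chunking visible; asking for a single element realizes a whole 32-element chunk:

(first (map #(do (println "realizing" %) %) (range 40)))
;; prints "realizing 0" through "realizing 31",
;; even though we only asked for the first element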
Since pmap is lazy, a chunk was realized. But notice it didn't take 64 seconds, although each API call takes ~2 seconds.
That is because pmap creates a future for every element in the sequence:

(map #(future (f %)) coll)
Another subtlety is that pmap is "semi" lazy, in that it tries to stay availableProcessors + 2 elements ahead of consumption.
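That window comes straight from pmap's source, where the degree of look-ahead is computed as:

;; pmap's look-ahead, as computed in clojure.core
(+ 2 (.availableProcessors (Runtime/getRuntime)))
;; e.g. => 10 on an 8-core machine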
When?
Hard to tell. pmap will try its best to stay ahead of your computation, but what happens when you use pmap in too many places in your code in parallel? The answer to that is not so deterministic.
On Which Thread?
pmap uses future, which uses the agents' soloExecutor service, a cached thread pool.
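Incidentally, that pool is replaceable globally; doing so affects every future and send-off in the process, so it's a blunt instrument, but worth knowing about (a sketch):

(import '[java.util.concurrent Executors])

;; replaces the send-off executor that future (and send-off) run on, process-wide
(set-agent-send-off-executor! (Executors/newFixedThreadPool 8))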
What does this mean in layman's terms? We can't know on which thread it happens, or on how many. I think it's safe to assume each pmap call will use availableProcessors + 2 threads, but what happens if you call it twice in close succession?
This also complicates exception handling and the option of providing a default handler.
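A minimal sketch of the problem: a future holds on to any throwable until someone derefs it, so nothing surfaces at the moment the exception actually happens:

(def f (future (throw (ex-info "boom" {}))))
;; nothing is printed and no handler runs...
@f ;; ...until deref, which throws an ExecutionException wrapping our ex-info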
The value of xs
The value of xs will change over time and between executions, as it will block while the rest of the lazy sequence materializes.
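A sketch of what that blocking looks like: forcing the rest of the sequence parks the calling thread until every future has completed:

(time (count xs)) ;; blocks until all 128 api-call! futures have finished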
pmap TL;DR
pmap is great for computation, since it provides a high degree of semi-lazy parallelism. However, because it is lazy and lacks control over execution context (thread count, exception handling, back pressure), it is unsuitable for performing side effects, where these issues can make or break a system.
Executors
One of the most straightforward options is using java.util.concurrent.Executors. Executors are flexible, support different policies, and are highly configurable. The example below is rather simple, but demonstrates how easy it is to get started with them.
Unbounded Queue
(import '[java.util.concurrent Executors ExecutorService])

(defn fixed-pool
  ([n] (Executors/newFixedThreadPool n))
  ([n factory] (Executors/newFixedThreadPool n factory)))

(defonce default-pool (delay (Executors/newFixedThreadPool 2)))

;; .submit is overloaded, so hint the Callable to avoid reflection
(defn submit* [pool f]
  (.submit ^ExecutorService pool ^Callable f))

(defmacro submit [pool & body]
  `(submit* ~pool (fn* [] ~@body)))

(defn pmap*
  ([f xs] (pmap* @default-pool f xs))
  ([pool f xs]
   (->> xs
        (mapv #(submit pool (f %))) ; eagerly submit everything
        (mapv deref))))             ; then block for the results, in order

(pmap* (fixed-pool 2) api-call! (vec (range 10)))
For a slightly different example, see core.async's implementation.
Blocking Queue
One glaring deficiency in the previous example is the lack of back pressure, which could lead to us blowing the heap up with enqueued tasks.
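Before reaching for a library, note that plain java.util.concurrent can already give crude back pressure via a bounded queue and a caller-runs rejection policy; bounded-pool below is a hypothetical helper, a sketch rather than production code:

(import '[java.util.concurrent ThreadPoolExecutor TimeUnit
          ArrayBlockingQueue ThreadPoolExecutor$CallerRunsPolicy])

(defn bounded-pool [n queue-size]
  ;; when the queue is full, the submitting thread runs the task itself,
  ;; which naturally slows producers down
  (ThreadPoolExecutor. n n 0 TimeUnit/MILLISECONDS
                       (ArrayBlockingQueue. queue-size)
                       (ThreadPoolExecutor$CallerRunsPolicy.)))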
Using others' implementations is no shame, so let's have a look at tolitius/lasync. As stated in the README, the purpose of the library is to be able to block on .submit and .execute, which is exactly the issue we saw in the previous implementation.
lasync/pool returns a ThreadPoolExecutor, so it plugs right into our pmap* implementation.
(require '[lasync.core :as lasync])

(def pool (lasync/pool :threads 2))

(pmap* pool api-call! (range 10))
Alternative implementations
If you don't want to implement anything yourself, including pmap, and perhaps need a richer API, one which for example doesn't guarantee ordering or supports parallel list comprehensions (for), take a look at TheClimateCorporation/claypoole.
Usage is very straightforward:
(require '[com.climate.claypoole :as cp])

(def pool (cp/threadpool 4))

(def output (cp/pmap pool api-call! (range 64)))
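The richer API mentioned above is there, too; for instance, unordered results and a parallel for (a sketch; see claypoole's README for the full menu):

;; results arrive in completion order rather than input order
(def unordered (cp/upmap pool api-call! (range 64)))

;; a parallel list comprehension
(def pairs (cp/pfor pool [x (range 4) y (range 4)]
             (api-call! [x y])))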
core.async
The same solutions which used the executor can be implemented with different abstractions and contexts. An interesting one is core.async, which allows us to use pipelines for sequence processing. If we look at each batch as a sequence, the following implementation arises naturally:
pipeline
(require '[clojure.core.async :as async])

(defn parallel
  "Returns a channel which will contain a transient vector of results"
  ([n f xs]
   (parallel nil n f xs))
  ([buf-or-n n f xs]
   (let [out (async/chan buf-or-n)] ; nil buf-or-n means an unbuffered channel
     (async/pipeline-blocking n out (map f) (async/to-chan xs))
     (async/reduce conj! (transient []) out))))

(def ch (parallel 4 api-call! (range 16)))

(persistent! (async/<!! ch))
;; => [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
If api-call! had been asynchronous, pipeline-blocking could be swapped for pipeline-async, and instead of a transducer we would pass an async function:

(fn af [v c]
  (api-call! v (fn cb [res]
                 (async/put! c res)
                 (async/close! c))))
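Wiring it up would look something like this (a sketch, with af referring to the function above and assuming api-call! now takes a value and a completion callback):

(let [out (async/chan)]
  (async/pipeline-async 4 out af (async/to-chan (range 16)))
  (async/<!! (async/reduce conj [] out)))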
A slight problem with the above implementation is that it has higher overhead than previous ones: it allocates plenty of channels and a new async/thread for each call (which isn't completely terrible, as it uses a cached thread pool), and it requires working with channels all the way.
It's probably a suitable solution if your entire code base is already written in the abstraction, otherwise I'm not sure I'd go with it.
Thread Pool
If you're using core.async and haven't read Things I Wish I Knew About core.async, take a few extra minutes of your day to read it. Building off the final example in the post:
(defn async-wrapper [pool f]
  (let [ch (async/chan 1)]
    (.submit ^ExecutorService pool
             ^Callable
             (fn []
               (try
                 (async/put! ch (f))
                 (catch Exception e
                   (async/put! ch (ex-info "some error" {} e)))
                 (finally
                   (async/close! ch)))))
    ch))

(defmacro asyncly [pool & body]
  `(async-wrapper ~pool (fn* [] ~@body)))

(->> (range 8)
     (mapv #(asyncly @default-pool (api-call! %)))
     async/merge
     (async/reduce conj [])
     async/<!!)
;; => [1 0 3 2 4 5 6 7]
This example nicely ties together everything we've seen until now. Note that it does not preserve order.
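If order matters, one option is to realize all the channels first and then read them back in submission order (a sketch):

(->> (range 8)
     (mapv #(asyncly @default-pool (api-call! %))) ; submit everything eagerly
     (mapv async/<!!))                             ; then read in submission order
;; => [0 1 2 3 4 5 6 7]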
Promesa
Promesa is a promise library for Clojure and ClojureScript, with pretty light overhead.
Promesa does not just provide facilities for concurrent execution, but models them as effect types, thus providing a very human-friendly transition between the following:
-- start with
f   :: a -> b
-- mapping it
f'  :: [a] -> [b]
f'' :: [a] -> [Future b]        -- map api-call+ [a]
g   :: [Future b] -> Future [b] -- all
h   :: Future [b] -> [b]        -- deref
(Any Haskellers who want to shout at me or correct me for the mess I wrote here are welcome, I promise to correct it based on your feedback)
See the documentation
(require '[promesa.core :as p]
         '[promesa.exec :as exec])

(def xs (vec (range 32)))

(defn api-call+ [ex x]
  (p/then (p/promise x) api-call! ex))

(def ex (exec/fixed-pool 4))

(def p (p/all (map (partial api-call+ ex) xs)))

@p
;; => [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
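Promesa also gives a natural place to hang error handling; p/catch recovers a failed promise (a sketch):

(-> (p/all (map (partial api-call+ ex) xs))
    (p/catch (fn [e]
               (println "batch failed:" (ex-message e))
               [])))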
Promesa Just Works and has light overhead, and would be my pick in most use cases.
Manifold
Manifold's deferred abstraction is pretty similar to Promesa's promises, with slightly higher overhead. It also offers a stream abstraction which isn't necessarily relevant for this discussion.
See the documentation
(require '[manifold.deferred :as d]
         '[manifold.executor :as e])

(def ex (e/fixed-thread-executor 4))

(defn manifold-api-call [ex x]
  (let [d (d/deferred ex)
        c (d/chain d #(future (api-call! %)))]
    (d/success! d x)
    c))

(def out (apply d/zip (mapv (partial manifold-api-call ex) (range 32))))
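Dereferencing the zipped deferred blocks until every call has completed and yields the results in order:

@out ;; blocks for all 32 calls, then yields the 32 results, in order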
Summary
Similarities
Promesa and Manifold both operate on deferred values, and both have the option of lifting a sequence of deferred values into a single deferred value of the sequence.
Differences
Monads vs. Java
Promesa and Manifold are "up there" in a monadic world of effect types, while the Java-leaning implementations are more "just get things done" solutions. More mechanical.
core.async vs. everything else
The core.async solution is the most mechanical, and the furthest removed from the common abstractions which already exist in Clojure (map, executors).
Retrospective
The Clojuriest
Of all the available solutions, the most Clojure-y ones are definitely tolitius/lasync and TheClimateCorporation/claypoole. Claypoole especially provides an almost drop-in replacement for your bad code.
This should probably be your go-to option when trying to clean up any misuses of pmap, doall, or any other idiosyncrasies.
The comfiest
I have to admit I like Promesa. The abstraction is comfortable and lends itself well to a code base which relies on asynchronous computation. It also plays well with funcool's other library, cats, which implements category theory concepts in Clojure.