It is often said one should not offer criticism without suggesting an alternative.

In a previous post which dealt with the idiomatic ways of mixing side effects and iteration in Clojure, I mentioned pmap is a bad option for performing side effects in parallel.

Since I'm not paid highly enough to just offer my criticism and let others figure it out by themselves, I'll explain in this post why pmap should be avoided for side effects, and what other alternatives are out there for effective (and Effective) multi-threaded programming in Clojure.

Before We Begin

This time, hand-waving with api-call! won't suffice, so we'll fake one to use in our examples:

(defn api-call!
  [x]
  (Thread/sleep 2000)
  (println x)
  x)

It does side effects, blocks and returns a value. Everything we need to make an interesting function.

Requirements:

'{:deps
 {org.clojure/core.async {:mvn/version "RELEASE"}
  org.clojure/clojure {:mvn/version "RELEASE"}
  manifold {:mvn/version "RELEASE"}
  funcool/promesa {:mvn/version "RELEASE"}
  tolitius/lasync {:mvn/version "RELEASE"}
  com.climate/claypoole {:mvn/version "RELEASE"}}}

But Why Not pmap?

pmap is easy, accessible, and understandable.

If it's so easy, why shouldn't we use it?

Skip to the TL;DR if you don't want to read the analysis.

To understand, try to answer the following questions:

  • what happens when you evaluate the following expression?
  • when will it happen?
  • on which threads?
  • what is the value of xs?

(def xs (pmap api-call! (range 128)))

(time
 (reduce + 0 (pmap api-call! (range 128))))

(/ 128 8) ;; => 16


(time
 (run! println (pmap api-call! (range 512))))

(/ 512 32) ;; => 16

The answers to these questions are fundamental in our understanding of why pmap is a bad choice for performing side effects. We lack control over important parameters (when, where) which are critical when performing side effects.

What happens when you pmap?

This is what gets printed out:

1
2019
27
14
5
16
10
80
6
4
13
7
11
29
12
31
15
9
17
2
28
18
21
3
25

24
23
26
22
30



What are we seeing?

First, it's important to understand an implementation detail concerning lazy sequences in Clojure: chunked sequences, such as the one range returns, are realized 32 elements at a time.

Since pmap builds on lazy sequences, a whole chunk was realized. Notice, however, that it didn't take 64 seconds, even though each API call takes ~2 seconds.

That is because pmap creates a future for every element in the sequence:

(map #(future (f %)) coll)

Another subtlety is that pmap is "semi" lazy, in that it tries to stay availableProcessors + 2 elements ahead of the consumer.
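To make the "semi" lazy part concrete, here is a minimal sketch that uses only clojure.core. The names lookahead, calls and counted-call are hypothetical helpers of mine. It defines a pmap without ever consuming it, then checks whether any work was started anyway.

```clojure
;; The same arithmetic pmap uses for its look-ahead window.
(def lookahead
  (+ 2 (.availableProcessors (Runtime/getRuntime))))

;; Hypothetical counter: how many times has the mapped function run?
(def calls (atom 0))

(defn counted-call [x]
  (swap! calls inc)
  x)

;; Only define the pmap; never consume the result.
(def ys (pmap counted-call (range 100)))

(Thread/sleep 500) ; give any started futures time to finish

;; Work has already been kicked off, although nobody asked for a value.
(def started @calls)
(println {:lookahead lookahead :started started})
```

On a chunked input such as range I'd expect started to reflect a whole chunk rather than the look-ahead window, which is exactly the kind of surprise you don't want around side effects.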

When?

Hard to tell. pmap will do its best to stay ahead of your computation, but what happens when you use pmap in too many places in your code in parallel? The answer is not deterministic.

On Which Thread?

pmap uses future, which runs on the agents' soloExecutor service, which is a cached thread pool.
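A quick way to check this is to ask each element which thread ran it. This is a minimal sketch using only clojure.core; thread-name and worker-names are hypothetical names, and the thread names you see assume nobody has replaced the default send-off executor.

```clojure
(defn thread-name []
  (.getName (Thread/currentThread)))

;; Collect the distinct names of the threads that processed the elements.
(def worker-names
  (set (pmap (fn [_]
               (Thread/sleep 50)
               (thread-name))
             (range 8))))

(println worker-names)
```

The names come from the shared send-off pool, the same one every other future and send-off in the process competes for.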

What does it mean in layman's terms? We can't know on which thread it happens, or on how many. I think it's safe to assume each pmap call will use at least availableProcessors + 2 threads. What happens if you call it twice in close succession?

This also complicates exception handling and the option of providing a default handler.
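Here is a small sketch of what that looks like in practice, using only clojure.core. The helper root-cause and the var failure are hypothetical. The point is that the exception you catch at the consumer is a wrapper thrown when the future is dereferenced, so the interesting data sits further down the cause chain.

```clojure
(defn root-cause
  "Walks the cause chain down to the innermost exception."
  [^Throwable t]
  (if-let [cause (.getCause t)]
    (recur cause)
    t))

(def failure
  (try
    ;; doall forces the sequence, which derefs the failed future.
    (doall (pmap (fn [x] (throw (ex-info "boom" {:x x}))) [1]))
    nil
    (catch Exception e
      e)))

(println (class failure))                ; the wrapper seen by the consumer
(println (ex-data (root-cause failure))) ; the data we actually care about
```

Note that the failure only surfaces when, and where, the sequence is consumed, not where the side effect ran.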

The value of xs

The value of xs changes over time and with execution: reading it blocks while the rest of the lazy sequence materializes.

pmap TL;DR

pmap is great for computation, since it provides a high degree of semi-lazy parallelism. However, because it is lazy and gives no control over the execution context (number of threads, exception handling, back pressure), it is unsuitable for performing side effects, where these issues can make or break a system.

Executors

One of the most straightforward options is using java.util.concurrent.Executors. Executors are flexible, support different policies, and are highly configurable. The example below is rather simple, but demonstrates how easy it is to get started with them.

Unbounded Queue

(import '[java.util.concurrent Executors ExecutorService])

(defn fixed-pool
  ([n]
   (Executors/newFixedThreadPool n))
  ([n factory]
   (Executors/newFixedThreadPool n factory)))

(defonce default-pool (delay (Executors/newFixedThreadPool 2)))

(defn submit*
  [pool f]
  (.submit ^ExecutorService pool ^Callable f))

(defmacro submit
  [pool & body]
  `(submit* ~pool (fn* [] ~@body)))

(defn pmap*
  ([f xs]
   (pmap* @default-pool f xs))
  ([pool f xs]
   (->> xs
        (mapv #(submit pool (f %)))
        (mapv deref))))

(pmap* (fixed-pool 2) api-call! (vec (range 10)))

For a slightly different example, see core.async's implementation.
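One thing the example above glosses over is cleanup: threads in a fixed pool are not daemon threads, so a pool that is never shut down can keep the JVM alive. Below is a minimal sketch of a variant that owns its pool and always shuts it down. pmap-with-own-pool is a hypothetical helper, not part of any library.

```clojure
(import '[java.util.concurrent Executors ExecutorService Future])

(defn pmap-with-own-pool
  "Hypothetical helper: runs (f x) for every x on a fresh pool of n threads,
  waits for all results in input order, and always shuts the pool down."
  [n f xs]
  (let [^ExecutorService pool (Executors/newFixedThreadPool (int n))]
    (try
      ;; mapv is eager, so every task is submitted before we start waiting.
      (let [futures (mapv (fn [x]
                            (let [^Callable task (fn [] (f x))]
                              (.submit pool task)))
                          xs)]
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        (.shutdown pool)))))

(pmap-with-own-pool 2 inc (range 10))
;; => [1 2 3 4 5 6 7 8 9 10]
```

Creating a pool per call is wasteful for hot paths; for those, a long-lived pool like default-pool with a shutdown hook is probably the better trade-off.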

Blocking Queue

One glaring deficiency in the previous example is the lack of back pressure, which could lead to us blowing the heap up with enqueued tasks.

There's no shame in using someone else's implementation, so let's have a look at tolitius/lasync.

As stated in the README, the purpose of the library is to be able to block on .submit and .execute, which is exactly the issue we saw in the previous implementation.

lasync/pool returns a ThreadPoolExecutor so it plugs right in to our pmap* implementation.

(require '[lasync.core :as lasync])

(def pool (lasync/pool :threads 2))

(pmap* pool api-call! (range 10))
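If you'd rather not add a dependency, a similar effect can be approximated with the JDK alone. The sketch below is not how lasync is implemented; it swaps in a bounded queue plus CallerRunsPolicy, so that when the queue is full the submitting thread runs the task itself and is slowed down as a result. bounded-pool and submit-all are hypothetical helpers.

```clojure
(import '[java.util.concurrent ArrayBlockingQueue ExecutorService Future
          ThreadPoolExecutor ThreadPoolExecutor$CallerRunsPolicy TimeUnit])

(defn bounded-pool
  "Hypothetical helper: n worker threads and at most queue-size waiting tasks.
  When the queue is full, the submitting thread runs the task itself."
  ^ExecutorService [n queue-size]
  (ThreadPoolExecutor. (int n) (int n)
                       0 TimeUnit/MILLISECONDS
                       (ArrayBlockingQueue. (int queue-size))
                       (ThreadPoolExecutor$CallerRunsPolicy.)))

(defn submit-all
  "Submits (f x) for every x and waits for all results, in input order."
  [^ExecutorService pool f xs]
  (let [futures (mapv (fn [x]
                        (let [^Callable task (fn [] (f x))]
                          (.submit pool task)))
                      xs)]
    (mapv (fn [^Future fut] (.get fut)) futures)))

(def small-pool (bounded-pool 2 4))

(def squares
  (submit-all small-pool
              (fn [x] (Thread/sleep 20) (* x x))
              (range 20)))

(.shutdown small-pool)
```

The trade-off is that some side effects run on the caller's thread, which may or may not be acceptable; blocking on submit, as lasync does, keeps all the work on the pool.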

Alternative implementations

If you don't want to implement anything yourself, including pmap, and perhaps need a richer API (one which, for example, doesn't guarantee ordering, or supports parallel list comprehensions with for), take a look at TheClimateCorporation/claypoole.

Usage is very straightforward:

(require '[com.climate.claypoole :as cp])
(def pool (cp/threadpool 4))
(def output (cp/pmap pool api-call! (range 64)))

core.async

The same solutions that used the executor can be implemented with different abstractions and contexts. An interesting one is core.async, which allows us to use pipelines for sequence processing. If we look at each batch as a sequence, the following implementation arises naturally:

pipeline

(require '[clojure.core.async :as async])

(defn parallel
  "Returns a channel which will contain a transient vector of results"
  ([n f xs]
   (parallel nil n f xs))
  ([buf-or-n n f xs]
   (let [out (async/chan buf-or-n)]
     (async/pipeline-blocking
      n
      out
      (map f)
      (async/to-chan xs))
     (async/reduce conj! (transient []) out))))


(def ch (parallel 4 api-call! (range 16)))
(persistent! (async/<!! ch))
;; => [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]

If api-call! had been asynchronous, pipeline-blocking could be swapped for pipeline-async, which takes an asynchronous function instead of a transducer:

(fn af [v c]
  (api-call!
   v
   (fn cb [res] (async/put! c res) (async/close! c))))

A slight problem with the above implementation is that it has higher overhead than the previous ones. It allocates plenty of channels and a new async/thread for each call (which isn't completely terrible, as it uses a cached thread pool), and it requires working with channels all the way.

It's probably a suitable solution if your entire code base is already written in the abstraction, otherwise I'm not sure I'd go with it.
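For completeness, here is a runnable sketch of the pipeline-async variant mentioned earlier. Since our api-call! is blocking, I fake a callback-style one: api-call-async! is hypothetical, as is parallel-async. It assumes a recent core.async, where to-chan! is available (older releases only have to-chan).

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical callback-style variant of api-call!: returns immediately and
;; later invokes cb with the result on another thread.
(defn api-call-async!
  [x cb]
  (future
    (Thread/sleep 100)
    (cb x))
  nil)

(defn parallel-async
  "Returns a channel that yields a vector of results, in input order."
  [n xs]
  (let [out (async/chan n)]
    (async/pipeline-async
     n
     out
     (fn af [v c]
       ;; The async function must put its result on c and then close c.
       (api-call-async! v (fn cb [res]
                            (async/put! c res)
                            (async/close! c))))
     (async/to-chan! xs))
    (async/into [] out)))

(async/<!! (parallel-async 4 (range 8)))
;; => [0 1 2 3 4 5 6 7]
```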

Thread Pool

If you're using core.async and haven't read Things I Wish I knew about core.async, take a few extra minutes of your day to read it. Building off the final example in the post:

(defn async-wrapper [pool f]
  (let [ch (async/chan 1)]
    (.submit
     pool
     (fn []
       (try (async/put! ch (f))
            (catch Exception e (async/put! ch (ex-info "some error" {} e)))
            (finally (async/close! ch)))))
    ch))

(defmacro asyncly
  [pool & body]
  `(async-wrapper ~pool (fn* [] ~@body)))


(->> (range 8)
     (mapv #(asyncly @default-pool (api-call! %)))
     async/merge
     (async/reduce conj [])
     async/<!!)
;; => [1 0 3 2 4 5 6 7]

This example nicely ties together everything we've seen until now. Note that it does not preserve order.

Promesa

Promesa is a promise library for Clojure and ClojureScript, with pretty light overhead.

Promesa does not just provide facilities for concurrent execution, but models them as Effect types, thus providing a very human friendly transition between the following:

-- start with
f :: a -> b
-- mapping it
f' :: [a] -> [b]

f'' :: [a] -> [Future b] -- map api-call+ [a]
g :: [Future b] -> Future [b] -- all
h :: Future [b] -> [b] -- deref

(Any Haskellers who want to shout at me or correct me for the mess I wrote here are welcome, I promise to correct it based on your feedback)
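To ground those signatures, here is the same three-step shape written against plain java.util.concurrent.CompletableFuture, which, as far as I know, is what Promesa builds on when running on the JVM. This is a sketch of the idea and not Promesa's API; supply-async, all and cf-pool are hypothetical names.

```clojure
(import '[java.util.concurrent CompletableFuture Executors ExecutorService]
        '[java.util.function Function Supplier])

(defn supply-async
  "f'' for one element: run (f x) on executor ex, returning a CompletableFuture."
  ^CompletableFuture [^ExecutorService ex f x]
  (CompletableFuture/supplyAsync
   (reify Supplier
     (get [_] (f x)))
   ex))

(defn all
  "g: turn a collection of futures into a single future of a vector."
  ^CompletableFuture [futs]
  (let [futs (vec futs)]
    (.thenApply (CompletableFuture/allOf (into-array CompletableFuture futs))
                (reify Function
                  (apply [_ _]
                    ;; Every future is complete here, so join does not block.
                    (mapv (fn [^CompletableFuture fut] (.join fut)) futs))))))

(def cf-pool (Executors/newFixedThreadPool 4))

;; h: deref the combined future to get back to plain values.
(def squares
  @(all (map #(supply-async cf-pool (fn [x] (* x x)) %) (range 8))))

(.shutdown cf-pool)

squares
;; => [0 1 4 9 16 25 36 49]
```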

See the documentation

(require '[promesa.core :as p] '[promesa.exec :as exec])

(def xs (vec (range 32)))

(defn api-call+
  [ex x]
  (p/then (p/promise x) api-call! ex))

(def ex (exec/fixed-pool 4))

(def p (p/all (map (partial api-call+ ex) xs)))

@p ;; => [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]

Promesa Just Works and has light overhead, and would be my pick in most use cases.

Manifold

Manifold's deferred abstraction is pretty similar to Promesa's promises, with slightly higher overhead. It also offers a stream abstraction which isn't necessarily relevant for this discussion.

See the documentation

(require '[manifold.deferred :as d]
         '[manifold.executor :as e])

(def ex (e/fixed-thread-executor 4))

(defn manifold-api-call
  [ex x]
  (let [d (d/deferred ex)
        c (d/chain d #(future (api-call! %)))]
    (d/success! d x)
    c))

(def out (apply d/zip (mapv (partial manifold-api-call ex) (range 32))))

Summary

Similarities

Promesa and Manifold

Both operate on deferred values, and both can lift a sequence of deferred values into a single deferred value of the sequence.

Differences

Monads vs. Java

Promesa and Manifold are "up there" in a monadic world of effect types while the Java leaning implementations are more "just get things done" solutions. More mechanical.

core.async vs. everything else

The core.async solutions are the most mechanical, and the furthest from the common abstractions which already exist in Clojure (map, executors).

Retrospective

The Clojuriest

Of all available solutions the most Clojure-y ones are definitely tolitius/lasync and TheClimateCorporation/claypoole. Claypoole especially provides an almost drop-in replacement for your bad code.

This should probably be your go-to option when trying to clean up any misuses of pmap, doall, or any other idiosyncrasies.

The comfiest

I have to admit I like Promesa. The abstraction is comfortable and lends itself well to a code base which relies on asynchronous computation. It also plays well with funcool's other library, cats, which implements category theory concepts in Clojure.