With a Callable monad and Parallel Strategies in hand, we’re ready to construct some general-purpose parallel functions of a higher order.

Something we’ll want to do quite often in parallel programming is run a function over a whole list of values, in parallel. So often, in fact, that it would be very tedious and repetitive to write a loop every time we want to do this, sparking off threads for each function call, and collecting the results. Instead, what we’re going to develop next is a kind of parallel list functor. That is, a higher-order function that will turn any function into a function that operates on every element of a list simultaneously.

First, let’s get some terminology out of the way, since I’ve been accused of not defining my terms.

A **higher-order function** is simply a function that takes a function as an argument.

A **functor **is any type *T* for which there exists a higher-order function, call it *fmap*, that transforms a function of type *F<A,B>* into a function *F<T<A>,T<B>>*. This *fmap *function must also obey the laws of identity and composition such that the following expressions return *true *for all *x, p,* and *q:*

fmap(identity()).f(x) == x fmap(compose(p, q)).f(x) == fmap(p).f(fmap(q).f(x))

Here, *identity* is a first-class function of type *F<A,A>* that just returns its argument, and *compose *is function composition. Composition is defined to mean that *compose(p, q).f(x)* is the same as *p.f(q.f(x)).*

For example, Functional Java’s *List *class is a functor, because it comes equipped with a method *List<B> map(F<A,B> f)* which obeys the laws above. The implementation of *List’s* map method is trivial: It iterates over the list, calling the given function for each element, and returns the list of the results.

There’s a lot of information about functors and monads on the Internet, and I’ve written about them before, with examples in Java. Tony Morris also wrote a great post where he explains functors as “function application in an environment”. That’s a good way of looking at things.

I also want to make it clear, if you’re just now tuning in, that I’m using classes from Functional Java in the code examples, in addition to classes from the J2SE concurrency library. Basically only *Callable *and *Future* are from the J2SE library. *Strategy*, *List*, and the *Callables *utility class are from Functional Java. The code used here is from the latest trunk head revision of that library and may differ from the current official release.

Now, let’s get to the money.

**Mapping Over a List in Parallel**

We’re going to make a very similar kind of thing to a list functor, except it’s going to map a function over a list in parallel. Our parallel list transformation will take the form of a static method called *parMap. *It will take any existing function and turn it into a parallel function on lists. That is, if we already have written a function that takes an *Integer *and returns a *String*, we can pass that function to *parMap*, and it will return us a function that takes a list of *Integers *and returns a list of *Strings*. This means that nothing has to be rewritten or refactored to support the mapping of a function over a list in parallel. The *parMap *function will convert our existing code to parallel code, at runtime.

Here’s *parMap:*

public <B> F<List<B>, Callable<List<A>>> parMap(final F<B, A> f) { return new F<List<B>, Callable<List<A>>>() { public Callable<List<A>> f(final List<B> as) { return sequence(as.map(compose(par(), callable(f)))); } }; }

That’s a lot of power for so little code. Let’s read through what it does. It takes a function *f* and returns a new function that takes a list called *as*. This new function transforms *f * with *Callables.callable* to wrap its return type in a callable (or: *callable(f)* applies the Kleisli arrow for *Callables* to *f)*, and composes that with a function called *par()*. It then maps the resulting composition over the list *as*, which will result in a list of *Callables*, which we turn into a *Callable* of a *List* with the *sequence* function. The only new thing here is *par()*, which is a very simple instance method on *Strategy:*

public F<Callable<A>, Callable<A>> par() { return compose(Strategy.<A>obtain(), f()); }

So par() is just the Strategy’s function composed with the obtain() method that we created in Part 1. This turns the Future back into a Callable so that we can manipulate it in a lazy manner. This little guy is the meat of *parMap.* That is, *parMap* is basically just mapping *par()* over a list.

The result of *parMap(myFun).f(myList)*, then, is a *Callable* of a list that, when called, will give us the results of calling *myFun* on every element of *myList* in parallel. What’s more, it will work for any kind of parallelisation *Strategy*, and it will return immediately (remember, *Callable *is lazy), ready for us to make further calculations on the results even while those results are being calculated.

I think that’s pretty cool.

**Nested Parallelism
**

Sometimes the problem at hand is not quite so flat that it can be solved with just a map over a list. You might have a need for nested traversal of lists, where for every element in a list you traverse another list. We can model that behaviour with a higher-order function very similar to *parMap*, except that it takes a function that generates a list. This higher-order function is *parFlatMap:*

public static <A, B> Callable<List<B>> parFlatMap(final Strategy<List<B>> s, final F<A, List<B>> f, final List<A> as) { return fmap(List.<B>join_()).f(s.parMap(f).f(as)); }

In this definition, *fmap *is from the *Callable *monad, as described in Part 1. Notice that *parFlatMap *provides an example use of *parMap*, using it to turn the given function into a parallel function. The call to *parMap *with that function will actually yield a *Callable<List<List<B>>>* (can you see why?), so we use a final *join *lifted into the *Callable *environment with *fmap*, to turn the *List *of *Lists *into just a *List*.

The *parFlatMap *function is analogous to a parallel list monad, and it works very much like a parallel version of nested loops. For example, given a list of *Points pts*, a distance function *dist *that takes two *Points *and returns the distance between them, and a *Strategy<Double> s, *we can use *parFlatMap *to calculate the distance between every *Point *and every other, in parallel:

Callable<List<Double>> distances = parFlatMap(s, new F<Point, List<Double>>() { public List<Double> f(final Point p) { return pts.map(dist.f(p)); } }, pts);

The inner “loop” is represented by *List.map*, so we’re traversing the list serially, multiplying every element in parallel with each element in sequence. I.e. the outer loop is parallel and the inner loop is serial. It’s quite possible to replace *map *with *parMap*, and I’ll leave that as an exercise for the reader (hint: you will need both *fmap *and *join*). Note that the call to *parFlatMap *will return immediately, and the computation will be carried out in the background.

**Position-Wise Parallelism and Concurrent Folding**

Another function I want to talk about is the *parZipWith *function. By contrast to *parFlatMap*, it is like a parallel version of a single loop over two lists. Values from each list are taken at matching positions and the pairs are fed through the argument function all at the same time:

public static <A, B, C> F2<List<A>, List<B>, Callable<List<C>>> parZipWith(final Strategy<C> s, final F2<A, B, C> f) { return new F2<List<A>, List<B>, Callable<List<C>>>() { public Callable<List<C>> f(final List<A> as, final List<B> bs) { return sequence(as.zipWith(bs, compose(Callables.<B, C>arrow(), curry(f))).map(s.par())); } }; }

You will note that *parZipWith *simply uses *zipWith *from Functional Java’s *List *class. There’s some manouvering required to turn the argument function into the right form. The *curry *method turns the *F2<A,B,C>* into the equivalent *F<A,F<B,C>>* (sometimes called a Curried function). The *arrow *function is the first-class Kleisli arrow for *Callables*. That just means that composing it with *f* yields a version of *f* which returns a *Callable.*

Let’s take an example of using *parZipWith *to do useful work. If we had a list of *Points *representing a path, and we wanted to get the total length of the path, we could zip the list and its tail (every element except the first) with the *dist *function and take the sum of the results:

Double length = parZipWith(s, dist).f(pts, pts.tail()).call().foldLeft(curry(sum), 0);

For brevity’s sake, I’m pretending that there’s already a function called *sum*, which is just an *F2<Double, Double, Double>* that returns the sum of its two arguments. If *foldLeft *is something new to you, then have a look at one implementation of it. It’s a more general abstraction of a for-loop than *map()* is. Folding a list {1,2,3} with the *sum *function (and 0) can be thought of as replacing all the commas with plusses to yield [0+1+2+3]. We could have just as well iterated over the list, adding each value to a total, instead of using a *foldLeft *function. But we didn’t, for a reason that will become apparent presently.

There’s something about the above example that’s dissatisfying. We’re getting the distances in parallel, but then *call()* sticks out like a sore thumb. The computation will block until the list of distances is ready, then it will fold the list with the *sum *function. What’s worse, *call() *might throw an *Exception*. There’s a way we can avoid all that by traversing the list of distances while they’re being calculated. What we need to do is somehow lift the *foldLeft *function into the *Callable *environment. The call to *foldLeft *above turns a *List<Double>* into a *Double*. What we want is to avoid the call to *call(),* and turn our *Callable<List<Double>>* into a *Callable<Double>* directly.

This is really easy to do, and it’s a good thing we’re using a folding function rather than a for-loop. Here’s what we do:

Callable<List<Double>> edges = parZipWith(s, dist).f(pts, pts.tail()); Callable<Double> length = par(fmap(List.<B, B>foldLeft().f(curry(sum)).f(0)).f(edges));

There. No exceptions, and no waiting for results. This code will return immediately and perform the whole computation in the background. The part that calculates the edge lengths is the same as before, and we’re keeping that in a variable *edges *for clarity. The other line may require some explanation. We’re getting a first-class *foldLeft *function from the *List *class, partially applying it to yield a function that folds a *List<Double>* into a *Double.* Then we’re promoting it with *fmap *into the *Callable *monad. The promoted function then gets applied to *edges*, and finally the whole fold is evaluated concurrently by passing it to *par*.

Hopefully you can see what an immensely powerful tool for parallel programming functional style and higher-order functions are, even in Java. We were able to develop a couple of one-liners above that are terser, clearer, more maintainable, more re-usable (and more type-safe) than anything we could write in imperative object-oriented style using producer/consumer, factories, inversion of control, etc. There’s no secret sauce, no special syntax, no functional fairy dust. It’s just plain old Java with a handful of very useful interfaces.

In the third installment of this series, we will develop a tiny light-weight library for threadless Actors that can juggle millions of simultaneous computations in only a few threads. Stay close.

Note: The code herein is adapted from a Haskell library by Phil Trinder, Hans-Wolfgang Loidl, Kevin Hammond et al.