Now that even budget desktop and laptop computers are shipping with multi-core processors, it’s more important than ever to design programs so that they can take advantage of parallel processing. If you’re already writing software in Erlang or Parallel Haskell, then lucky you. But if you’re writing in Java (and are unable or unwilling to take up Scala), you will need to get organized.

Since Java 5, the JDK has come with a concurrency library that makes concurrent programming a lot easier than it used to be, and the library was improved further in Java 6. It provides some basic abstractions that take care of the nitty-gritty of thread management for us. In this short series of articles, I’m going to show how we can build on top of that library to achieve concurrent programming of a higher order. We will employ some design patterns and simple abstract building blocks from functional programming. This style of writing concurrent programs will afford us the ability to:

- Compose ordinary functions into concurrent programs.
- Decouple parallel behavior from the algorithm itself.
- Turn existing code into parallel code, without refactoring, at runtime.
- Work with hypothetical results of concurrent computations, before they finish.

We will not use any locking or explicit synchronization. Basic familiarity with the java.util.concurrent package is helpful but not required, familiarity with generics is highly recommended, and I suggest reading parts 1 and 2 of my Lazy Error Handling series, as they cover some preliminaries. Most of the source code herein is already part of the Functional Java library, and I’ll use a fair number of interfaces and classes from it, introducing them as we go along.

When we’re through, I will have demonstrated that a functional style of programming promotes code reuse and modularity of parallel programs, above and beyond what can be achieved with the canonical object-oriented style.

**The Callable Monad**

The *Callable* interface is new in Java 5. It’s similar to the old *Runnable* interface that we all know and love, except that its *call()* method returns a result, and it may throw an *Exception*. If you followed my Lazy Error Handling articles, you will immediately notice that *Callable* is nearly identical to the *Thrower* interface described in that series, and you’ll know that it is indeed a potential monad. Treating *Callable* as a monad will allow us to work with it at a higher level of abstraction, letting us compose *Callables*, chain them, and project computations into them.

As you may already know, we need three ingredients (and only these three) to have a monad:

- Type construction. This comes free with the *Callable* interface, since it takes a type parameter (i.e. it’s generic).
- A unit function. This allows us to turn any value into a *Callable* that returns that value again.
- A binding function. This allows us to chain together *Callables* with functions, without actually calling them.

Here is the unit function for *Callable* (we will refer to it as *unit* in the text to avoid ambiguity):

    public static <A> Callable<A> callable(final A a) {
      return new Callable<A>() {
        public A call() {
          return a;
        }
      };
    }

And here is its binding function:

    public static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
      return new Callable<B>() {
        public B call() throws Exception {
          return f.f(a.call()).call();
        }
      };
    }

Note: The *F* interface is from Functional Java. It represents a first-class function (a function that can be treated like any other value). *F<A, B>* has one method that takes an argument of type *A* and returns a value of type *B*.
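To make the plumbing concrete, here is a small self-contained sketch of *unit* and *bind* in use. The one-method *F* interface is inlined as a stand-in for Functional Java’s; the class and value names are my own invention:

```java
import java.util.concurrent.Callable;

public class BindExample {
  // Minimal stand-in for Functional Java's first-class function interface.
  interface F<A, B> { B f(A a); }

  // unit: wrap a plain value in a Callable.
  static <A> Callable<A> callable(final A a) {
    return new Callable<A>() { public A call() { return a; } };
  }

  // bind: chain a Callable through a Callable-valued function, lazily.
  static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception { return f.f(a.call()).call(); }
    };
  }

  public static void main(final String[] args) throws Exception {
    final Callable<Integer> n = callable(21);
    // Nothing runs until call() is invoked on the composed Callable.
    final Callable<String> s = bind(n, new F<Integer, Callable<String>>() {
      public Callable<String> f(final Integer i) { return callable("answer: " + i * 2); }
    });
    System.out.println(s.call()); // prints "answer: 42"
  }
}
```

Note that the chained computation is completely inert until *call()* is invoked at the very end.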

Given the above two methods, Callable is now a monad. All monads are also functors, so we can define a *Callable* functor. What I mean by “functor” is that we can define a method to turn any existing function into a function on Callables, or, in other words, to apply any function to a value wrapped in a Callable, without actually calling it (i.e. lazily). This method is called *fmap*, and we can define it in terms of *bind*:

    public static <A, B> F<Callable<A>, Callable<B>> fmap(final F<A, B> f) {
      return new F<Callable<A>, Callable<B>>() {
        public Callable<B> f(final Callable<A> a) {
          return bind(a, new F<A, Callable<B>>() {
            public Callable<B> f(final A ab) {
              return new Callable<B>() {
                public B call() {
                  return f.f(ab);
                }
              };
            }
          });
        }
      };
    }
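As a quick check that *fmap* behaves as claimed, here is a self-contained sketch that maps a string-length function over a *Callable<String>*, with nothing executing until the final *call()*. The minimal *F* interface is inlined again, and *fmap*’s inner step is written in terms of *unit* for brevity; the names are mine:

```java
import java.util.concurrent.Callable;

public class FmapExample {
  interface F<A, B> { B f(A a); }

  static <A> Callable<A> callable(final A a) {
    return new Callable<A>() { public A call() { return a; } };
  }

  static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception { return f.f(a.call()).call(); }
    };
  }

  // fmap via bind: apply f inside the Callable, then re-wrap with unit.
  static <A, B> F<Callable<A>, Callable<B>> fmap(final F<A, B> f) {
    return new F<Callable<A>, Callable<B>>() {
      public Callable<B> f(final Callable<A> a) {
        return bind(a, new F<A, Callable<B>>() {
          public Callable<B> f(final A x) { return callable(f.f(x)); }
        });
      }
    };
  }

  public static void main(final String[] args) throws Exception {
    final F<String, Integer> length = new F<String, Integer>() {
      public Integer f(final String s) { return s.length(); }
    };
    // Apply length to a wrapped value, lazily.
    final Callable<Integer> len = fmap(length).f(callable("hello"));
    System.out.println(len.call()); // prints 5
  }
}
```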

Useful operations on *Callables* that the methods above allow us to implement include *sequence*, which turns a list of *Callables* into a single *Callable* that returns a list, and *join*, which peels one layer of *Callables* from a *Callable<Callable<A>>* so that it becomes just *Callable<A>*. You will find the source code for those as part of Functional Java.
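For instance, *join* falls straight out of *bind* applied to an identity-shaped function. This is my own sketch, with the helper definitions repeated inline; the definitive versions of *sequence* and *join* live in Functional Java:

```java
import java.util.concurrent.Callable;

public class JoinExample {
  interface F<A, B> { B f(A a); }

  static <A> Callable<A> callable(final A a) {
    return new Callable<A>() { public A call() { return a; } };
  }

  static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception { return f.f(a.call()).call(); }
    };
  }

  // join: collapse Callable<Callable<A>> to Callable<A> by binding with identity.
  static <A> Callable<A> join(final Callable<Callable<A>> c) {
    return bind(c, new F<Callable<A>, Callable<A>>() {
      public Callable<A> f(final Callable<A> a) { return a; }
    });
  }

  public static void main(final String[] args) throws Exception {
    final Callable<Callable<Integer>> nested = callable(callable(7));
    System.out.println(join(nested).call()); // prints 7
  }
}
```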

**Parallel Strategies**

When working with the *java.util.concurrent* library, you normally don’t work with *Threads* directly. Instead, you might implement *Callable<A>* and submit instances of your implementation to an *ExecutorService*, which yields values of type *Future<A>*, each representing a running computation that yields an *A*. *Future* has a method called *get()* that returns the result as soon as it’s ready. This is a great improvement over managing threads and *Runnables* ourselves, but it still ties our hands somewhat, encouraging tight coupling of our code to the *ExecutorService* and hence to the parallelisation library.

Inspired by Haskell’s “Parallel Strategies”, let’s instead work with parallelisation in the abstract. For what is an *ExecutorService*, really? It’s a means of turning *Callables* into *Futures*. That makes it a kind of function, so we can abstract over it using a function type: *F<Callable<A>, Future<A>>*. Such a function can then be backed by an *ExecutorService*, or by something else entirely, such as a load-balancing facility that serializes *Callables* to be executed on remote servers.

We will use a new class, *Strategy<A>*, that allows us to effectively separate parallelism from the algorithm itself:

    public final class Strategy<A> {
      private final F<Callable<A>, Future<A>> f;

      private Strategy(final F<Callable<A>, Future<A>> f) {
        this.f = f;
      }

      public F<Callable<A>, Future<A>> f() {
        return f;
      }

      public static <A> Strategy<A> strategy(final F<Callable<A>, Future<A>> f) {
        return new Strategy<A>(f);
      }
    }

We’ll add a couple of static functions to create simple strategies:

    public static <A> Strategy<A> simpleThreadStrategy() {
      return strategy(new F<Callable<A>, Future<A>>() {
        public Future<A> f(final Callable<A> p) {
          final FutureTask<A> t = new FutureTask<A>(p);
          new Thread(t).start();
          return t;
        }
      });
    }

    public static <A> Strategy<A> executorStrategy(final ExecutorService s) {
      return strategy(new F<Callable<A>, Future<A>>() {
        public Future<A> f(final Callable<A> p) {
          return s.submit(p);
        }
      });
    }
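As a usage sketch, here is what running a computation through a pool-backed strategy looks like. A trimmed copy of the *Strategy* class is inlined to keep the example self-contained; the executor setup and names are my own:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StrategyExample {
  interface F<A, B> { B f(A a); }

  // Trimmed-down Strategy: a named wrapper around F<Callable<A>, Future<A>>.
  static final class Strategy<A> {
    private final F<Callable<A>, Future<A>> f;
    private Strategy(final F<Callable<A>, Future<A>> f) { this.f = f; }
    public F<Callable<A>, Future<A>> f() { return f; }

    public static <A> Strategy<A> executorStrategy(final ExecutorService s) {
      return new Strategy<A>(new F<Callable<A>, Future<A>>() {
        public Future<A> f(final Callable<A> p) { return s.submit(p); }
      });
    }
  }

  public static void main(final String[] args) throws Exception {
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    final Strategy<Integer> s = Strategy.executorStrategy(pool);
    // Hand a Callable to the strategy; it comes back as a running Future.
    final Future<Integer> result = s.f().f(new Callable<Integer>() {
      public Integer call() { return 6 * 7; }
    });
    System.out.println(result.get()); // prints 42
    pool.shutdown();
  }
}
```

Swapping the pool for *simpleThreadStrategy()*, or anything else of the right type, requires no change to the calling code.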

One of the neat things that treating *Strategies* as functions allows us to do is use the *Callable* monad to compose them with existing functions. Any function can be lifted into the *Callable* monad using *fmap*, and then composed with a *Strategy* to yield a concurrent function. Moreover, we can use *Strategies* to convert existing functions to concurrent functions: the following method on *Strategy* takes any function and returns the equivalent function that executes concurrently. Calling such a function gives you a *Future* value from which you can get the computed result whenever it’s ready.

    public <B> F<B, Future<A>> lift(final F<B, A> f) {
      final Strategy<A> self = this;
      return new F<B, Future<A>>() {
        public Future<A> f(final B b) {
          return self.f().f(new Callable<A>() {
            public A call() {
              return f.f(b);
            }
          });
        }
      };
    }

Alert readers will note that *lift* represents the Kleisli arrow for the *Future* monad. As with most monads, this arrow is a very useful kind of thing (see Lazy Error Handling, Part 2). In the *Future* monad, the Kleisli arrow provides parallel function application: if you already have a function *f*, then calling *lift(f).f(x)* will apply that function to *x* while simultaneously continuing with the next statement in the current program.
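Concretely, that usage might look like this self-contained sketch, with *Strategy* and *lift* repeated inline; the squaring workload and the names are mine:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class LiftExample {
  interface F<A, B> { B f(A a); }

  static final class Strategy<A> {
    private final F<Callable<A>, Future<A>> f;
    Strategy(final F<Callable<A>, Future<A>> f) { this.f = f; }
    public F<Callable<A>, Future<A>> f() { return f; }

    // One-thread-per-task strategy, as in simpleThreadStrategy above.
    static <A> Strategy<A> simpleThreadStrategy() {
      return new Strategy<A>(new F<Callable<A>, Future<A>>() {
        public Future<A> f(final Callable<A> p) {
          final FutureTask<A> t = new FutureTask<A>(p);
          new Thread(t).start();
          return t;
        }
      });
    }

    // lift: turn an ordinary F<B, A> into a concurrent F<B, Future<A>>.
    public <B> F<B, Future<A>> lift(final F<B, A> f) {
      final Strategy<A> self = this;
      return new F<B, Future<A>>() {
        public Future<A> f(final B b) {
          return self.f().f(new Callable<A>() { public A call() { return f.f(b); } });
        }
      };
    }
  }

  public static void main(final String[] args) throws Exception {
    final F<Integer, Integer> square = new F<Integer, Integer>() {
      public Integer f(final Integer n) { return n * n; }
    };
    final Strategy<Integer> s = Strategy.simpleThreadStrategy();
    // square runs in another thread while the main thread carries on...
    final Future<Integer> result = s.lift(square).f(12);
    System.out.println("carrying on in the main thread");
    System.out.println(result.get()); // prints 144
  }
}
```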

**Lazy Futures**

For functions involving *Futures*, we generally want the *Future* type in the codomain (on the right-hand side, in the return type). If you think about it, you’ll see that functions that take *Futures* as arguments won’t be able to do much without blocking on *Future.get()*. That doesn’t mean we can’t compose *Future*-valued functions together; it just means that when composing two of them, we must either wait for the first *Future* to obtain a value before firing off the second one, or spark a new thread that does nothing but wait on the first *Future*. We’re much better off composing *Callables* and then turning those into *Futures* as needed. In fact, we might want to wrap values of type *Future<A>* inside a *Callable<A>* again, so that we can manipulate their return values **while they are running**:

    public static <A> Callable<A> obtain(final Future<A> x) {
      return new Callable<A>() {
        public A call() throws Exception {
          return x.get();
        }
      };
    }

And this takes us full circle back into the Callable monad, where we can compose computations, bind them to functions, and map functions over them, all lazily; that is, without actually asking what their values are until we absolutely need to know.
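Putting the pieces together, here is a self-contained sketch that starts a slow computation, wraps the running *Future* back into a *Callable* with *obtain*, and maps over it before the result even exists. The helper definitions are repeated inline and the names are mine:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class ObtainExample {
  interface F<A, B> { B f(A a); }

  static <A> Callable<A> callable(final A a) {
    return new Callable<A>() { public A call() { return a; } };
  }

  static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception { return f.f(a.call()).call(); }
    };
  }

  static <A, B> F<Callable<A>, Callable<B>> fmap(final F<A, B> f) {
    return new F<Callable<A>, Callable<B>>() {
      public Callable<B> f(final Callable<A> a) {
        return bind(a, new F<A, Callable<B>>() {
          public Callable<B> f(final A x) { return callable(f.f(x)); }
        });
      }
    };
  }

  // obtain: re-wrap a running Future as a Callable that waits for it.
  static <A> Callable<A> obtain(final Future<A> x) {
    return new Callable<A>() {
      public A call() throws Exception { return x.get(); }
    };
  }

  public static void main(final String[] args) throws Exception {
    final FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
      public Integer call() throws Exception {
        Thread.sleep(100); // pretend this is an expensive computation
        return 20;
      }
    });
    new Thread(task).start();

    // Map over the eventual result while the task is still running.
    final Callable<Integer> doubled = fmap(new F<Integer, Integer>() {
      public Integer f(final Integer n) { return n * 2; }
    }).f(obtain(task));

    System.out.println(doubled.call()); // prints 40 (blocks only here)
  }
}
```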

So we have accomplished what we set out to do in this first part of the series. We’ve written a *Callable* monad that lets us compose existing code into concurrent programs. We’ve implemented parallel strategies to run those programs in parallel, without regard to the actual parallelisation implementation. We have a Kleisli arrow that lets us run ordinary functions concurrently with each other, and finally we have a way of taking concurrently running computations back into the *Callable* monad. Not bad for a day’s work.

In the next part of this series, we will employ what we’ve built here to develop some higher-order parallel functions, including a parallel list functor. I hope you join me.

Interesting seeing classic ‘algorithm + strategy = parallelism’ stuff outside of Haskell.

I’m curious, why do you declare a static factory method on Strategy? Why not just make the constructor public?

Daniel: An excellent question.

1. To improve encapsulation. It’s an implementation detail whether and how a Strategy is actually created for a given function. Note that Strategies are completely immutable, so it’s not strictly necessary to create a new instance for the same function twice.

2. To make the construction of Strategies more uniform, since you always call a static method on Strategy if you want one. This also makes Javadocs more intuitive to read.

3. To make the construction of strategies slightly less verbose, when possible. Compare these two:

Strategy<SomeLongClassName> s = new Strategy<SomeLongClassName>(myFun);

vs.

Strategy<SomeLongClassName> s = strategy(myFun);

It’s my preference to make all explicitly declared constructors private for the reasons above.
