Higher-Order Java Parallelism, Part 4: A Better Future

This is the fourth installment in a series of posts about making highly concurrent software easier to write in Java. Previous entries are available here: part 1, part 2, part 3. However, I aim to make it possible to follow along even if you haven’t read the previous posts.

I Have Seen the Future…

If you have used the Java 5 concurrency API at all, you will have come across the Future interface. For example, when you submit a Callable<Integer> to an ExecutorService, what you get back is a Future<Integer> which represents a computation, running concurrently, that will (hopefully) result in an integer at some time in the future. Once you have the Future<Integer> fi, you can later get the integer out of it by calling fi.get().

That’s all fine and dandy, but let’s say you want to do something like add two future integers. You could do something like this:

int sum = x.get() + y.get();

This will block the current thread until both of those integers are available, then add them together. But why wait for that? If you have an ExecutorService, you can create a new Future that computes the sum:


Future<Integer> sum = executorService.submit(new Callable<Integer>() {
  public Integer call() throws Exception {
    return x.get() + y.get();
  }
});

Now the current thread can continue, but we’ve started a new thread that does nothing until the values of x and y have both been calculated by yet another thread.

We’re beginning to see a problem here. We want to be able to compose Futures together to form new Futures, but find that the number of threads required to compose n Future values is O(n). If we have a fixed-size thread pool, we’ll run into starvation. If we have an unbounded thread pool, then we might start more threads than the operating system can handle, most of which will be doing nothing at all but waiting for other threads.
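To make the linear growth concrete, here is a small sketch that uses only java.util.concurrent. The class name BlockingChain and the method threadsNeeded are made up for illustration. It composes n Futures by blocking, holds the very first one back with a latch so that nothing can finish early, and then asks the pool how many threads it had to create.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of any library.
final class BlockingChain {

  // Composes n Futures by blocking on get() and reports the largest
  // number of threads the pool had alive at the same time.
  static int threadsNeeded(final int n) {
    // An unbounded pool: a task goes to an idle thread, or else to a new one.
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    final CountDownLatch gate = new CountDownLatch(1);
    try {
      // The first computation cannot finish until the gate is opened.
      Future<Integer> f = pool.submit(new Callable<Integer>() {
        public Integer call() throws Exception {
          gate.await();
          return 0;
        }
      });
      // Every further Future adds one to the previous result, blocking in get().
      for (int i = 0; i < n; i++) {
        final Future<Integer> prev = f;
        f = pool.submit(new Callable<Integer>() {
          public Integer call() throws Exception {
            return prev.get() + 1;
          }
        });
      }
      final int largest = pool.getLargestPoolSize();
      gate.countDown(); // let the whole chain complete
      if (f.get() != n) throw new IllegalStateException("unexpected result");
      return largest;
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(threadsNeeded(100)); // prints 101
  }
}
```

Composing 100 Futures this way ties up 101 threads, and all but one of them do nothing except wait.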

This should all sound very familiar. Threads are a space resource. What kind of processes are O(n) in their space requirement? If you said “linearly recursive processes”, go to the head of the class. Intuitively, for the same reason that we can find iterative versions of any recursive algorithm, it seems that we should be able to find an algorithm to accomplish the same thing with O(1) threads.

…and it is a Monad

In the above example, it’s as if we’re giving separate instructions and waiting for the result of each in between. Imagine that we work in an office with Bob and Alice, and we need both of them to work on something. We might go to Bob and say: “Bob, process this and give me the result”. Then we’d take the result to Alice and say: “Alice, here’s a result from Bob.” It would be much better if we could just go to Bob and say: “Bob, process this and give the result to Alice.” This is the essential difference between recursive and iterative processes.

But wait! We say that kind of thing all the time, in Java:


public Work bob(Work w) { ... }
public Work alice(Work w) { ... }

public Work bobThenAlice(Work w) {
  Work b = bob(w);
  return alice(b);
}

Here, we’re instructing a single thread to do some work, then use the result of that work to do more work. What’s really sneaky here is the meaning of the semicolon. In this context, what the first semicolon means is “take the stored value b from the previous statement and bind it to the free variable b in the next statement”. You can think of the second semicolon as binding a blank statement over the result of the preceding statement.

Using first-class functions from Functional Java, and using the Callables monad from the first part of this series, you could implement that same behaviour using something like this:

F<Work, Callable<Work>> bob = new F<Work, Callable<Work>>() {
  public Callable<Work> f(final Work w) {
    return new Callable<Work>() {
      public Work call() { ... }
    };
  }
};
F<Work, Callable<Work>> alice = new F<Work, Callable<Work>>() { ... };

public Callable<Work> bobThenAlice(Work w) {
  return Callables.bind(bob.f(w), alice);
}

That’s pretty neat. Now we have a single Callable that we can run concurrently in a new thread, turning it into a Future. But wouldn’t it be cool if we could bind Futures? That would let us take already running computations and combine them in exactly this way. We want a Future monad.
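For readers who want to run the Callable version without Functional Java on the classpath, here is a self-contained sketch. The nested F interface and the bind method are stand-ins with the same shape as fj.F and Callables.bind, Work is modelled as a plain String, and the class and method names (BobThenAlice, worker, run) are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins so the example compiles without Functional Java.
final class BobThenAlice {

  // Stand-in for fj.F: a first-class function from A to B.
  interface F<A, B> { B f(A a); }

  // Same shape as Callables.bind: chains a Callable with a function, lazily.
  static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception {
        return f.f(a.call()).call();
      }
    };
  }

  // "Work" is modelled as a String; each worker appends their name to it.
  static F<String, Callable<String>> worker(final String name) {
    return new F<String, Callable<String>>() {
      public Callable<String> f(final String w) {
        return new Callable<String>() {
          public String call() { return w + " -> " + name; }
        };
      }
    };
  }

  static Callable<String> bobThenAlice(final String w) {
    return bind(worker("Bob").f(w), worker("Alice"));
  }

  // Submits the composed Callable to a single pool thread, which turns it into a Future.
  static String run(final String w) {
    final ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      final Future<String> result = pool.submit(bobThenAlice(w));
      return result.get(); // blocking here only to show the answer
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(run("draft")); // prints "draft -> Bob -> Alice"
  }
}
```

Both steps run on the one pool thread, because the composition happened before anything was submitted.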

The problem with combining Futures is in the nature of the future. This is a deliberate pun on “future”. Think about time for a second. What does it mean to get a value that’s in the future? By the very fact that causality is sequential, it’s a violation of the nature of reality to have something that doesn’t yet exist. It’s the future; you’re not supposed to get stuff out. But, we can put stuff in, can’t we? Yes we can. You know those corny time-capsule things where people put their mountain bikes and Nintendo games for future generations to enjoy later? We can do that with data values. And not just values, but computations.

Here’s One I Made Earlier

The Future interface in the standard Java libraries doesn’t come with any methods for projecting computations into the future. But Functional Java comes with a class called Promise<A> which does have that feature. It makes use of light-weight concurrent processes (actors), and parallel strategies, as described in the previous post, to implement the ability to combine concurrent computations into larger (concurrently executing) structures.

Since it is implemented as a monad, the methods it provides are all the usual suspects: unit, bind, fmap, join, etc. Here’s a quick overview of what they do and why they’re useful. Grasping them doesn’t just help you understand the Promise class, but any monad you may come across in the (ahem) future.

The unit function, the constructor of Promises, is just called promise. It has a few overloaded forms, but here is the simplest one.

public static <A> Promise<A> promise(Strategy<A> s, P1<A> p);

The P1 class is just a simple closure with no arguments, provided by the Functional Java library. P1<A> consists of one abstract method: A _1(). Strategy represents a method of evaluating P1s concurrently. I also talk about Strategies in the previous post, but the long and the short of it is that it has methods to evaluate the P1 value according to some parallelisation strategy, like with a thread pool for instance.

Calling the promise method starts a concurrent computation, in a manner according to the given strategy, that evaluates p. The resulting Promise value is a handle on the running computation, and can be used to retrieve the value later. Promise.claim() will block the current thread until the value is available, exactly like Future.get(), but this is generally not what you want to do. Instead, you want to bind.

The essence of the monad pattern is the binding function. If you don’t think you already know what a monad is, but understand this method, then you know more than you think:

public <B> Promise<B> bind(F<A, Promise<B>> f);

This method means that if you have a Promise of an A, and a function from an A to a Promise of a B, you can get a Promise of a B. I.e. if somebody promises you an A, and I can promise you a B for every A, it’s the same thing as being promised a B in the first place.
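To see how a bind like this can work without blocking anything, here is a deliberately tiny sketch. It is not how Functional Java implements Promise (which is built on actors and strategies); it swaps in plain callbacks instead. MiniPromise, fulfil and onValue are hypothetical names, and F and Effect are stand-ins for the Functional Java types of the same name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback-based sketch; not Functional Java's Promise.
final class MiniPromise<A> {

  interface F<X, Y> { Y f(X x); }      // stand-in for fj.F
  interface Effect<X> { void e(X x); } // stand-in for fj.Effect

  private final List<Effect<A>> waiting = new ArrayList<Effect<A>>();
  private boolean done;
  private A value;

  // Supplies the value and runs everything that was waiting for it.
  void fulfil(final A a) {
    final List<Effect<A>> ready;
    synchronized (this) {
      if (done) throw new IllegalStateException("already fulfilled");
      done = true;
      value = a;
      ready = new ArrayList<Effect<A>>(waiting);
      waiting.clear();
    }
    for (final Effect<A> k : ready) k.e(a);
  }

  // Puts a computation "into the future": k runs once the value exists.
  void onValue(final Effect<A> k) {
    final A a;
    synchronized (this) {
      if (!done) { waiting.add(k); return; }
      a = value;
    }
    k.e(a);
  }

  // bind returns immediately; f is applied whenever this promise is fulfilled.
  <B> MiniPromise<B> bind(final F<A, MiniPromise<B>> f) {
    final MiniPromise<B> result = new MiniPromise<B>();
    onValue(new Effect<A>() {
      public void e(final A a) {
        f.f(a).onValue(new Effect<B>() {
          public void e(final B b) { result.fulfil(b); }
        });
      }
    });
    return result;
  }

  static String demo() {
    final StringBuilder log = new StringBuilder();
    final MiniPromise<Integer> a = new MiniPromise<Integer>();
    final MiniPromise<String> b = a.bind(new F<Integer, MiniPromise<String>>() {
      public MiniPromise<String> f(final Integer n) {
        final MiniPromise<String> p = new MiniPromise<String>();
        p.fulfil("got " + (n + 1));
        return p;
      }
    });
    b.onValue(new Effect<String>() {
      public void e(final String s) { log.append(s); }
    });
    log.append("waiting; "); // a has no value yet, so nothing has run
    a.fulfil(41);            // now the whole chain fires
    return log.toString();
  }

  public static void main(final String[] args) {
    System.out.println(demo()); // prints "waiting; got 42"
  }
}
```

The point of the sketch is that bind only registers work to be done later, so no thread ever sits waiting for a value.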

The mapping function:

public <B> Promise<B> fmap(F<A, B> f);

This method means that if you have a Promise of an A, and a function from A to B, you can get a Promise of a B. In other words, you can map any function over a Promise, and fmap will return you a Promise of the result. Behind the scenes, fmap is implemented by calling the bind and promise methods. The difference between this method and the bind method is subtle but important. Calling p.bind(f) is exactly equivalent to calling Promise.join(p.fmap(f)).

The join function:

public static <A> Promise<A> join(Promise<Promise<A>> a);

Join is a lot more useful than it looks. If you have a promised Promise, it’s the same as just having a Promise. In practice, that means that if you can start a concurrent task that starts a concurrent task, you can combine those into one concurrent task. You can think of it as the semantic equivalent of Thread.join(), except that our method returns the joined Promise immediately.
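The relationship between bind, fmap and join can be tried out with nothing but the JDK, if you are willing to accept an analogy. The sketch below uses CompletableFuture (added in Java 8, so not something this series relies on), where thenCompose plays the role of bind and thenApply the role of fmap. The class BindFmapJoin and its methods are made-up names, and flatten is my stand-in for join.

```java
import java.util.concurrent.CompletableFuture;

// An analogy using the JDK's CompletableFuture; this is not Functional Java's Promise.
final class BindFmapJoin {

  // A function from a value to a running computation, like F<A, Promise<B>>.
  static CompletableFuture<Integer> doubled(final Integer n) {
    return CompletableFuture.supplyAsync(() -> n * 2);
  }

  // The analogue of join: a promised promise becomes a promise, without blocking.
  static <A> CompletableFuture<A> flatten(final CompletableFuture<CompletableFuture<A>> ppa) {
    return ppa.thenCompose(pa -> pa);
  }

  // p.bind(f)
  static int viaBind(final int x) {
    final CompletableFuture<Integer> p = CompletableFuture.supplyAsync(() -> x);
    final CompletableFuture<Integer> r = p.thenCompose(BindFmapJoin::doubled);
    return r.join(); // CompletableFuture.join() blocks; used here only to read the answer
  }

  // join(p.fmap(f))
  static int viaFmapThenJoin(final int x) {
    final CompletableFuture<Integer> p = CompletableFuture.supplyAsync(() -> x);
    final CompletableFuture<CompletableFuture<Integer>> nested =
        p.thenApply(BindFmapJoin::doubled);
    return flatten(nested).join();
  }

  public static void main(final String[] args) {
    System.out.println(viaBind(21));         // prints 42
    System.out.println(viaFmapThenJoin(21)); // prints 42
  }
}
```

Both routes give the same answer, which is the equivalence stated above in miniature.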

Coming back to Bob and Alice for a second, we can implement bob and alice from the Callables example above, using Promise instead of Callable. Both bob and alice will construct Promises using the promise method, putting whatever work they do inside a P1. That way, when you call bob, he’s already doing his work by the time you mention Alice’s name:

final Strategy<Work> s = Strategy.simpleThreadStrategy();
F<Work, Promise<Work>> bob = new F<Work, Promise<Work>>() {
  public Promise<Work> f(final Work w) {
    return promise(s, new P1<Work>() {
      public Work _1() { ... }
    });
  }
};
F<Work, Promise<Work>> alice = new F<Work, Promise<Work>>() { ... };

public Promise<Work> bobThenAlice(Work w) {
  return bob.f(w).bind(alice);
}

So now that we can build arbitrarily complex concurrent processes from already-running processes, how do we get the final promised value out? Again, you could call Promise.claim(), but that blocks the current thread as we know. Instead, Promise comes equipped with a method to(Actor<A>) which promises to send the value to the given Actor as soon as it’s ready. Control is returned to the current thread immediately, and the whole computation continues in the background, including the action to take on the final result. Actors were discussed in the previous post.

A Fully Functional Example

I think an example is in order. The following program calculates Fibonacci numbers using a naive recursive algorithm. This is an algorithm that benefits particularly well from parallelisation (barring any other kind of optimisation). If we were just using plain old Future instead of Promise, the number of Threads required to calculate the nth Fibonacci number is O(fib(n)). But since we’re using Promise, we can use a fixed number of actual Java threads.

package concurrent;

import static fj.Bottom.error;
import fj.Effect;
import fj.F;
import fj.F2;
import fj.Function;
import fj.P;
import fj.P1;
import fj.P2;
import fj.Unit;
import fj.data.List;
import fj.control.parallel.Actor;
import fj.control.parallel.Promise;
import fj.control.parallel.Strategy;
import static fj.data.List.range;
import static fj.function.Integers.add;
import static fj.control.parallel.Promise.join;
import static fj.control.parallel.Promise.promise;
import static fj.control.parallel.Actor.actor;

import java.text.MessageFormat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Fibs {

  private static final int CUTOFF = 35;

  public static void main(final String[] args) throws Exception {
    if (args.length < 1)
      throw error("This program takes an argument: number_of_threads");

    final int threads = Integer.parseInt(args[0]);
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    final Strategy<Unit> su = Strategy.executorStrategy(pool);
    final Strategy<Promise<Integer>> spi = Strategy.executorStrategy(pool);

    // This actor performs output and detects the termination condition.
    final Actor<List<Integer>> out = actor(su, new Effect<List<Integer>>() {
      public void e(final List<Integer> fs) {
        for (P2<Integer, Integer> p : fs.zipIndex()) {
          System.out.println(MessageFormat.format("n={0} => {1}", p._2(), p._1()));
        }
        pool.shutdown();
      }
    });

    // A parallel recursive Fibonacci function
    final F<Integer, Promise<Integer>> fib = new F<Integer, Promise<Integer>>() {
      public Promise<Integer> f(final Integer n) {
        return n < CUTOFF ? promise(su, P.p(seqFib(n))) : f(n - 1).bind(f(n - 2), add);
      }
    };

    System.out.println("Calculating Fibonacci sequence in parallel...");
    join(su, spi.parMap(fib, range(0, 46)).map(Promise.<Integer>sequence(su))).to(out);
  }

  // The sequential version of the recursive Fibonacci function
  public static int seqFib(final int n) {
    return n < 2 ? n : seqFib(n - 1) + seqFib(n - 2);
  }
}

For all you Scala fans out there, the Functional Java library comes with convenient bindings for Scala as well. Here’s the same thing written in Scala. Note that this does not use the Actor library from the standard Scala libraries, but the same lighter weight Java implementation that the Java example above uses.

package concurrent

import fj.control.parallel.{Actor, Promise}
import fj.Function.curry
import fj.control.parallel.Strategy.executorStrategy
import fjs.control.parallel.Strategy.parMap
import fjs.control.parallel.Promise._
import fjs.control.parallel.Actor._
import Integer.parseInt
import List.range
import java.util.concurrent.Executors.newFixedThreadPool
import fjs.F._
import fjs.F2._
import fjs.P1._
import fjs.P2._
import fjs.data.List._
import fjs.control.parallel.Strategy.ListPar

object Fibs {
  val CUTOFF = 35;

  def main(args: Array[String]) = {
    if (args.length < 1)
      error("This program takes an argument: number_of_threads")

    val threads = parseInt(args(0))
    val pool = newFixedThreadPool(threads)
    implicit def s[A] = executorStrategy[A](pool)

    // This actor performs output and detects the termination condition.
    val out: Actor[List[Int]] = actor{ ns =>
      for ((n, i) <- ns.zipWithIndex) printf("n=%d => %d\n", i, n)
      pool.shutdown()
    }

    // A parallel recursive Fibonacci function
    def fib(n: Int): Promise[Int] = {
      if (n < CUTOFF) promise(() => seqFib(n))
      else fib(n - 1).bind(fib(n - 2), curry((_: Int) + (_: Int)))
    }

    println("Calculating Fibonacci sequence in parallel...")
    out ! sequence(parMap[Int, Promise[Int], List](fib, range(0, 46)));
  }

  // The sequential version of the recursive Fibonacci function
  def seqFib(n: Int): Int = if (n < 2) n else seqFib(n - 1) + seqFib(n - 2);
}

Here's an example run of this program using a pool of 10 threads. It runs about 7 times faster that way than with just 1 thread on my 8-way machine. The Scala version is also very slightly faster for some reason.

$ scala -classpath .:../../../build/classes/src concurrent.Fibs 10
Calculating Fibonacci sequence in parallel...
n=0 => 0
n=1 => 1
n=2 => 1
n=3 => 2
n=4 => 3
n=5 => 5
n=6 => 8
n=7 => 13
n=8 => 21
n=9 => 34
n=10 => 55
n=11 => 89
n=12 => 144
n=13 => 233
n=14 => 377
n=15 => 610
n=16 => 987
n=17 => 1597
n=18 => 2584
n=19 => 4181
n=20 => 6765
n=21 => 10946
n=22 => 17711
n=23 => 28657
n=24 => 46368
n=25 => 75025
n=26 => 121393
n=27 => 196418
n=28 => 317811
n=29 => 514229
n=30 => 832040
n=31 => 1346269
n=32 => 2178309
n=33 => 3524578
n=34 => 5702887
n=35 => 9227465
n=36 => 14930352
n=37 => 24157817
n=38 => 39088169
n=39 => 63245986
n=40 => 102334155
n=41 => 165580141
n=42 => 267914296
n=43 => 433494437
n=44 => 701408733
n=45 => 1134903170

Massive win! If we had been using Future instead of Promise, we would have needed at least 55 threads (since we’re using a cutoff at 35 and 45 – 35 = 10 and fib(10) = 55). Heck, we could even remove the threshold value altogether and calculate all 45 parallel fibs, in parallel. That would require 1,134,903,170 threads in the absence of non-blocking concurrency abstractions like Promise and Actor. We can run that in just one thread if we’d like.
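If you do not have Functional Java to hand, the same effect can be observed with a rough JDK-only analogue. This sketch swaps in Java 8's CompletableFuture for Promise, with thenCombine standing in for the two-argument bind used in the listing above. The class name FixedPoolFib is made up, and the cutoff is lowered to 20 purely so the example finishes quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A JDK-only analogue of the Fibs example; CompletableFuture stands in for Promise.
final class FixedPoolFib {

  static final int CUTOFF = 20; // lowered from 35 so the demo runs quickly

  static int seqFib(final int n) {
    return n < 2 ? n : seqFib(n - 1) + seqFib(n - 2);
  }

  // Below the cutoff, compute sequentially on a pool thread. Above it,
  // combine the two sub-results with a callback instead of a blocked thread.
  static CompletableFuture<Integer> fib(final int n, final ExecutorService pool) {
    if (n < CUTOFF) {
      return CompletableFuture.supplyAsync(() -> seqFib(n), pool);
    } else {
      return fib(n - 1, pool).thenCombine(fib(n - 2, pool), Integer::sum);
    }
  }

  static int run(final int n, final int threads) {
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      return fib(n, pool).join(); // block once, at the very end, to read the answer
    } finally {
      pool.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(run(30, 1)); // prints 832040, using a single pool thread
  }
}
```

With a pool of one thread, a version that called get() inside each task would deadlock as soon as a task waited on a subtask; here nothing ever waits, so one thread is enough.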


Higher-Order Java Parallelism, Part 1: Parallel Strategies and the Callable Monad

Now that even budget desktop and laptop computers are shipping with multi-core processors, it’s more important than ever to design programs so that they can take advantage of parallel processing. If you’re already writing software in Erlang or Parallel Haskell, then lucky you. But if you’re writing in Java (and are unable or unwilling to take up Scala), you will need to get organized.

Since Java 5, the JDK comes with a concurrency library that makes concurrent programming a lot easier than it used to be, and the library has been improved further in Java 6. This provides some basic abstractions that take care of the nitty-gritty of thread management for us. In this short series of articles, I’m going to show how we can build on top of that library to achieve concurrent programming of a higher order. We will employ some design patterns and simple abstract building blocks from functional programming. This style of writing concurrent programs will afford us the ability to:

  • Compose ordinary functions into concurrent programs.
  • Decouple parallel behavior from the algorithm itself.
  • Turn existing code into parallel code, without refactoring, at runtime.
  • Work with hypothetical results of concurrent computations, before they finish.

We will not use any locking or explicit synchronization. Basic familiarity with the java.util.concurrent package is assumed but not required. Familiarity with generics is highly recommended, and I recommend reading parts 1 and 2 of my Lazy Error Handling series as they explain some preliminaries. Most of the source code herein is already part of the Functional Java library, and I’ll use a fair number of interfaces and classes from it, which I’ll introduce as we go along.

When we’re through, I will have demonstrated that a functional style of programming promotes code-reuse and modularity of parallel programs, above and beyond what can be achieved with the canonical object-orientated style.

The Callable Monad

The Callable interface is new in Java 5. It’s similar to the old Runnable interface that we all know and love, except that its call() method returns a result, and it may throw an Exception. If you followed my Lazy Error Handling articles, you will immediately notice that Callable is nearly identical to the Thrower interface described in that series, and you’ll know that it is indeed a potential monad. Treating Callable as a monad will allow us to work with it at a higher level of abstraction, letting us compose Callables, chain them, and project computations into them.

As you may already know, we need three ingredients (and only these three) to have a monad:

  1. Type construction. This comes free with the Callable interface, since it takes a type parameter (i.e. it’s generic).
  2. A unit function. This allows us to turn any value into a Callable that returns that value again.
  3. A binding function. This allows us to chain together Callables with functions, without actually calling them.

Here is the unit function for Callable (we will refer to it as unit in the text to avoid ambiguity):

  public static <A> Callable<A> callable(final A a) {
    return new Callable<A>() {
      public A call() {
        return a;
      }
    };
  }

And here is its binding function:

  public static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception {
        return f.f(a.call()).call();
      }
    };
  }

Note: The F interface is from Functional Java. It represents a first-class function (a function that can be treated like any other value). F<A, B> has one method that takes an argument of type A and returns a value of type B.

Given the above two methods, Callable is now a monad. All monads are also functors, so we can define a Callable functor. What I mean by “functor” is that we can define a method to turn any existing function into a function on Callables, or, in other words, to apply any function to a value wrapped in a Callable, without actually calling it (i.e. lazily). This method is called fmap, and we can define it in terms of bind:

  public static <A, B> F<Callable<A>, Callable<B>> fmap(final F<A, B> f) {
    return new F<Callable<A>, Callable<B>>() {
      public Callable<B> f(final Callable<A> a) {
        return bind(a, new F<A, Callable<B>>() {
          public Callable<B> f(final A ab) {
            return new Callable<B>() {
              public B call() {
                return f.f(ab);
              }
            };
          }
        });
      }
    };
  }

Useful operations on Callables, that the methods above allow us to implement, include sequence—which is a method of turning a list of Callables into a single Callable that returns a list—and join, which peels one layer of Callables from a Callable<Callable<A>> so that it becomes just Callable<A>. You will find the source code for those as part of Functional Java.
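To give a feel for how little is needed, here is one way join and sequence could be written using nothing but unit and bind. This is a sketch and not the Functional Java source: it uses java.util.List rather than the library's own List type, F is a stand-in interface, and the class name CallableOps is invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch; Functional Java's own versions differ in detail.
final class CallableOps {

  interface F<A, B> { B f(A a); } // stand-in for fj.F

  // unit
  static <A> Callable<A> callable(final A a) {
    return new Callable<A>() {
      public A call() { return a; }
    };
  }

  // bind
  static <A, B> Callable<B> bind(final Callable<A> a, final F<A, Callable<B>> f) {
    return new Callable<B>() {
      public B call() throws Exception { return f.f(a.call()).call(); }
    };
  }

  // join is just bind with the identity function.
  static <A> Callable<A> join(final Callable<Callable<A>> a) {
    return bind(a, new F<Callable<A>, Callable<A>>() {
      public Callable<A> f(final Callable<A> inner) { return inner; }
    });
  }

  // sequence folds over the list, binding each Callable onto the accumulated result.
  static <A> Callable<List<A>> sequence(final List<Callable<A>> as) {
    Callable<List<A>> acc = CallableOps.<List<A>>callable(new ArrayList<A>());
    for (final Callable<A> a : as) {
      final Callable<List<A>> soFar = acc;
      acc = bind(soFar, new F<List<A>, Callable<List<A>>>() {
        public Callable<List<A>> f(final List<A> xs) {
          return bind(a, new F<A, Callable<List<A>>>() {
            public Callable<List<A>> f(final A x) {
              final List<A> ys = new ArrayList<A>(xs);
              ys.add(x);
              return callable(ys);
            }
          });
        }
      });
    }
    return acc;
  }

  static List<Integer> demo() {
    final List<Callable<Integer>> cs = new ArrayList<Callable<Integer>>();
    cs.add(callable(1));
    cs.add(callable(2));
    cs.add(callable(3));
    final Callable<Callable<List<Integer>>> nested = callable(sequence(cs));
    try {
      return join(nested).call(); // nothing is evaluated before this call
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(final String[] args) {
    System.out.println(demo()); // prints [1, 2, 3]
  }
}
```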

Parallel Strategies

When working with the java.util.concurrent library, you normally don’t work with Threads directly. You might instead implement Callable<A> and submit instances of your implementation to an ExecutorService, which yields values of type Future<A>, each representing a running computation that yields an A. Future has a method called get() that returns the result as soon as it’s ready. This is a great improvement over managing threads and Runnables ourselves, but it still ties our hands somewhat to the ExecutorService, and encourages tight coupling of our code to the parallelisation library.

Inspired by Haskell’s “Parallel Strategies”, let’s instead work with parallelisation in the abstract. For what is ExecutorService, really? It’s a method of turning Callables into Futures. That means it’s a kind of function, so we can abstract from it, using a function type: F<Callable<A>, Future<A>>. Such a function can then be backed by an ExecutorService, or by something else entirely, such as a load-balancing facility that serializes Callables to be executed on remote servers.

We will use a new class, Strategy<A>, that allows us to effectively separate parallelism from the algorithm itself:

  public final class Strategy<A> {

    private F<Callable<A>, Future<A>> f;

    private Strategy(final F<Callable<A>, Future<A>> f) {
      this.f = f;
    }

    public F<Callable<A>, Future<A>> f() {
      return f;
    }

    public static <A> Strategy<A> strategy(final F<Callable<A>, Future<A>> f) {
      return new Strategy<A>(f);
    }

  }

We’ll add a couple of static functions to create simple strategies:

  public static <A> Strategy<A> simpleThreadStrategy() {
    return strategy(new F<Callable<A>, Future<A>>() {
      public Future<A> f(final Callable<A> p) {
        final FutureTask<A> t = new FutureTask<A>(p);
        new Thread(t).start();
        return t;
      }
    });
  }

  public static <A> Strategy<A> executorStrategy(final ExecutorService s) {
    return strategy(new F<Callable<A>, Future<A>>() {
      public Future<A> f(final Callable<A> p) {
        return s.submit(p);
      }
    });
  }
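Because a strategy is nothing more than a function from Callable to Future, the algorithm that uses it need not know how its work is scheduled. The sketch below illustrates that with two interchangeable strategies, written against a stand-in F interface and without the Strategy wrapper class. The sequential strategy, which evaluates on the calling thread, and the names SequentialStrategyDemo and sumOfSquares are my own additions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: two interchangeable strategies behind one function type.
final class SequentialStrategyDemo {

  interface F<A, B> { B f(A a); } // stand-in for fj.F

  // Evaluates on the calling thread; the Future is already complete when returned.
  static <A> F<Callable<A>, Future<A>> sequential() {
    return new F<Callable<A>, Future<A>>() {
      public Future<A> f(final Callable<A> p) {
        final FutureTask<A> t = new FutureTask<A>(p);
        t.run();
        return t;
      }
    };
  }

  // Same idea as simpleThreadStrategy: one fresh thread per task.
  static <A> F<Callable<A>, Future<A>> threaded() {
    return new F<Callable<A>, Future<A>>() {
      public Future<A> f(final Callable<A> p) {
        final FutureTask<A> t = new FutureTask<A>(p);
        new Thread(t).start();
        return t;
      }
    };
  }

  static Callable<Integer> square(final int n) {
    return new Callable<Integer>() {
      public Integer call() { return n * n; }
    };
  }

  // The algorithm itself: it only sees a function, never a thread or an executor.
  static int sumOfSquares(final F<Callable<Integer>, Future<Integer>> strategy,
                          final int a, final int b) {
    try {
      final Future<Integer> x = strategy.f(square(a));
      final Future<Integer> y = strategy.f(square(b));
      return x.get() + y.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(final String[] args) {
    System.out.println(sumOfSquares(SequentialStrategyDemo.<Integer>sequential(), 3, 4)); // prints 25
    System.out.println(sumOfSquares(SequentialStrategyDemo.<Integer>threaded(), 3, 4));   // prints 25
  }
}
```

Swapping one strategy for the other changes where the work runs and nothing else.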

One of the neat things that working with Strategies as functions allows us to do is use the Callable monad to compose them with existing functions. Any function can be lifted into the Callable monad using fmap, and then composed with a Strategy to yield a concurrent function. Moreover, we can use Strategies to convert existing functions to concurrent functions. The following method on Strategy will take any function and return the equivalent function that executes concurrently. Calling such a function will give you a Future value from which you can get the computed result whenever it’s ready.

  public <B> F<B, Future<A>> lift(final F<B, A> f) {
    final Strategy<A> self = this;
    return new F<B, Future<A>>() {
      public Future<A> f(final B b) {
        return self.f().f(new Callable<A>() {
          public A call() {
            return f.f(b);
          }
        });
      }
    };
  }

Alert readers will note that lift represents the Kleisli arrow for the Future monad. As for most monads, this arrow is a very useful kind of thing (see Lazy Error Handling, Part 2). In the Future monad, the Kleisli arrow provides parallel function application. If you already have a function f, then calling lift(f).f(x) will apply that function to x while simultaneously continuing with the next statement in the current program.
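Here is what using lift might look like in practice. To keep the sketch self-contained, lift is written as a static method over a bare strategy function rather than as a method on Strategy, F is a stand-in interface, and LiftDemo is an invented name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of lift, written against a bare strategy function.
final class LiftDemo {

  interface F<A, B> { B f(A a); } // stand-in for fj.F

  // Turns an ordinary function into one that runs under the given strategy.
  static <A, B> F<B, Future<A>> lift(final F<Callable<A>, Future<A>> strategy,
                                     final F<B, A> f) {
    return new F<B, Future<A>>() {
      public Future<A> f(final B b) {
        return strategy.f(new Callable<A>() {
          public A call() { return f.f(b); }
        });
      }
    };
  }

  static int demo() {
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    // An executor-backed strategy, as in executorStrategy.
    final F<Callable<Integer>, Future<Integer>> strategy =
        new F<Callable<Integer>, Future<Integer>>() {
          public Future<Integer> f(final Callable<Integer> c) { return pool.submit(c); }
        };
    // An ordinary, sequential function.
    final F<String, Integer> length = new F<String, Integer>() {
      public Integer f(final String s) { return s.length(); }
    };
    try {
      final F<String, Future<Integer>> parLength = lift(strategy, length);
      final Future<Integer> a = parLength.f("higher-order"); // returns at once
      final Future<Integer> b = parLength.f("java");         // runs alongside the first
      return a.get() + b.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(demo()); // prints 16
  }
}
```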

Lazy Futures

For functions involving Futures, we generally want to have the Future type in the codomain (on the right-hand side, in the return type). If you think about it, you’ll see that functions that take Futures in their arguments won’t be able to do much without blocking on Future.get(). However, that doesn’t mean we can’t compose Future-valued functions together. It just means that we can only compose two of them together in such a way that we either wait for the first Future to obtain a value before firing off the second one, or we have to spark a new thread that does nothing but wait on the first Future. We’re much better off composing Callables and then turning those into Futures as needed. In fact, we might want to wrap values of type Future<A> inside of a Callable<A> again so that we can manipulate their return values while they are running:

  public static <A> Callable<A> obtain(final Future<A> x) {
    return new Callable<A>() {
      public A call() throws Exception {
        return x.get();
      }
    };
  }

And this takes us full circle back into the Callable monad, where we can compose computations, bind them to functions and map functions over them, all lazily. Which means: without actually asking what their values are until we absolutely need to know.
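A short sketch of that round trip: a computation is already running, we wrap it with obtain, map a function over it, and only block at the very end. The map helper is a simplified, uncurried stand-in for fmap, and the latch exists only so the example can show that the composition really happened while the Future was unfinished. ObtainDemo and the helper names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: composing over a Future that is still running.
final class ObtainDemo {

  interface F<A, B> { B f(A a); } // stand-in for fj.F

  // Wraps a running Future back into a lazy Callable.
  static <A> Callable<A> obtain(final Future<A> x) {
    return new Callable<A>() {
      public A call() throws Exception { return x.get(); }
    };
  }

  // A simplified, uncurried fmap for Callable.
  static <A, B> Callable<B> map(final Callable<A> a, final F<A, B> f) {
    return new Callable<B>() {
      public B call() throws Exception { return f.f(a.call()); }
    };
  }

  static int demo() {
    final ExecutorService pool = Executors.newSingleThreadExecutor();
    final CountDownLatch gate = new CountDownLatch(1);
    try {
      // A computation that is running but cannot finish until the gate opens.
      final Future<Integer> running = pool.submit(new Callable<Integer>() {
        public Integer call() throws Exception {
          gate.await();
          return 20;
        }
      });
      // Composing over the unfinished Future does not block.
      final Callable<Integer> plusOne = map(obtain(running), new F<Integer, Integer>() {
        public Integer f(final Integer n) { return n + 1; }
      });
      final boolean stillRunning = !running.isDone();
      gate.countDown();
      final int result = plusOne.call(); // only now do we ask for the value
      return stillRunning ? result : -1;
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(demo()); // prints 21
  }
}
```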

So we have accomplished what we set out to do in this first part of the series. We’ve written a Callable monad that lets us compose existing code into concurrent programs. We’ve implemented parallel strategies to run those programs in parallel, without regard to the actual parallelisation implementation. We have a Kleisli arrow that lets us run ordinary functions concurrently with each other, and finally we have a way of taking concurrently running computations back into the Callable monad. Not bad for a day’s work.

In the next part of this series, we will employ what we’ve built here to develop some higher-order parallel functions, including a parallel list functor. I hope you join me.