Higher-Order Java Parallelism, Part 4: A Better Future

This is the fourth installment in a series of posts about making highly concurrent software easier to write in Java. Previous entries are available here: part 1, part 2, part 3. However, I aim to make it possible to follow along even if you haven’t read the previous posts.

I Have Seen the Future…

If you have used the Java 5 concurrency API at all, you will have come across the Future interface. For example, when you submit a Callable<Integer> to an ExecutorService, what you get back is a Future<Integer>, which represents a computation, running concurrently, that will (hopefully) result in an integer at some time in the future. Once you have the Future<Integer> fi, you can later get the integer out of it by calling fi.get().
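To make that round trip concrete, here is a minimal, self-contained sketch using only java.util.concurrent; the trivial computation is made up for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {
  public static void main(final String[] args) throws Exception {
    final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Submitting a Callable<Integer> returns a Future<Integer> immediately.
    final Future<Integer> fi = pool.submit(new Callable<Integer>() {
      public Integer call() {
        return 6 * 7; // stands in for some long-running computation
      }
    });

    // get() blocks the calling thread until the result is available.
    System.out.println(fi.get()); // prints 42
    pool.shutdown();
  }
}
```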

That’s all fine and dandy, but let’s say you want to do something like add two future integers. You could do something like this:

int sum = x.get() + y.get();

This will block the current thread until both of those integers are available, then add them together. But why wait for that? If you have an ExecutorService, you can create a new Future that computes the sum:


Future<Integer> sum = executorService.submit(new Callable<Integer>() {
  public Integer call() throws Exception {
    return x.get() + y.get();
  }
});

Now the current thread can continue, but we’ve occupied yet another thread that does nothing until the values of x and y have both been calculated by other threads.

We’re beginning to see a problem here. We want to be able to compose Futures together to form new Futures, but we find that the number of threads required to compose n Future values is O(n). If we have a fixed-size thread pool, we’ll run into starvation. If we have an unbounded thread pool, we might start more threads than the operating system can handle, most of which will do nothing at all but wait for other threads.

This should all sound very familiar. Threads are a space resource. What kind of processes are O(n) in their space requirement? If you said “linearly recursive processes”, go to the head of the class. Intuitively, for the same reason that we can find iterative versions of any recursive algorithm, it seems that we should be able to find an algorithm to accomplish the same thing with O(1) threads.

…and it is a Monad

In the above example, it’s as if we’re issuing separate instructions, waiting for the result of each one in between. Imagine we were working in an office with Bob and Alice, and we needed both of them to work on something. We might go to Bob and say: “Bob, process this and give me the result.” Then we’d take the result to Alice and say: “Alice, here’s a result from Bob.” It would be much better if we could just go to Bob and say: “Bob, process this and give the result to Alice.” This is the essential difference between recursive and iterative processes.

But wait! We say that kind of thing all the time, in Java:


public Work bob(Work w) { ... }
public Work alice(Work w) { ... }

public Work bobThenAlice(Work w) {
  Work b = bob(w);
  return alice(b);
}

Here, we’re instructing a single thread to do some work, then use the result of that work to do more work. What’s really sneaky here is the meaning of the semicolon. In this context, the first semicolon means “take the value stored by the previous statement and bind it to the free variable b in the next statement”. You can think of the second semicolon as binding a blank statement over the result of the preceding statement.
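You can make that reading of the semicolon literal. Here is a stand-alone sketch (the Work class and its string log are my own invention, purely for illustration) in which an explicit bind function plays the role of the semicolon:

```java
import java.util.function.Function;

public class SemicolonAsBind {
  // A stand-in for the article's Work type (hypothetical).
  static class Work {
    final String log;
    Work(final String log) { this.log = log; }
  }

  // "bind" for plain values: feed the previous result to the next step.
  static <A, B> B bind(final A a, final Function<A, B> f) {
    return f.apply(a);
  }

  static Work bob(final Work w)   { return new Work(w.log + " -> bob"); }
  static Work alice(final Work w) { return new Work(w.log + " -> alice"); }

  // The same bobThenAlice as above, with the semicolon
  // rewritten as an explicit bind.
  static Work bobThenAlice(final Work w) {
    return bind(bob(w), b -> alice(b));
  }

  public static void main(final String[] args) {
    System.out.println(bobThenAlice(new Work("start")).log);
    // prints: start -> bob -> alice
  }
}
```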

Using first-class functions from Functional Java, and the Callables monad from the first part of this series, you could implement the same behaviour with something like this:

F<Work, Callable<Work>> bob = new F<Work, Callable<Work>>() {
  public Callable<Work> f(final Work w) {
    return new Callable<Work>() {
      public Work call() { ... }
    };
  }
};
F<Work, Callable<Work>> alice = new F<Work, Callable<Work>>() { ... };

public Callable<Work> bobThenAlice(Work w) {
  return Callables.bind(bob.f(w), alice);
}
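Functional Java supplies Callables.bind for you, but it is worth seeing how little machinery is needed. Here is a stand-alone sketch of the same idea against the plain JDK, using java.util.function.Function where Functional Java would use its F type; the library’s actual Callables.bind may differ in detail:

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

public class CallableMonad {
  // unit: lift a plain value into a Callable.
  static <A> Callable<A> unit(final A a) {
    return () -> a;
  }

  // bind: sequence one Callable computation into the next.
  // Nothing runs until call() is invoked on the result.
  static <A, B> Callable<B> bind(final Callable<A> a,
                                 final Function<A, Callable<B>> f) {
    return () -> f.apply(a.call()).call();
  }

  public static void main(final String[] args) throws Exception {
    final Callable<Integer> bobbed = unit(20);
    final Callable<Integer> combined = bind(bobbed, b -> unit(b + 22));
    System.out.println(combined.call()); // prints 42
  }
}
```

Note that bind builds a description of the combined computation; no work happens until something finally calls call().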

That’s pretty neat. Now we have a single Callable that we can run concurrently in a new thread, turning it into a Future. But wouldn’t it be cool if we could bind Futures? That would let us take already running computations and combine them in exactly this way. We want a Future monad.

The problem with combining Futures is in the nature of the future. This is a deliberate pun on “future”. Think about time for a second. What does it mean to get a value that’s in the future? By the very fact that causality is sequential, it’s a violation of the nature of reality to have something that doesn’t yet exist. It’s the future; you’re not supposed to get stuff out of it. But we can put stuff in, can’t we? Yes, we can. You know those corny time capsules where people stash their mountain bikes and Nintendo games for future generations to enjoy later? We can do that with data values. And not just values, but computations.

Here’s One I Made Earlier

The Future class in the standard Java libraries doesn’t come with any methods for projecting computations into the future. But Functional Java comes with a class called Promise<A> which does have that feature. It makes use of light-weight concurrent processes (actors), and parallel strategies, as described in the previous post, to implement the ability to combine concurrent computations into larger (concurrently executing) structures.

Since it is implemented as a monad, the methods it provides are all the usual suspects: unit, bind, fmap, join, etc. Here’s a quick overview of what they do and why they’re useful. Grasping them doesn’t just help you understand the Promise class, but any monad you may come across in the (ahem) future.

The unit function, the constructor of Promises, is just called promise. It has a few overloaded forms, but here is the simplest one.

public static <A> Promise<A> promise(Strategy<Unit> s, P1<A> p);

The P1 class is just a simple closure with no arguments, provided by the Functional Java library. P1<A> consists of one abstract method: A _1(). A Strategy represents a method of evaluating P1s concurrently. I also talked about Strategies in the previous post, but the long and the short of it is that a Strategy has methods to evaluate a P1 value according to some parallelisation strategy, such as with a thread pool.

Calling the promise method starts a concurrent computation, in a manner according to the given strategy, that evaluates p. The resulting Promise value is a handle on the running computation, and can be used to retrieve the value later. Promise.claim() will block the current thread until the value is available, exactly like Future.get(), but this is generally not what you want to do. Instead, you want to bind.

The essence of the monad pattern is the binding function. If you don’t think you already know what a monad is, but understand this method, then you know more than you think:

public Promise<B> bind(F<A, Promise<B>> f);

This method means that if you have a Promise of an A, and a function from an A to a Promise of a B, you can get a Promise of a B. I.e. if somebody promises you an A, and I can promise you a B for every A, it’s the same thing as being promised a B in the first place.

The mapping function:

public Promise<B> fmap(F<A, B> f);

This method means that if you have a Promise of an A, and a function from A to B, you can get a Promise of a B. In other words, you can map any function over a Promise, and fmap will return you a Promise of the result. Behind the scenes, fmap is implemented by calling the bind and promise methods. The difference between this method and the bind method is subtle but important. Calling p.bind(f) is exactly equivalent to calling Promise.join(p.fmap(f)).

The join function:

public static <A> Promise<A> join(Promise<Promise<A>> a);

Join is a lot more useful than it looks. If you have a promised Promise, it’s the same as just having a Promise. In practice, that means that if you can start a concurrent task that starts a concurrent task, you can combine those into one concurrent task. You can think of it as the semantic equivalent of Thread.join(), except that our method returns the joined Promise immediately.
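None of this is specific to Promise. For readers on a modern JVM, the same three operations appear on java.util.concurrent.CompletableFuture (added to the JDK well after Promise existed): thenCompose is bind, thenApply is fmap, and flattening a nested future is join. This sketch checks the identity that bind is fmap followed by join:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PromiseLaws {
  public static void main(final String[] args) {
    // A function from an int to a future int, in the style of bind's argument.
    final Function<Integer, CompletableFuture<Integer>> f =
        n -> CompletableFuture.supplyAsync(() -> n + 1);

    final CompletableFuture<Integer> p = CompletableFuture.supplyAsync(() -> 41);

    // bind: thenCompose.
    final int viaBind = p.thenCompose(f).join();

    // fmap then join: thenApply yields a nested future,
    // and thenCompose with the identity function flattens it.
    final CompletableFuture<CompletableFuture<Integer>> nested = p.thenApply(f);
    final int viaFmapJoin = nested.thenCompose(x -> x).join();

    System.out.println(viaBind + " " + viaFmapJoin); // prints: 42 42
  }
}
```

(CompletableFuture.join here is the blocking wait, an unfortunate name clash with the monadic join under discussion.)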

Coming back to Bob and Alice for a second, we can implement bob and alice from the Callables example above using Promise instead of Callable. Both bob and alice will construct Promises using the promise method, putting whatever work they do inside a P1. That way, when you call bob, he’s already doing his work by the time you mention Alice’s name:

final Strategy<Unit> s = Strategy.simpleThreadStrategy();
F<Work, Promise<Work>> bob = new F<Work, Promise<Work>>() {
  public Promise<Work> f(final Work w) {
    return promise(s, new P1<Work>() {
      public Work _1() { ... }
    });
  }
};
F<Work, Promise<Work>> alice = new F<Work, Promise<Work>>() { ... };

public Promise<Work> bobThenAlice(Work w) {
  return bob.f(w).bind(alice);
}

So now that we can build arbitrarily complex concurrent processes from already-running processes, how do we get the final promised value out? Again, you could call Promise.claim(), but that blocks the current thread as we know. Instead, Promise comes equipped with a method to(Actor<A>) which promises to send the value to the given Actor as soon as it’s ready. Control is returned to the current thread immediately, and the whole computation continues in the background, including the action to take on the final result. Actors were discussed in the previous post.

A Fully Functional Example

I think an example is in order. The following program calculates Fibonacci numbers using a naive recursive algorithm. This is an algorithm that benefits particularly from parallelisation (barring any other kind of optimisation). If we were just using plain old Future instead of Promise, the number of threads required to calculate the nth Fibonacci number would be O(fib(n)). But since we’re using Promise, we can use a fixed number of actual Java threads.


package concurrent;

import static fj.Bottom.error;
import fj.Effect;
import fj.F;
import fj.F2;
import fj.Function;
import fj.P;
import fj.P1;
import fj.P2;
import fj.Unit;
import fj.data.List;
import fj.control.parallel.Actor;
import fj.control.parallel.Promise;
import fj.control.parallel.Strategy;
import static fj.data.List.range;
import static fj.function.Integers.add;
import static fj.control.parallel.Promise.join;
import static fj.control.parallel.Promise.promise;
import static fj.control.parallel.Actor.actor;

import java.text.MessageFormat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Fibs {

  private static final int CUTOFF = 35;

  public static void main(final String[] args) throws Exception {
    if (args.length < 1)
      throw error("This program takes an argument: number_of_threads");

    final int threads = Integer.parseInt(args[0]);
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    final Strategy<Unit> su = Strategy.executorStrategy(pool);
    final Strategy<Promise<Integer>> spi = Strategy.executorStrategy(pool);

    // This actor performs output and detects the termination condition.
    final Actor<List<Integer>> out = actor(su, new Effect<List<Integer>>() {
      public void e(final List<Integer> fs) {
        for (P2<Integer, Integer> p : fs.zipIndex()) {
          System.out.println(MessageFormat.format("n={0} => {1}", p._2(), p._1()));
        }
        pool.shutdown();
      }
    });

    // A parallel recursive Fibonacci function
    final F<Integer, Promise<Integer>> fib = new F<Integer, Promise<Integer>>() {
      public Promise<Integer> f(final Integer n) {
        return n < CUTOFF ?
                promise(su, P.p(seqFib(n))) :
                f(n - 1).bind(f(n - 2), add);
      }
    };

    System.out.println("Calculating Fibonacci sequence in parallel...");

    join(su, spi.parMap(fib, range(0, 46)).map(Promise.<Integer>sequence(su))).to(out);
  }

  // The sequential version of the recursive Fibonacci function
  public static int seqFib(final int n) {
    return n < 2 ? n : seqFib(n - 1) + seqFib(n - 2);
  }

}

For all you Scala fans out there, the Functional Java library comes with convenient bindings for Scala as well. Here’s the same thing written in Scala. Note that this does not use the Actor library from the standard Scala distribution, but the same lighter-weight Java implementation that the Java example above uses.


package concurrent

import fj.control.parallel.{Actor, Promise}
import fj.Function.curry
import fj.control.parallel.Strategy.executorStrategy
import fjs.control.parallel.Strategy.parMap
import fjs.control.parallel.Promise._
import fjs.control.parallel.Actor._
import Integer.parseInt
import List.range
import java.util.concurrent.Executors.newFixedThreadPool
import fjs.F._
import fjs.F2._
import fjs.P1._
import fjs.P2._
import fjs.data.List._
import fjs.control.parallel.Strategy.ListPar

object Fibs {
  val CUTOFF = 35;

  def main(args: Array[String]) = {
    if (args.length < 1)
      error("This program takes an argument: number_of_threads")

    val threads = parseInt(args(0))
    val pool = newFixedThreadPool(threads)
    implicit def s[A] = executorStrategy[A](pool)

    // This actor performs output and detects the termination condition.
    val out: Actor[List[Int]] = actor{
      ns =>
        for ((n, i) <- ns.zipWithIndex) printf("n=%d => %d\n", i, n)
        pool.shutdown()
    }

    // A parallel recursive Fibonacci function
    def fib(n: Int): Promise[Int] = {
      if (n < CUTOFF) promise(() => seqFib(n))
      else fib(n - 1).bind(fib(n - 2), curry((_: Int) + (_: Int)))
    }

    println("Calculating Fibonacci sequence in parallel...")
    out ! sequence(parMap[Int, Promise[Int], List](fib, range(0, 46)));
  }

  // The sequential version of the recursive Fibonacci function
  def seqFib(n: Int): Int = if (n < 2) n else seqFib(n - 1) + seqFib(n - 2);
}

Here’s an example run of this program using a pool of 10 threads. It runs about 7 times faster that way than with just 1 thread on my 8-way machine. The Scala version is also very slightly faster for some reason.

$ scala -classpath .:../../../build/classes/src concurrent.Fibs 10
Calculating Fibonacci sequence in parallel...
n=0 => 0
n=1 => 1
n=2 => 1
n=3 => 2
n=4 => 3
n=5 => 5
n=6 => 8
n=7 => 13
n=8 => 21
n=9 => 34
n=10 => 55
n=11 => 89
n=12 => 144
n=13 => 233
n=14 => 377
n=15 => 610
n=16 => 987
n=17 => 1597
n=18 => 2584
n=19 => 4181
n=20 => 6765
n=21 => 10946
n=22 => 17711
n=23 => 28657
n=24 => 46368
n=25 => 75025
n=26 => 121393
n=27 => 196418
n=28 => 317811
n=29 => 514229
n=30 => 832040
n=31 => 1346269
n=32 => 2178309
n=33 => 3524578
n=34 => 5702887
n=35 => 9227465
n=36 => 14930352
n=37 => 24157817
n=38 => 39088169
n=39 => 63245986
n=40 => 102334155
n=41 => 165580141
n=42 => 267914296
n=43 => 433494437
n=44 => 701408733
n=45 => 1134903170

Massive win! If we had been using Future instead of Promise, we would have needed at least 55 threads (since we’re using a cutoff at 35, and 45 - 35 = 10, and fib(10) = 55). Heck, we could even remove the threshold altogether and calculate all the parallel fibs in parallel. That would require 1,134,903,170 threads in the absence of non-blocking concurrency abstractions like Promise and Actor. With them, we can run it in just one thread if we’d like.
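The arithmetic above is easy to check with the sequential function (computed iteratively here, rather than with the naive recursion, so it finishes instantly):

```java
public class FibCheck {
  // Iterative Fibonacci, equivalent to seqFib but O(n) time.
  static long fib(final int n) {
    long a = 0, b = 1;
    for (int i = 0; i < n; i++) {
      final long t = a + b;
      a = b;
      b = t;
    }
    return a;
  }

  public static void main(final String[] args) {
    System.out.println(fib(10)); // prints 55
    System.out.println(fib(45)); // prints 1134903170
  }
}
```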


13 thoughts on “Higher-Order Java Parallelism, Part 4: A Better Future”

  1. Excellent, as all your functional (Java) articles have been.

    The CUTOFF and resorting to a sequential implementation looks like the Fork/Join framework from Doug. Would be interesting to have Fork/Join as a Strategy and see how well it compares to the fixed thread pool approach.

    Keep up the good work, very interesting.

    Peace
    -stephan

    http://stephan.reposita.org

  2. Pingback: Functional design patterns and cargo cult blogging at Stephans Blog

  3. Very nice post (and blog), which I discovered through the comment from Stephan’s blog :) I was wondering whether the same kind of thing has already been done with Haskell (as you mention reading Paul Hudak’s book)?

  4. Since the Haskell code hides the thread pools and strategies and uses forM_, I’ve tried to refactor these away into a class Par to see whether the intention is better revealed, even if the flexibility is reduced.


    public class ParallelFib {

      private static final int CUTOFF = 35;

      // The sequential version of the recursive Fibonacci function
      public static int seqFib(final int n) {
        return n < 2 ? n : seqFib(n - 1) + seqFib(n - 2);
      }

      public static void main(final String[] args) throws Exception {
        // A parallel recursive Fibonacci function
        final F<Integer, Promise<Integer>> fib = new F<Integer, Promise<Integer>>() {
          public Promise<Integer> f(final Integer n) {
            return n < CUTOFF ?
                Par.promise_(seqFib(n)) :
                f(n - 1).bind(f(n - 2), add);
          }
        };

        // This actor performs output and detects the termination condition.
        final Actor<List<Integer>> out = Par.actor_(new Effect<List<Integer>>() {
          public void e(final List<Integer> fs) {
            for (P2<Integer, Integer> p : fs.zipIndex()) {
              System.out.println(MessageFormat.format("n={0} => {1}", p._2(), p._1()));
            }
          }
        });

        System.out.println("Calculating Fibonacci sequence in parallel...");

        // note this is forM rather than forM_ as I can't see how to move "out" inward
        // without losing the correct output order
        Par.forM(range(0, 46), fib).to(out);
      }

    That seems more obvious to me. I couldn’t see why bind(join(su, P1.curry(this).f(n - 2)), add) was required when bind(f(n - 2), add) seems to work just as well?

    Here’s the framework wrapper.


      private static class Par {
        final static int threads = 10;
        final static ExecutorService pool = Executors.newFixedThreadPool(threads);
        final static Strategy<Unit> su = Strategy.executorStrategy(pool);

        private static <A> Promise<A> promise_(final A val) {
          return promise(su, P.p(val));
        }

        private static <A> Actor<A> actor_(final Effect<A> effect) {
          return actor(su, new Effect<A>() {
            public void e(final A in) {
              effect.e(in);
              pool.shutdown();
            }
          });
        }

        private static <A, B> Promise<List<B>> forM(final List<A> range, final F<A, Promise<B>> f) {
          return join(su, sequence(parMap(range, f)));
        }

        private static <A, B> P1<List<Promise<B>>> parMap(final List<A> range, final F<A, Promise<B>> f) {
          return Strategy.<Promise<B>>executorStrategy(pool).parMap(f, range);
        }

        private static <A> P1<Promise<List<A>>> sequence(final P1<List<Promise<A>>> m) {
          return m.map(Promise.<A>sequence(su));
        }
      }

    }

  5. Rohan, you’re also correct that f(n - 1).bind(f(n - 2), add) would work just as well. The API has been improved since this example was first written. I’ve made the modification.

  6. Ah, I remember why it was bind(join(su, P1.curry(this).f(n - 2)), add) instead of f(n - 1).bind(f(n - 2), add). It’s because Java doesn’t have tail-call optimisation and I wanted to use constant stack.

  7. Fantastic. You mentioned that making threads which do nothing but wait to claim futures/promises is a bad idea because it leads to an explosion in threads. Worse, if you are using a thread pool this can easily lead to deadlock, because all the active threads in the pool could be used up on these blocked callables.

    So to reinforce your point, it’s very important to give Promises to Actors when you’re done building them, or at least ensure that you never claim() Promise p inside a computation that will be executed using the same Strategy that created p.

    This series has been great, please keep it up!

  8. Pingback: The goal and the game rules « Marco Faustinelli's Blog
