This is the fourth installment in a series of posts about making highly concurrent software easier to write in Java. Previous entries are available here: part 1, part 2, part 3. However, I aim to make it possible to follow along even if you haven’t read the previous posts.
I Have Seen the Future…
If you have used the Java 5 concurrency API at all, you will have come across the Future
class. For example, when you submit a Callable<Integer>
to an ExecutorService
, what you get back is a Future<Integer>
which represents a computation, running concurrently, that will (hopefully) result in an integer at some time in the future. Once you have the Future<Integer> fi
, you can later get the integer out of it by calling fi.get().
That’s all fine and dandy, but let’s say you want do do something like add two future integers. You could do something like this:
int sum = x.get() + y.get();
This will block the current thread until both of those integers are available, then add them together. But why wait for that? If you have an ExecutorService
, you can create a new Future
that computes the sum:
Future<Integer> sum = executorService.submit(new Callable<Integer>() { public Integer call() { return x.get() + y.get(); } });
Now the current thread can continue, but we’ve started a new thread that does nothing until the values of x
and y
have both been calculated by yet another thread.
We’re beginning to see a problem here. We want to be able to compose Futures
together to form new Futures
, but find that the number of threads required to compose n Future
values is on the order of O(n). If we have a fixed-size thread pool, we’ll run into starvation. If we have an unbounded thread pool, then we might start more threads than the operating system can handle, most of which will be doing nothing at all but wait for other threads.
This should all sound very familiar. Threads are a space resource. What kind of processes are O(n) in their space requirement? If you said “linearly recursive processes”, go to the head of the class. Intuitively, for the same reason that we can find iterative versions of any recursive algorithm, it seems that we should be able to find an algorithm to accomplish the same thing with O(1) threads.
…and it is a Monad
In the above example, it’s like we’re giving seperate instructions, waiting for the results of each in between. Imagine if we were working in an office with Bob and Alice, and we needed work on something from both of them. We might go to Bob and say: “Bob, process this and give me the result”. Then we’d take the result to Alice and say: “Alice, here’s a result from Bob.” It would be much better, if we could just go to Bob and say: “Bob, process this and give the result to Alice.” This is the essential difference between recursive and iterative processes.
But wait! We say that kind of thing all the time, in Java:
public Work bob(Work w) { ... } public Work alice(Work w) { ... } public Work bobThenAlice(Work w) { Work b = bob(w); return alice(b); }
Here, we’re instructing a single thread to do some work, then use the result of that work to do more work. What’s really sneaky here is the meaning of the semicolon. In this context, what the former semicolon means is “take the stored value b
from the previous statement and bind it to the free variable b
in the next statement”. You can think of the second semicolon as binding a blank statement over the result of the preceding statement.
Using first-class functions from Functional Java, and using the Callables monad from the first part of this series, you could implement that same behaviour using something like this:
F<Work, Callable<Work>> bob = new F<Work, Callable<Work>>() { public Callable<Work> f(final Work w) { return new Callable<Work>() { public Work call() { ... } }; } }; F<Work, Callable<Work>> alice = new F<Work, Callable<Work>>() { ... }; public Callable<Work> bobThenAlice(Work w) { return Callables.bind(bob.f(w), alice); }
That’s pretty neat. Now we have a single Callable
that we can run concurrently in a new thread, turning it into a Future
. But wouldn’t it be cool if we could bind Futures?
That would let us take already running computations and combine them in exactly this way. We want a Future
monad.
The problem with combining Futures
is in the nature of the future. This is a deliberate pun on “future”. Think about time for a second. What does it mean to get a value that’s in the future? By the very fact that causality is sequential, it’s a violation of the nature of reality to have something that doesn’t yet exist. It’s the future; you’re not supposed to get stuff out. But, we can put stuff in, can’t we? Yes we can. You know those corny time-capsule things where people put their mountain bikes and Nintendo games for future generations to enjoy later? We can do that with data values. And not just values, but computations.
Here’s One I Made Earlier
The Future
class in the standard Java libraries doesn’t come with any methods for projecting computations into the future. But Functional Java comes with a class called Promise<A> which does have that feature. It makes use of light-weight concurrent processes (actors), and parallel strategies, as described in the previous post, to implement the ability to combine concurrent computations into larger (concurrently executing) structures.
Since it is implemented as a monad, the methods it provides are all the usual suspects: unit, bind, fmap, join, etc. Here’s a quick overview of what they do and why they’re useful. Grasping them doesn’t just help you understand the Promise
class, but any monad you may come across in the (ahem) future.
The unit function, the constructor of Promises
, is just called promise
. It has a few overloaded forms, but here is the simplest one.
public static <A> Promise<A> promise(Strategy<A> s, P1<A> p);
The P1
class is just a simple closure with no arguments, provided by the Functional Java library. P1<A>
consists of one abstract method: A _1(). Strategy
represents a method of evaluating P1s
concurrently. I also talk about Strategies
in the previous post, but the long and the short of it is that it has methods to evaluate the P1
value according to some parallelisation strategy, like with a thread pool for instance.
Calling the promise
method starts a concurrent computation, in a manner according to the given strategy, that evaluates p
. The resulting Promise
value is a handle on the running computation, and can be used to retrieve the value later. Promise.claim()
will block the current thread until the value is available, exactly like Future.get()
, but this is generally not what you want to do. Instead, you want to bind.
The essence of the monad pattern is the binding function. If you don’t think you already know what a monad is, but understand this method, then you know more than you think:
public Promise<B> bind(F<A, Promise<B>> f);
This method means that if you have a Promise
of an A
, and a function from an A
to a Promise
of a B
, you can get a Promise
of a B
. I.e. if somebody promises you an A
, and I can promise you a B
for every A
, it’s the same thing as being promised a B
in the first place.
The mapping function:
public Promise<B> fmap(F<A, B> f);
This method means that if you have an Promise
of an A
, and a function from A
to B
, you can get a Promise
of a B
. In other words, you can map any function over a Promise
, and fmap
will return you a Promise
of the result. Behind the scenes, fmap
is implemented by calling the bind
and promise
methods. The difference between this method and the bind
method is subtle but important. Calling p.bind(f)
is exactly equivalent to calling Promise.join(p.fmap(f)).
The join function:
public static <A> Promise<A> join(Promise<Promise<A>> a);
Join
is a lot more useful than it looks. If you have a promised Promise,
it’s the same as just having a Promise
. In practise, that means that if you can start a concurrent task that starts a concurrent task, you can combine those into one concurrent task. You can think of it as the semantic equivalent of Thread.join()
, except that our method returns the joined Promise
immediately.
Coming back to Bob and Alice for a second, we can implement bob
and alice
from the Callables
example above, using Promise
instead of Callable
. Both bob
and alice
will construct Promises
using the promise
method, putting whatever work they do inside a P1
. That way, when you call bob
, he’s already doing his work by the time you mention Alice’s name:
final Strategy<Work> s = Strategy.simpleThreadStrategy(); F<Work, Promise<Work>> bob = new F<Work, Promise<Work>>() { public Promise<Work> f(final Work w) { return promise(s, new P1() { public Work _1() { ... } }); } }; F<Work, Promise<Work>> alice = new F<Work, Promise<Work>>() { ... }; public Promise<Work> bobThenAlice(Work w) { return bob.f(w).bind(alice); }
So now that we can build arbitrarily complex concurrent processes from already-running processes, how do we get the final promised value out? Again, you could call Promise.claim()
, but that blocks the current thread as we know. Instead, Promise
comes equipped with a method to(Actor<A>)
which promises to send the value to the given Actor
as soon as it’s ready. Control is returned to the current thread immediately, and the whole computation continues in the background, including the action to take on the final result. Actors
were discussed in the previous post.
A Fully Functional Example
I think an example is in order. The following program calculates Fibonacci numbers using a naive recursive algorithm. This is an algorithm that benefits particularly well from parallelisation (barring any other kind of optimisation). If we were just using plain old Future
instead of Promise
, the number of Threads
required to calculate the nth Fibonacci number is O(fib(n)). But since we’re using Promise
, we can use a fixed number of actual Java threads.
package concurrent;
import static fj.Bottom.error;
import fj.Effect;
import fj.F;
import fj.F2;
import fj.Function;
import fj.P;
import fj.P1;
import fj.P2;
import fj.Unit;
import fj.data.List;
import fj.control.parallel.Actor;
import fj.control.parallel.Promise;
import fj.control.parallel.Strategy;
import static fj.data.List.range;
import static fj.function.Integers.add;
import static fj.control.parallel.Promise.join;
import static fj.control.parallel.Promise.promise;
import static fj.control.parallel.Actor.actor;
import java.text.MessageFormat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Fibs {
private static final int CUTOFF = 35;
public static void main(final String[] args) throws Exception {
if (args.length < 1)
throw error("This program takes an argument: number_of_threads");
final int threads = Integer.parseInt(args[0]);
final ExecutorService pool = Executors.newFixedThreadPool(threads);
final Strategy
final Strategy
// This actor performs output and detects the termination condition.
final Actor> out = actor(su, new Effect
>() {
public void e(final List
for (P2
System.out.println(MessageFormat.format(“n={0} => {1}”, p._2(), p._1()));
}
pool.shutdown();
}
});
// A parallel recursive Fibonacci function
final F
public Promise
return n < CUTOFF ?
promise(su, P.p(seqFib(n))) :
f(n - 1).bind(f(n - 2), add);
}
};
System.out.println("Calculating Fibonacci sequence in parallel...");
join(su, spi.parMap(fib, range(0, 46)).map(Promise.
}
// The sequential version of the recursive Fibonacci function
public static int seqFib(final int n) {
return n < 2 ? n : seqFib(n - 1) + seqFib(n - 2);
}
}
[/sourcecode]
For all you Scala fans out there, the Functional Java library comes with convenient bindings for Scala as well. Here’s the same thing written in Scala. Note that this does not use the Actor library from the standard Scala libraries, but the same lighter weight Java implementation that the Java example above uses.
package concurrent
import fj.control.parallel.{Actor, Promise}
import fj.Function.curry
import fj.control.parallel.Strategy.executorStrategy
import fjs.control.parallel.Strategy.parMap
import fjs.control.parallel.Promise._
import fjs.control.parallel.Actor._
import Integer.parseInt
import List.range
import java.util.concurrent.Executors.newFixedThreadPool
import fjs.F._
import fjs.F2._
import fjs.P1._
import fjs.P2._
import fjs.data.List._
import fjs.control.parallel.Strategy.ListPar
object Fibs {
val CUTOFF = 35;
def main(args: Array[String]) = {
if (args.length < 1)
error("This program takes an argument: number_of_threads")
val threads = parseInt(args(0))
val pool = newFixedThreadPool(threads)
implicit def s[A] = executorStrategy[A](pool)
// This actor performs output and detects the termination condition.
val out: Actor[List[Int]] = actor{
ns =>
for ((n, i) <- ns.zipWithIndex) printf("n=%d => %d\n”, i, n)
pool.shutdown()
}
// A parallel recursive Fibonacci function
def fib(n: Int): Promise[Int] = {
if (n < CUTOFF) promise(() => seqFib(n))
else fib(n – 1).bind(fib(n – 2), curry((_: Int) + (_: Int)))
}
println(“Calculating Fibonacci sequence in parallel…”)
out ! sequence(parMap[Int, Promise[Int], List](fib, range(0, 46)));
}
// The sequential version of the recursive Fibonacci function
def seqFib(n: Int): Int = if (n < 2) n else seqFib(n - 1) + seqFib(n - 2);
}
[/sourcecode]
Here's an example run of this program using a pool of 10 threads. It runs about 7 times faster that way than with just 1 thread on my 8-way machine. The Scala version is also very slightly faster for some reason.
$ scala -classpath .:../../../build/classes/src concurrent.Fibs 10 Calculating Fibonacci sequence in parallel… n=0 => 0 n=1 => 1 n=2 => 1 n=3 => 2 n=4 => 3 n=5 => 5 n=6 => 8 n=7 => 13 n=8 => 21 n=9 => 34 n=10 => 55 n=11 => 89 n=12 => 144 n=13 => 233 n=14 => 377 n=15 => 610 n=16 => 987 n=17 => 1597 n=18 => 2584 n=19 => 4181 n=20 => 6765 n=21 => 10946 n=22 => 17711 n=23 => 28657 n=24 => 46368 n=25 => 75025 n=26 => 121393 n=27 => 196418 n=28 => 317811 n=29 => 514229 n=30 => 832040 n=31 => 1346269 n=32 => 2178309 n=33 => 3524578 n=34 => 5702887 n=35 => 9227465 n=36 => 14930352 n=37 => 24157817 n=38 => 39088169 n=39 => 63245986 n=40 => 102334155 n=41 => 165580141 n=42 => 267914296 n=43 => 433494437 n=44 => 701408733 n=45 => 1134903170
Massive win! If we had been using Future
instead of Promise
, we would have needed at least 55 threads (since we’re using a cutoff at 35 and 45 – 35 = 10 and fib(10) = 55). Heck, we could even remove the threshold value altogether and calculate all 45 parallel fibs, in parallel. That would require 1,134,903,170 threads in the absence of non-blocking concurrency abstractions like Promise
and Actor
. We can run that in just one thread if we’d like.
Excellent, as all your functional (Java) articles have been.
The CUTOFF and resorting to a sequential implementation looks like the Fork/Join framework from Doug. Would be interesting to have Fork/Join as a Strategy and see how well it compares to the fixed thread pool approach.
Keep up the good work, very interesting.
Peace
-stephan
http://stephan.reposita.org
Pingback: Functional design patterns and cargo cult blogging at Stephans Blog
Very nice post (and blog) I discovered through ‘Stephans blog’ comment :) I was wondering whether the same kind of things has already been done with Haskell (as you mentiong reading Paul Hudak’s book) ?
Arnaud,
Yes, Haskell has something very similar and in fact this is all rather loosely based on a Haskell library. See here. The Fibonacci example is based on a Haskell example by Don Stewart.
Thanks a lot for the pointers. The Haskell implementation’s conciseness does not plead in favor of the Java one :) Have you tried writing it using Java’s closures ?
The Fibonacci example, using Java 7 BGGA notation.
Hmmm, a little better for sure. Yet the essence of the Fibonacci function is still buried in some boilerplate. Thanks again for the pointers, it will surely be helpful.
Since the Haskell code hides the thread pools and strategies and uses forM_ I’ve tried to refactor these away into a class Par to see whether the intention is better revealed even if the flexibility is reduced.
public class ParallelFib {
private static final int CUTOFF = 35;
// The sequential version of the recursive Fibonacci function
public static int seqFib(final int n) {
return n < 2 ? n
: seqFib(n - 1) + seqFib(n - 2);
}
public static void main(final String[] args) throws Exception {
// A parallel recursive Fibonacci function
final F<Integer, Promise> fib = new F<Integer, Promise>() {
public Promise f(final Integer n) {
return n < CUTOFF ?
Par.promise_(seqFib(n)) :
f(n - 1).bind(f(n - 2), add);
}
};
// This actor performs output and detects the termination condition.
final Actor<List> out = Par.actor_(new Effect<List>() {
public void e(final List fs) {
for (P2 p : fs.zipIndex()) {
System.out.println(MessageFormat.format("n={0} => {1}", p._2(), p._1()));
}
}
});
System.out.println("Calculating Fibonacci sequence in parallel...");
// note this is forM rather than forM_ as I can't see how to move "out" inward
// without loosing the correct output order
Par.forM(range(0, 46), fib).to(out);
}
That seems more obvious to me. I couldn’t see why
bind(join(su, P1.curry(this).f(n - 2)), add)
was required whenbind(f(n - 2), add)
seems to work just as well?Here’s the framework wrapper.
private static class Par {
final static int threads = 10;
final static ExecutorService pool = Executors.newFixedThreadPool(threads);
final static Strategy su = Strategy.executorStrategy(pool);
private static
Promise promise_(A val) {
return promise(su, P.p(val));
}
private static
Actor actor_(final Effect effect) {
return actor(su, new Effect() {
public void e(final A in) {
effect.e(in);
pool.shutdown();
}
});
}
private static
Promise<List> forM(final List range, final F<A, Promise> f) {
return join(su, sequence(parMap(range, f)));
}
private static
P1<List<Promise>> parMap(final List range, final F<A, Promise> f) {
return Strategy.<Promise>executorStrategy(pool).parMap(f, range);
}
private static
P1<Promise<List>> sequence(final P1<List<Promise>> m) {
return m.map(Promise.sequence(su));
}
}
}
Rohan, thanks for that. I like the idea of wrapping things in a single module/class. It does make the intent clearer, and the addition of forM certainly makes parMap more useful.
Do feel free to contribute to the library:
http://functionaljava.org/community
Rohan, you’re also correct that f(n – 1).bind(f(n – 2), add) would work just as well. The API has been improved since this example was first written. I’ve made the modification.
Ah, I remember why it was bind(join(su, P1.curry(this).f(n – 2)), add) instead of f(n – 1).bind(f(n – 2), add) It’s because Java doesn’t have tail-call optimisation and I wanted to use constant stack.
Fantastic. You mentioned that making threads which do nothing but wait to claim futures/promises is a bad idea because it leads to an explosion in threads. Worse, if you are using a thread pool this can easily lead to deadlock, because all the active threads in the pool could be used up on these blocked callables.
So to reinforce your point, it’s very important to give Promises to Actors when you’re done building them, or at least ensure that you never claim() Promise p inside a computation that will be executed using the same Strategy that created p.
This series has been great, please keep it up!
Pingback: The goal and the game rules « Marco Faustinelli's Blog