Higher-Order Java Parallelism, Part 3: Threadless Concurrency With Actors

Multi-core processing is changing the way we write programs. Modern computers come with multi-core or multiple processors, and modern applications no longer do just one thing at a time. But all of this has left Java a little bit behind. While languages like Erlang and Haskell, and even other JVM languages like Scala, have powerful concurrency abstractions, concurrent programming in Java is still mostly based around the threading practices of way back in the time of applets.

Threading in Java has long been the domain of the brave or foolish, weaving a brittle and intricate web of manual synchronization between threads, or threads waiting on other threads to feed them work. We all know the examples from old Java books where there might be a thread running in a loop, sleeping for a second, and waking up to check if work needs to be done. This kind of implementation is still being taught to newcomers to the Java language. And when they start writing their own applications in Java, they find that this approach does not scale. Not along the axis of program complexity, nor the axis of the number of threads. In a complex application or one with many threads, you may end up with a program that stops doing anything at all for long periods of time, or worse, hangs forever, while still consuming all the operating system resources that go with those threads.

Blocking vs. Nonblocking Concurrency

The new concurrency library in Java 5 helps this situation a great deal. At least some advancement has been made towards better concurrency abstractions. But there are still pitfalls. For instance, as we’ve seen, the Future and Callable interfaces are really rather jolly useful, and they can take us a long way towards the kinds of things possible in those other languages. But at the end of the day, something has to call Future.get(), and that something will block until the value is available. This can result in deadlocks or starvation, as you may end up in a situation where all threads are blocking on Futures and none are available to advance your program forward. This would be bad. In fact, we could say that in a highly concurrent system, blocking on shared state is a catastrophic event.
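To see how little it takes to get stuck, here is a minimal sketch using only java.util.concurrent. The class name BlockingStarvation and its starves method are made up for this illustration, and the lambdas assume Java 8 or later. A task running on a pool thread submits a second task to the same pool and then blocks on its Future. If the pool has no spare thread, the second task can never run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of any library.
class BlockingStarvation {
    // Returns true if the outer task is still stuck after timeoutMillis.
    static boolean starves(int threads, long timeoutMillis) {
        final ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // The outer task occupies a pool thread, then blocks on a Future
            // whose own task also needs a pool thread in order to run.
            Future<Integer> outer = pool.submit(() -> {
                Future<Integer> inner = pool.submit(() -> 42);
                return inner.get(); // blocks this worker thread
            });
            try {
                outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return false; // the value arrived in time
            } catch (TimeoutException e) {
                return true;  // nobody was free to compute the inner value
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            pool.shutdownNow(); // interrupt the stuck worker, if any
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread starves:  " + starves(1, 300));
        System.out.println("2 threads starve: " + starves(2, 5000));
    }
}
```

With a single thread the outer task waits forever on work that is queued behind it. Adding a thread only moves the problem: nest the blocking one level deeper and the pool is exhausted again.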

The Java libraries are a veritable minefield of methods that, ultimately, block on shared memory monitors. There’s Thread.join(), Future.get(), BlockingQueue.take(), and the list goes on. But what if we could solve this problem with a simple abstraction? Could we solve the blocking problem once and re-use our solution for different situations? Let’s see.

Instead of sleeping or blocking, what we really want is the ability for our code to say: “When X happens, wake up a thread to execute me.” That is, in some sense we want threads to go on doing useful work, and have them be notified when an event happens that requires more work to be done. For example, when Bob comes into my office and asks me for this month’s TPS reports, he doesn’t stand around and wait for them, nor does he check periodically to see if I have them done, and he certainly doesn’t go to sleep to periodically wake up and poll me for them. He will continue with his work (or whatever it is he does all day), since I have instructions to send him those TPS reports as soon as they’re ready. We’re independent and concurrent actors, so we don’t wait on each other.
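As a rough sketch of that idea in plain Java, here is Bob's request written with CompletableFuture. This assumes Java 8 or later, which is much newer than the library discussed in this article, and the names TpsReports, request and demo are invented for the example. The request returns at once, and the callback runs on a pool thread when the report is ready.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical demo class, not part of any library.
class TpsReports {
    // Bob's request: returns immediately; whenReady runs later on a pool thread.
    static CompletableFuture<Void> request(ExecutorService pool, Consumer<String> whenReady) {
        return CompletableFuture
            .supplyAsync(() -> "TPS report", pool)  // I prepare the report
            .thenAcceptAsync(whenReady, pool);      // and send it once it is done
    }

    // The join() below exists only so that the demo can observe the result.
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            AtomicReference<String> inbox = new AtomicReference<String>();
            request(pool, inbox::set).join();
            return inbox.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The only blocking call is the join() in the demo harness. In a real program Bob would simply carry on, and the callback would be the end of the story.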

There is a model of computation, called the Actor Model, that works very much the same way. This is the model employed in the design of Erlang, and it was a major motivator for Scheme, having itself drawn on ideas from Simula. An implementation of Actors comes with the standard Scala library, and as it happens an implementation of it for Java has just been released as part of version 2.8 of the Functional Java library. You can read more about the actor model on your own, but I will explain in some detail how it works in Functional Java.

Algorithm + Strategy = Parallelism

The first thing to explain is Parallel Strategies. I’ve talked about them before, but I’ll do so again here since they’re important to what I’m trying to demonstrate. The idea of the Parallel Strategy is that we can capture a threading pattern, or some method of concurrency in general, in a separate abstraction called a Strategy, and write our concurrent programs independently of the particular threading pattern.

In Java terms, a Strategy<A> is a way of turning any Java expression, whose evaluated type is A, into a value of type A. How this happens is implementation-specific, but we’d like it to be concurrent. First, we need a way of capturing the idea of an unevaluated A. It’s a bit like the idea behind Runnable, only that we can get a value out of it. Java 5 has the Callable interface, which is really close to representing “any expression”, since we can create Callables anonymously, but its call() method is declared to throw Exception, creating a kind of barrier for the programmer. Instead, we’ll use fj.P1<A>. It’s an abstract class with one abstract method: A _1().

The Strategy class has an instance method called par(P1<A>). This will evaluate the given P1 concurrently, immediately returning another P1 that can get the value once it’s ready. Behind the scenes, it actually creates a Callable, turns it into a Future, then returns a P1 that calls Future.get(), but all of that is implementation detail. Strategy comes with some static construction methods that implement basic threading patterns, and you can implement your own by passing a Future-valued function to the strategy(F<P1<A>, Future<A>>) method.
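The Callable-to-Future-to-P1 pipeline described above can be sketched in a few lines of plain Java. Note that Lazy and PoolStrategy are simplified, hypothetical stand-ins for P1 and Strategy, written here for illustration only, and that the lambdas assume Java 8 or later. This is not Functional Java's actual source.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for fj.P1<A>: an unevaluated A.
interface Lazy<A> { A get(); }

// Hypothetical stand-in for a Strategy backed by a thread pool.
class PoolStrategy<A> {
    private final ExecutorService pool;

    PoolStrategy(ExecutorService pool) { this.pool = pool; }

    // Starts evaluating p on the pool and returns at once.
    Lazy<A> par(final Lazy<A> p) {
        final Callable<A> task = () -> p.get();   // wrap the expression in a Callable
        final Future<A> future = pool.submit(task); // turn it into a Future
        return () -> {
            try {
                return future.get(); // waits only if the value is not ready yet
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}

class StrategyDemo {
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            PoolStrategy<Integer> s = new PoolStrategy<Integer>(pool);
            Lazy<Integer> a = s.par(() -> 6 * 7);  // both expressions are now
            Lazy<Integer> b = s.par(() -> 10 + 5); // being evaluated concurrently
            return a.get() + b.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Swapping the thread pool for some other way of producing a Future changes the threading pattern without touching the code that calls par.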

You might be concerned that P1 can’t throw any checked exceptions. This is by design. You’ll find that once you start working with concurrent effects, the value of checked exceptions goes out the window. Besides, if our P1 did need to throw errors, we would use Lazy Error Handling and declare this in the P1's type.

Strategy + Effect = Actor

So now we have concurrent evaluation and parallel transformations. But we’re still ending up with objects that require us to call a blocking method to get their value. The only way around that is to not return any values at all. Instead, we’re going to let our P1s have side-effects. So instead of returning a P1 that lets us wait for the value, our P1s are now going to update a variable or send the value somewhere.

To model a P1 that doesn’t return a useful value, I’m going to use the Unit type. It’s another one of those trivial but useful classes. The Unit type is part of Functional Java, and it’s simply a type that only has one possible value. It’s a lot like the standard library’s Void type, except Void is even further impoverished. When you use this type, you’re saying that the actual value is immaterial. So to describe a P1 that doesn’t return anything useful, we use the type P1<Unit>. We can also describe a transformation from some type T to Unit, with F<T, Unit>.

Functional Java comes with a more honest version of F<A, Unit>, which is Effect<A>. It’s more honest in the sense that it doesn’t pretend to be a transformation, which properly should have no side-effects. Effect<A> is explicit about having side-effects, which is what we intend for Actors. It has one method that doesn’t return a value at all: void e(A a).

We now have what we need to instantiate the Actor class. Here’s an example of how to create and use a simple actor:


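  // A Strategy<Unit> that runs each effect on a thread of its own.
  Strategy<Unit> s = Strategy.simpleThreadStrategy();

  // An actor whose effect prints every message it receives.
  Actor<String> a = Actor.actor(s, new Effect<String>()
    {public void e(String msg)
      {System.out.println(msg);}});

  // Returns immediately; the message is printed concurrently.
  a.act("Hello, actors!");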


The actor receives “messages” on its act method, and the Strategy serves as a kind of mailbox for it. You will note that there’s no dependency at all on any particular threading strategy or any part of the concurrency library by the Actor class. The strategy could be sending our Effects to be evaluated by Threads, remote server farms, by ForkJoin tasks, or even by a Mechanical Turk.
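If you'd like to see how little machinery that takes, here is a sketch of the same idea using only the standard library. SimpleActor is a hypothetical class written for this illustration, with java.util.concurrent.Executor standing in for Strategy<Unit> and java.util.function.Consumer standing in for Effect. It assumes Java 8 or later and is not how Functional Java's Actor is actually written.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for Actor<A>: an effect paired with a way to run it.
class SimpleActor<A> {
    private final Executor strategy;  // plays the role of Strategy<Unit>
    private final Consumer<A> effect; // plays the role of Effect<A>

    SimpleActor(Executor strategy, Consumer<A> effect) {
        this.strategy = strategy;
        this.effect = effect;
    }

    // Hands the message to the strategy and returns without waiting.
    void act(final A message) {
        strategy.execute(() -> effect.accept(message));
    }
}

class SimpleActorDemo {
    // Sends the given number of messages and reports how many were handled.
    static int demo(int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Several messages may be handled at once, so the counter is atomic.
        final AtomicInteger handled = new AtomicInteger();
        SimpleActor<String> a =
            new SimpleActor<String>(pool, msg -> handled.incrementAndGet());
        for (int i = 0; i < messages; i++) a.act("Hello, actors! #" + i);
        pool.shutdown(); // messages already submitted still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // demo-only wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(1000) + " messages handled");
    }
}
```

The actor itself never mentions a thread. All it knows is that something will run its effect, which is why any Executor, or any Strategy, will do.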

An Actor With its Own Mailbox

The Actor class by itself doesn’t yet quite achieve the kind of actor model you see implemented in Erlang or Scala. The important difference is that the Actor above can process multiple “messages” simultaneously. This solution, then, is more general, although if an actor such as the above mutates some state, we’re likely to run into race conditions. Not to worry, it’s easy to construct the more specific case. We just add a queue and ensure that only one message is processed at a time. This construct is available as part of Functional Java, and it’s called QueueActor.

Of course, the “one thread at a time” requirement is not implemented using any blocking or synchronization. Instead, the QueueActor has two possible states, “suspended” or “running”, and, behind the scenes, this is enforced with an AtomicBoolean to keep it consistent in the face of concurrency. If the actor is suspended when it receives a message, it becomes running and its Effect is immediately handed off to its Strategy. If it’s already running, then callers will leave a message on its queue. The QueueActor's Effect is a concurrent, threadless recursion (i.e. it uses a Strategy rather than a Thread) that completely empties the queue, then puts the QueueActor's state back to “suspended”.

QueueActor puts some sanity into managing locally mutable state within an actor’s Effect, since it ensures that the state can only be mutated by one thread at a time. It is guaranteed to act on every message it receives, though in an unspecified order, and is otherwise semantically equivalent to Actor.
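The suspended/running scheme can be sketched with a ConcurrentLinkedQueue and an AtomicBoolean. MailboxActor below is a hypothetical class written for this article, not Functional Java's QueueActor source. It uses an Executor in place of a Strategy, assumes Java 8 or later, and always enqueues the message before deciding whether to wake the actor, which is a small simplification of the description above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of an actor with its own mailbox.
class MailboxActor<A> {
    private final Executor strategy;
    private final Consumer<A> effect;
    private final Queue<A> mailbox = new ConcurrentLinkedQueue<A>();
    private final AtomicBoolean suspended = new AtomicBoolean(true);

    MailboxActor(Executor strategy, Consumer<A> effect) {
        this.strategy = strategy;
        this.effect = effect;
    }

    // Never blocks: enqueue, then wake the actor if it was suspended.
    void act(A message) {
        mailbox.offer(message);
        wake();
    }

    // Only the caller that flips suspended to running schedules work.
    private void wake() {
        if (suspended.compareAndSet(true, false)) strategy.execute(this::work);
    }

    // Handles one message per task, so at most one effect runs at a time.
    private void work() {
        A next = mailbox.poll();
        if (next != null) effect.accept(next);
        if (!mailbox.isEmpty()) {
            strategy.execute(this::work); // recurse via the executor, not the stack
        } else {
            suspended.set(true);
            // A message may have arrived just before we suspended; re-check.
            if (!mailbox.isEmpty()) wake();
        }
    }
}

class MailboxActorDemo {
    static int demo(int senders, final int perSender) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        final CountDownLatch done = new CountDownLatch(senders * perSender);
        final int[] count = {0}; // plain, unsynchronized state owned by the actor
        final MailboxActor<Integer> actor =
            new MailboxActor<Integer>(pool, i -> { count[0]++; done.countDown(); });
        Thread[] threads = new Thread[senders];
        for (int t = 0; t < senders; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perSender; i++) actor.act(i);
            });
            threads[t].start();
        }
        try {
            for (Thread t : threads) t.join();
            done.await(); // demo-only wait so that we can read the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000) + " messages counted");
    }
}
```

The counter in the demo is a bare int with no locking, yet no increments are lost, because only one effect runs at a time and each hand-off goes through the AtomicBoolean or the executor. The second isEmpty() check after suspending is the important detail: without it, a message that arrives between the last poll and the state change could sit in the mailbox unnoticed.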

The Obligatory Example

We now have a light-weight implementation of the actor model, in Java. Don’t believe it? Have a look at this implementation of the canonical Ping-Pong example (imports omitted), and compare it to similar examples for Erlang and Scala.

First, a Ping actor that sends a number of pings to a given Pong actor, waiting for a pong reply each time before sending the next ping.


  public class Ping
    {private final Pong pong;
     private final Actor<Pong> ping;
     private final Actor<Integer> cb;
     private volatile int n;

     public Ping(final Strategy<Unit> s, final int i, final Pong pong, final int id, final Actor<Integer> callback)
       {n = i;
        this.pong = pong;
        cb = callback;
        ping = actor
          (s, new Effect<Pong>() {public void e(final Pong pong)
            {n--;
             if (n > 0)
                pong.act(Ping.this);
             else // Done. Notify caller.
                cb.act(id);}});}

    // Commence pinging
    public P1<Unit> start()
      {return pong.act(this);}

    // Receive a pong
    public P1<Unit> act(final Pong p)
      {return ping.act(p);}}

The Pong actor simply receives ping messages and responds.


  public class Pong
    {private final Actor<Ping> p;

    public Pong(final Strategy<Unit> s)
      {p = actor(s, new Effect<Ping>()
        {public void e(final Ping m)
          {m.act(Pong.this);}});}

    // Receive a ping
    public P1<Unit> act(final Ping ping)
      {return p.act(ping);}}

And here’s the main program that uses the Ping and Pong actors. There’s only one Pong actor that responds to any number of Ping actors pinging it concurrently. There’s also a further QueueActor that is contacted by each Ping actor once that Ping actor has done its work. The example uses a thread pool to back its Strategy. When all the Ping actors have sent all their pings and received all their pongs, the program is terminated by shutting down the thread pool.


  public class PingPong
    {private final int actors;
     private final int pings;
     private final Strategy<Unit> s;
     private final Actor<Integer> callback;
     private volatile int done;

     public PingPong(final ExecutorService pool, final int actors, final int pings)
       {this.actors = actors;
        this.pings = pings;
        s = Strategy.executorStrategy(pool);

        // This actor gives feedback to the user that work is being done
        // and also terminates the program when all work has been completed.
        callback = QueueActor.queueActor
          (s, new Effect<Integer>() {public void e(final Integer i)
            {done++;
             if (done >= actors)
               {System.out.println("All done.");
                pool.shutdown();}
             else if (actors < 10 || done % (actors / 10) == 0)
                     System.out.println(MessageFormat.format
                       ("{0} actors done ({1} total pongs).", done, pings * done));}})
          .asActor();}

    public static void main(final String[] args)
      {final int actors  = Integer.parseInt(args[0]);
       final int pings   = Integer.parseInt(args[1]);
       final int threads = Integer.parseInt(args[2]);
       new PingPong(Executors.newFixedThreadPool(threads), actors, pings).start();}

    public void start()
      {// We will use one Pong actor...
       final Pong pong = new Pong(s);
       // ...and an awful lot of Ping actors.
       for (int i = 1; i <= actors; i++)
           {new Ping(s, pings, pong, i, callback).start();
            if (actors < 10 || i % (actors / 10) == 0)
               System.out.println(MessageFormat.format("{0} actors started.", i));}}}

What follows is an example run of this Java program, with a million concurrent Ping actors pinging 7 times each. Each actor takes about 300 bytes of memory, so one million of them need roughly 300 MB of heap, but 19 real Java Threads handle this quite nicely on my 8-core machine.

$ time java -Xmx600m -cp ../../../.build/classes/src:. concurrent.PingPong 1000000 7 19
100,000 actors started.
200,000 actors started.
300,000 actors started.
400,000 actors started.
500,000 actors started.
600,000 actors started.
700,000 actors started.
800,000 actors started.
900,000 actors started.
1,000,000 actors started.
100,000 actors done (700,000 total pongs).
200,000 actors done (1,400,000 total pongs).
300,000 actors done (2,100,000 total pongs).
400,000 actors done (2,800,000 total pongs).
500,000 actors done (3,500,000 total pongs).
600,000 actors done (4,200,000 total pongs).
700,000 actors done (4,900,000 total pongs).
800,000 actors done (5,600,000 total pongs).
900,000 actors done (6,300,000 total pongs).
All done.

real    1m16.376s
user    3m53.612s
sys     0m10.924s

As you see, these simple tools, built on basic components of the Java 5 concurrency library and paired with powerful abstractions from programming in functional style, make it seem like we have millions of tasks running concurrently. We get a virtually unbounded degree of concurrency without ever seeing a lock or making a blocking call. The number of concurrent tasks is limited only by the size of your heap.

Again, everything you see here, and more, has been released just recently as part of the Functional Java library. So head over to their download page and put those extra cores to use with Java, today!
