Saturday, March 9, 2013

Composing Futures with Akka


Composing futures lets you run two (or more) tasks at the same time and then wait until they are all done. In plain Java this is typically done with an ExecutorService.
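
For comparison, here is a minimal sketch of that plain-JDK approach, using only java.util.concurrent. The class name ExecutorServiceSketch and the helpers runAllAndSum and pause are made up for illustration; they are not part of Akka or of the sample program in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceSketch {

    // Runs all tasks in parallel (one pool thread per task), waits for them, sums the results.
    static long runAllAndSum(List<Callable<Long>> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            long sum = 0;
            // invokeAll blocks until every task has completed
            for (Future<Long> f : executor.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    // Hypothetical stand-in for slow work: sleeps, then returns the time slept.
    static Callable<Long> pause(final long millis) {
        return new Callable<Long>() {
            public Long call() throws Exception {
                Thread.sleep(millis);
                return millis;
            }
        };
    }

    public static void main(String[] args) {
        List<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
        tasks.add(pause(200));
        tasks.add(pause(300));
        System.out.println("Total pause: " + runAllAndSum(tasks) + " ms");
    }
}
```

This works, but the waiting and the combining of results are tangled together in one blocking loop, which is the part that composable futures tidy up.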

It is often desirable to combine different Futures with each other. Akka provides additional constructs that greatly simplify some common use cases, such as making parallel remote service calls and then collecting and mapping the results.

In this article, I will create a simple Java program that explores composing futures with Akka. The sample program works with Akka 2.1.1.

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.10</artifactId>
        <version>2.1.1</version>
    </dependency>

In the Scala standard library, a Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). To make this usable from Java, Akka provides a Java-friendly interface in akka.dispatch.Futures.

Let's set up a Callable class that does some work and then returns a result. For this example, the work is just to pause for a random amount of time, and the result is the amount of time it paused for.


import java.util.concurrent.Callable;

public class RandomPause implements Callable<Long> {

    private Long millisPause;

    public RandomPause() {
        millisPause = Math.round(Math.random() * 3000) + 1000; // 1,000 to 4,000
        System.out.println(this.toString() + " will pause for " + millisPause
                + " milliseconds");
    }

    public Long call() throws Exception {
        Thread.sleep(millisPause);
        System.out.println(this.toString() + " was paused for " + millisPause
                + " milliseconds");
        return millisPause;
    }
}

Akka's Future has several monadic methods that are very similar to the ones used by Scala's collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.
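
To give a feel for what such a pipeline looks like, here is a small sketch that uses the JDK's own CompletableFuture (Java 8 or later) rather than Akka. That substitution is an assumption made so the snippet runs without extra dependencies. thenApply plays roughly the role of map and thenCompose roughly the role of flatMap; the class PipelineSketch and the method describe are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class PipelineSketch {

    // Builds a three-stage pipeline: produce a pause length, convert it to seconds, format it.
    static CompletableFuture<String> describe(long millis) {
        return CompletableFuture
                .supplyAsync(() -> millis)                        // stage 1: produce the value
                .thenApply(ms -> ms / 1000.0)                     // stage 2: transform it (like map)
                .thenCompose(s -> CompletableFuture.supplyAsync(  // stage 3: chain another future (like flatMap)
                        () -> "paused for " + s + " seconds"));
    }

    public static void main(String[] args) {
        // join() blocks until the whole pipeline has run
        System.out.println(describe(2500).join());
    }
}
```

Each stage only describes what to do with the value once it arrives, so nothing blocks until join() is called at the very end.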

Here is a Java app that composes the RandomPause futures.

import static akka.dispatch.Futures.future;
import static akka.dispatch.Futures.sequence;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Mapper;

public class SimpleFutures {
    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(4);
        ExecutionContext ec = ExecutionContexts.fromExecutorService(executor);

        List<Future<Long>> futures = new ArrayList<Future<Long>>();

        System.out.println("Akka Futures says: Adding futures for two random length pauses");

        futures.add(future(new RandomPause(), ec));
        futures.add(future(new RandomPause(), ec));

        System.out.println("Akka Futures says: There are " + futures.size()
                + " RandomPause's currently running");

        // compose a sequence of the futures
        Future<Iterable<Long>> futuresSequence = sequence(futures, ec);

        // Sum all of the pause durations once every future has completed
        Future<Long> futureSum = futuresSequence.map(
                new Mapper<Iterable<Long>, Long>() {
                    public Long apply(Iterable<Long> ints) {
                        long sum = 0;
                        for (Long i : ints)
                            sum += i;
                        return sum;
                    }
                }, ec);

        // register a non-blocking callback that prints the result on success
        futureSum.onSuccess(new PrintResult<Long>(), ec);

        // block (for at most 5 seconds) until the composed future completes
        try {
            System.out.println("Result :" + Await.result(futureSum, Duration.apply(5, TimeUnit.SECONDS)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }

}

Explanation:
In order to execute callbacks and operations, Futures need something called an ExecutionContext, which is very similar to a java.util.concurrent.Executor. In the above program, I created my own ExecutorService and wrapped it in an ExecutionContext using the ExecutionContexts.fromExecutorService factory method.

Take note of 'sequence', which combines the individual Futures into a single Future.

To better explain what happened in the example: Futures.sequence takes the Iterable<Future<Long>> and turns it into a Future<Iterable<Long>>. We can then use map to work with the Iterable<Long> directly and sum its elements.
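
The same shape can be sketched with the JDK's CompletableFuture (Java 8 or later). This is an analogy, not Akka's implementation: the sequence helper below is a hypothetical stand-in, and the two pause values are simply taken from the sample output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequenceSketch {

    // Turns a list of futures into one future of a list, preserving input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> {
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> f : futures) {
                results.add(f.join()); // every future is already complete here, so join does not wait
            }
            return results;
        });
    }

    // The "map" step: aggregate the collected values.
    static long sum(List<Long> values) {
        long total = 0;
        for (Long v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Long>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 2306L),
                CompletableFuture.supplyAsync(() -> 3892L));
        long total = sequence(futures).thenApply(SequenceSketch::sum).join();
        System.out.println("Total: " + total);
    }
}
```

The point to notice is the type flip: a list of futures goes in, one future holding a list comes out, and only then is the aggregation applied.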

Finally, PrintResult simply prints the output of futureSum.

import akka.dispatch.OnSuccess;

// Callback that prints the total once the composed future succeeds
public final class PrintResult<T> extends OnSuccess<T> {

    @Override
    public final void onSuccess(T t) {
        System.out.println("PrintResults says: Total pause was for " + t
                + " milliseconds");
    }
}

Output:

Akka Futures says: Adding futures for two random length pauses
RandomPause@55e859c0 will pause for 3892 milliseconds
RandomPause@5430d082 will pause for 2306 milliseconds

Akka Futures says: There are 2 RandomPause's currently running
RandomPause@5430d082 was paused for 2306 milliseconds
RandomPause@55e859c0 was paused for 3892 milliseconds

PrintResults says: Total pause was for 6198 milliseconds

Note: Akka Actors are not used in this example.
