Download Android App


Alternate Blog View: Timeslide Sidebar Magazine

Friday, March 22, 2013

Infinispan performance tweaks

This article is a follow up to Getting started: Infinispan as remote cache cluster

Out of the box Infinispan configuration works great for low to medium number of GET/PUT operations. But in distributed mode and for heavy  GET/PUT operations, you may frequently see locking failures like this one:


2013-03-22 00:14:20,033 [DEBUG] org.infinispan.server.hotrod.HotRodDecoder HotRodClientMaster-63 - Exception caught
org.infinispan.server.hotrod.HotRodException: org.infinispan.util.concurrent.TimeoutException: Unable to acquire lock after [10 seconds] on key [ByteArrayKey{data=ByteArray{size=18, hashCode=48079ac7, array=0x033e0f3134354065..}}] for requestor [Thread[HotRodClientMaster-63,5,main]]! Lock held by [(another thread)]
        at org.infinispan.server.hotrod.HotRodDecoder.createServerException(HotRodDecoder.scala:214)
        at org.infinispan.server.core.AbstractProtocolDecoder.decode(AbstractProtocolDecoder.scala:75)
        at org.infinispan.server.core.AbstractProtocolDecoder.decode(AbstractProtocolDecoder.scala:45)

Infinispan uses locking to maintain cache consistency. Optimizing locking settings can help improve overall performance. Here are some configuration tips to avoid locking issues and improve concurrency:


    <default>
        <locking concurrencylevel="1000" isolationlevel="READ_COMMITTED" lockacquisitiontimeout="500" uselockstriping="false">
        <jmxstatistics enabled="true" />
        <!-- Configure a asynchronous distributed cache -->
        <clustering mode="distribution">
            <async/>
            <hash numowners="2"></clustering>
        </locking>
    </default>

Explanation:
  • Concurrency level: Adjust this value according to the number of concurrent threads interacting with Infinispan.
  • lockAcquisitionTimeout: Maximum time to attempt a particular lock acquisition. Set this based on your application needs.
  • useLockStriping: If true, a pool of shared locks is maintained for all entries that need to be locked. Otherwise, a lock is created per entry in the cache. Lock striping helps control memory footprint but may reduce concurrency in the system.
Another configuration worth looking at it Level 1 (L1) cache. An L1 cache prevents unnecessary remote fetching of entries mapped to remote caches by storing them locally for a short time after the first time they are accessed. Read more here.

Wednesday, March 13, 2013

What Apache POI thinks of the Microsoft Document Format!


Most of us would have used Apache POI library. But do you know what POI and its subcomponents like HSSF stands for? Here we go.

The Apache POI project contains the following subcomponents:
  • POIFS (Poor Obfuscation Implementation File System) – This component reads and writes Microsoft's OLE 2 Compound document format. 
  • HSSF (Horrible SpreadSheet Format) – reads and writes Microsoft Excel (XLS) format files.  
  • HPSF (Horrible Property Set Format) – reads "Document Summary" information from Microsoft Office files. 
  • HWPF (Horrible Word Processor Format) – aims to read and write Microsoft Word 97 (DOC) format files.
  • HSLF (Horrible Slide Layout Format) – a pure Java implementation for Microsoft PowerPoint files.
  • DDF (Dreadful Drawing Format) – a package for decoding the Microsoft Office Drawing format.
  • HSMF (Horrible Stupid Mail Format) – a pure Java implementation for Microsoft Outlook MSG files.

Saturday, March 9, 2013

Composing Futures with Akka


Composing Futures provides a way to do two (or more) things at the same time and then wait until they are done. Typically in Java this would be done with a ExecutorService.

It is very often desirable to be able to combine different Futures with each other. Akka provides additional constructs that greatly simplifies some commons uses cases like making parallel remote service calls, collect and map results.

In this article, I will create a simple Java program that explores Akka composing futures. The sample programs works with Akka 2.1.1.

    <dependency>
        <groupid>com.typesafe.akka</groupid>
        <artifactid>akka-actor_2.10</artifactid>
        <version>2.1.1</version>
    </dependency>

In the Scala Standard Library, a Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). To be able to use this from Java, Akka provides a Java friendly interface in akka.dispatch.Futures.

Lets setup a Callable class that does some work and then returns a result. For this example, the work is just to pause for a random amount of time and the result is the amount of time it paused for.


import java.util.concurrent.Callable;

public class RandomPause implements Callable<Long> {

    private Long millisPause;

    public RandomPause() {
        millisPause = Math.round(Math.random() * 3000) + 1000; // 1,000 to 4,000
        System.out.println(this.toString() + " will pause for " + millisPause
                + " milliseconds");
    }

    public Long call() throws Exception {
        Thread.sleep(millisPause);
        System.out.println(this.toString() + " was paused for " + millisPause
                + " milliseconds");
        return millisPause;
    }
}
Akka's Future has several monadic methods that are very similar to the ones used by Scala's collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.

Here is Java app to compose the RandomPause futures.

import static akka.dispatch.Futures.future;
import static akka.dispatch.Futures.sequence;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Mapper;

public class SimpleFutures {
    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(4);
        ExecutionContext ec = ExecutionContexts.fromExecutorService(executor);

        List<future<Long>> futures = new ArrayList<future<Long>>();

        System.out.println("Akka Futures says: Adding futures for two random length pauses");

        futures.add(future(new RandomPause(), ec));
        futures.add(future(new RandomPause(), ec));

        System.out.println("Akka Futures says: There are " + futures.size()
                + " RandomPause's currently running");

        // compose a sequence of the futures
        Future<Iterable<Long>> futuresSequence = sequence(futures, ec);

        // Find the sum of the odd numbers
        Future<Long> futureSum = futuresSequence.map(
                new Mapper<Iterable<Long>, Long>() {
                    public Long apply(Iterable<Long> ints) {
                        long sum = 0;
                        for (Long i : ints)
                            sum += i;
                        return sum;
                    }
                }, ec);

        // block until the futures come back
        futureSum.onSuccess(new PrintResult<Long>(), ec);

        try {
                System.out.println("Result :" + Await.result(futureSum, Duration.apply(5, TimeUnit.SECONDS)));
        } catch (Exception e) {
                e.printStackTrace();
        } 
        
        executor.shutdown();
    }

}

Explanation:
In order to execute callbacks and operations, Futures need something called an ExecutionContext, which is very similar to a java.util.concurrent.Executor. In the above program, I have provided my own ExecutorService and passed it to factory methods provided by the ExecutionContexts.

Take note of 'sequence' that combines different Futures with each other.

To better explain what happened in the example, Future.sequence is taking the Iterable<Future<Long>> and turning it into a Future<Iterable<Long>>. We can then use map to work with the Iterable<Long> directly, and we aggregate the sum of the Iterable.

Finally, PrintResult simply prints the output of futureSum.

public final class PrintResult<T> extends OnSuccess<T> {
    
    @Override
    public final void onSuccess(T t) {

        System.out.println("PrintResults says: Total pause was for " + ((Long) t)
                + " milliseconds");
    }
}

Output:

Akka Futures says: Adding futures for two random length pauses
RandomPause@55e859c0 will pause for 3892 milliseconds
RandomPause@5430d082 will pause for 2306 milliseconds

Akka Futures says: There are 2 RandomPause's currently running
RandomPause@5430d082 was paused for 2306 milliseconds
RandomPause@55e859c0 was paused for 3892 milliseconds

PrintResults says: Total pause was for 6198 milliseconds
Note: Akka Actors are not used in this example.