Wednesday, March 18, 2015

Java Concurrency Problem: Accumulator to accumulate results of multiple threads.

I recently encountered a very interesting problem related to Java concurrency. I will try to explain it briefly. Many threads execute in parallel, and each computes a result, but that result is always one of two possible values, say A and B. We want to store these two values in a map together with the number of threads that produced each one. For example, if there were 1000 threads and, after the computation, 400 of them produced A and 600 produced B, then the map should contain {A,400} and {B,600}.
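
Before adding any threads, it may help to pin down the target result. The following is a minimal single-threaded sketch; the class SequentialCount and its count method are hypothetical names used only for illustration, and the hard-coded 400/600 split simply mirrors the example above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-threaded baseline: it only shows the shape of the map we
// want at the end, not how to build it concurrently.
public class SequentialCount {

    // Counts how often each result value occurs, on a single thread.
    public static Map<String, Integer> count(List<String> results) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys give a stable print order
        for (String r : results) {
            counts.merge(r, 1, Integer::sum); // put 1 for a new key, else add 1
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>(Collections.nCopies(400, "A"));
        results.addAll(Collections.nCopies(600, "B"));
        System.out.println(count(results)); // prints {A=400, B=600}
    }
}
```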


The solution was refined over several iterations. Let's start with the very first one, which I will call AccumulatorOne.

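import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AccumulatorOne {

 private static final String[] NAMES = { "A", "B" };
 private static final int NB_THREADS = 1_000;
 // A plain HashMap is not thread-safe, so every update goes through the lock below.
 private final Map<String, Integer> countsMap = new HashMap<>();
 // Fair mutual-exclusion lock (a ReentrantLock, not a ReadWriteLock) guarding countsMap.
 private final Lock lock = new ReentrantLock(true);

 public void testIt() {
  // One pool thread per task, as in the problem statement.
  ExecutorService executor = Executors.newFixedThreadPool(NB_THREADS);
  for (int i = 0; i < NB_THREADS; i++) {
   Runnable task = new WorkerThread();
   executor.submit(task);
  }
  executor.shutdown(); // accept no new tasks; already submitted ones still run
  try {
   executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); // wait for all workers
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }
  System.out.println(countsMap);
 }

 private void accumulate(String name) {
  lock.lock();
  try {
   // Read-modify-write: only safe because it runs while holding the lock.
   Integer cnt = countsMap.get(name);
   if (cnt == null) {
    countsMap.put(name, 1);
   } else {
    countsMap.put(name, cnt + 1);
   }
  } finally {
   lock.unlock();
  }
 }

 // Each worker "computes" a result by picking A or B at random and records it.
 private class WorkerThread implements Runnable {
  @Override
  public void run() {
   accumulate(NAMES[ThreadLocalRandom.current().nextInt(0, NAMES.length)]);
  }
 }
}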

It can be run using the following class:

public class AccumulatorMainApp {
 public static void main(String[] args) {
  AccumulatorOne accumulator = new AccumulatorOne();
  accumulator.testIt();
 }
}

Looking at the problem more carefully, we notice a few things:
  • We do not need an explicit lock if we use ConcurrentHashMap.
  • We can use AtomicInteger to keep track of the counts for A and B.
  • We can use the computeIfAbsent method of ConcurrentHashMap, which atomically creates the counter for a key the first time that key is seen.
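
The third point is what makes the lock unnecessary: ConcurrentHashMap documents that computeIfAbsent is performed atomically, so the mapping function is applied at most once per key even when many threads race on the same missing key. The sketch below tries to make that visible by counting how often the counter factory runs. ComputeIfAbsentDemo, countCreations and the pool size of 8 are illustrative assumptions, not part of the original solution.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
public class ComputeIfAbsentDemo {

    // Submits `tasks` increments alternating between keys "A" and "B", and returns
    // how many times the mapping function (the counter factory) actually ran.
    public static int countCreations(int tasks, int poolSize) {
        Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();
        AtomicInteger creations = new AtomicInteger(); // instrumentation only
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            String key = (i % 2 == 0) ? "A" : "B";
            pool.submit(() -> {
                counts.computeIfAbsent(key, k -> {
                    creations.incrementAndGet(); // count each factory call
                    return new AtomicInteger();
                }).incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return creations.get();
    }

    public static void main(String[] args) {
        System.out.println(countCreations(1_000, 8)); // prints 2: one counter per key
    }
}
```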

The second version, AccumulatorTwo, looks like this:

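import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AccumulatorTwo {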

 private static final String[] NAMES = { "A", "B" };
 private static final int NB_THREADS = 1000;
 private final Map<String, AtomicInteger> countsMap = new ConcurrentHashMap<>();

 public void testIt() {
  ExecutorService executor = Executors.newFixedThreadPool(NB_THREADS);
  for (int i = 0; i < NB_THREADS; i++) {
   Runnable task = new WorkerThread();
   executor.submit(task);
  }
  executor.shutdown();
  try {
   executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }
  System.out.println(countsMap);
 }

 private void accumulate(String name) {
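  // computeIfAbsent atomically creates the counter the first time a name is seen;
  // incrementAndGet then updates it without any explicit lock.
  countsMap.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();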
 }

 private class WorkerThread implements Runnable {
  @Override
  public void run() {
   accumulate(NAMES[ThreadLocalRandom.current().nextInt(0, NAMES.length)]);
  }
 }
}
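
For comparison, here is a possible variant that drops AtomicInteger and stores plain Integer counts, letting ConcurrentHashMap.merge perform the read-modify-write atomically. AccumulatorMerge, its run method and the smaller thread pool are hypothetical choices for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of AccumulatorTwo: plain Integer values, updated via merge.
public class AccumulatorMerge {
    private static final String[] NAMES = { "A", "B" };
    private final Map<String, Integer> countsMap = new ConcurrentHashMap<>();

    public Map<String, Integer> run(int tasks, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> accumulate(NAMES[ThreadLocalRandom.current().nextInt(NAMES.length)]));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return countsMap;
    }

    private void accumulate(String name) {
        // Atomically inserts 1 for a new key, otherwise replaces the old count with old + 1.
        countsMap.merge(name, 1, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(new AccumulatorMerge().run(1_000, 8));
    }
}
```

One reason to keep a mutable counter object per key, as AccumulatorTwo does, is that the counter type can then be swapped without touching the map logic; with merge, the map entry itself is rewritten with a new Integer on every update.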

AccumulatorTwo looks pretty neat. Can we improve it further? Yes: LongAdder is a better candidate under high contention, as its Javadoc explains:
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
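
As a quick illustration of the LongAdder API, the following sketch increments one shared LongAdder from several threads and reads the total with sum(). The class LongAdderDemo, the task count and the pool size are assumptions made for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class, not part of the original post.
public class LongAdderDemo {

    // Increments one shared LongAdder `tasks` times from a pool of `poolSize` threads.
    public static long countWith(int tasks, int poolSize) {
        LongAdder adder = new LongAdder();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.submit(adder::increment); // contended updates are spread over internal cells
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // sum() adds up the cells; it is exact here because no updates are still running.
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(countWith(10_000, 8)); // prints 10000
    }
}
```

The LongAdder Javadoc also notes that sum() is not an atomic snapshot: if increments are still in progress, the returned value may miss some of them. Reading it only after awaitTermination, as here, avoids that.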
Swapping AtomicInteger for LongAdder gives us the third and final version:

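import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class AccumulatorThree {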
 private static final String[] NAMES = { "A", "B" };
 private static final int NB_THREADS = 1000;
 private final Map<String, LongAdder> countsMap = new ConcurrentHashMap<>();

 public void testIt() {
  ExecutorService executor = Executors.newFixedThreadPool(NB_THREADS);
  for (int i = 0; i < NB_THREADS; i++) {
   Runnable task = new WorkerThread();
   executor.submit(task);
  }
  executor.shutdown();
  try {
   executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }
  System.out.println(countsMap);
 }

 private void accumulate(String name) {
  // Same pattern as AccumulatorTwo, but LongAdder spreads contended increments across cells.
  countsMap.computeIfAbsent(name, k -> new LongAdder()).increment();
 }

 private class WorkerThread implements Runnable {
  @Override
  public void run() {
   accumulate(NAMES[ThreadLocalRandom.current().nextInt(0, NAMES.length)]);
  }
 }
}

And it can be used as shown below:
public class AccumulatorMainApp {
 public static void main(String[] args) {
  AccumulatorThree accumulatorThree = new AccumulatorThree();
  accumulatorThree.testIt();
 }
}
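
Because LongAdder.sum() is only exact once updates have stopped, one way to hand the results to other code is to copy them into a plain map after the executor has terminated. The sketch below is a hypothetical helper, AccumulatorSnapshot.countAndSnapshot, that combines AccumulatorThree's update logic with such a snapshot; it assumes a small fixed pool of 8 threads instead of one thread per task.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper, not part of the original post.
public class AccumulatorSnapshot {
    private static final String[] NAMES = { "A", "B" };

    // Runs `tasks` AccumulatorThree-style updates on `poolSize` threads and, once the
    // pool has terminated, copies each LongAdder's sum into a sorted Map<String, Long>.
    public static Map<String, Long> countAndSnapshot(int tasks, int poolSize) {
        Map<String, LongAdder> countsMap = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                String name = NAMES[ThreadLocalRandom.current().nextInt(NAMES.length)];
                countsMap.computeIfAbsent(name, k -> new LongAdder()).increment();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<String, Long> snapshot = new TreeMap<>();
        // Assuming the pool terminated within the timeout, no task is still running,
        // so each sum() is the final count for that name.
        countsMap.forEach((name, adder) -> snapshot.put(name, adder.sum()));
        return snapshot;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countAndSnapshot(1_000, 8);
        long total = counts.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(counts + " total=" + total); // total is 1000: no update is lost
    }
}
```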

Let me know your comments/feedback. I hope you enjoyed this post.
