Thursday, March 12, 2015

Computing execution time using CountDownLatch and CyclicBarrier

A CountDownLatch is a synchronizer that allows one or more threads to wait until a set of operations being performed in other threads completes. A CyclicBarrier is a synchronizer that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be reused after the waiting threads are released.
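
To make the "cyclic" part concrete, here is a minimal sketch in which the same barrier object is crossed several times by the same party of threads. The class name BarrierReuseDemo and the method countTrips are made up for this illustration; the barrier action simply counts how often the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the utility discussed in this post.
public class BarrierReuseDemo {

    // Runs `parties` threads through `rounds` barrier points and returns
    // how many times the barrier tripped (i.e. how often the barrier action ran).
    static int countTrips(int parties, final int rounds) {
        final AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        final CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, reused every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or failed; give up.
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + countTrips(3, 4) + " times");
    }
}
```

A CountDownLatch could not be used this way: once its count reaches zero it stays open, so each round would need a fresh latch.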

I first encountered a timer utility that measures execution time with a CountDownLatch in the excellent book "Java Concurrency in Practice" by Brian Goetz et al.:
import java.util.concurrent.CountDownLatch;

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
        throws InterruptedException {

        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {

                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) { }
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end-start;
    }
}




Using a similar idea, I have written a utility class that measures the time taken by code that uses a CyclicBarrier or a CountDownLatch. The utility class has two methods, elapsedTimeUsingCountDownLatch and elapsedTimeUsingCyclicBarrier, as shown below:
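import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;

public class ConcurrentExecutionActionTimer {
 /**
  * This method captures the time taken by all worker threads to execute. The executor that is passed to the timer must allow for the creation of at least as many threads as the
  * given concurrency level or the test will never complete. This is known as a thread starvation deadlock.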
  * @param executor to execute the action
  * @param concurrency level representing the number of actions to be executed concurrently
  * @param action runnable representing the action.
  * @return time taken
  * @throws InterruptedException
  */
 public static long elapsedTimeUsingCountDownLatch(Executor executor, int concurrency, final Runnable action) throws InterruptedException
 {
  final CountDownLatch ready = new CountDownLatch(concurrency);
  final CountDownLatch start = new CountDownLatch(1);
  final CountDownLatch done = new CountDownLatch(concurrency);
  
  for(int i=0; i<concurrency; i++ ){
   executor.execute(new Runnable() {
    
    @Override
    public void run() {
     ready.countDown(); //Tell timer we are ready.
     
     try {
      start.await(); //Wait till peers are ready.
      action.run();
     } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
     } finally {
      done.countDown(); //Tell timer we are done.
     }
    }
   });
  }
  
  ready.await();  //Wait for all workers to be ready
  long startNanoTime = System.nanoTime();
  start.countDown();  //And here they go!!
  done.await();   // Wait for all workers to finish.
  return System.nanoTime() - startNanoTime;
  
 }
 
 public static long elapsedTimeUsingCyclicBarrier(Executor executor, int concurrency, final Runnable action) throws InterruptedException, BrokenBarrierException
 {
  final Runnable barrierAction = new Runnable() {
   @Override
   public void run() {
    System.out.println("Condition of barrier is met.");
   }
  };
  
  final CyclicBarrier barrier = new CyclicBarrier(concurrency + 1, barrierAction);
  
  for(int i=0; i<concurrency; i++ ){
   executor.execute(new Runnable() {
    @Override
    public void run() {
     try {
      System.out.println("Waiting at barrier.");
      barrier.await(); //Wait till all workers and the timer are ready.
      try {
       action.run();
      } finally {
       //Cyclic barrier gets reset automatically. Wait again, even if the action threw, so the timer is released.
       barrier.await();
      }
     } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
     } catch (BrokenBarrierException e) {
      e.printStackTrace();
     }
    }
   });
  }
  barrier.await();
  long startNanoTime = System.nanoTime();
  barrier.await();
  return System.nanoTime() - startNanoTime;
 }
}
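
The Javadoc above warns that the executor must be able to run at least as many threads as the concurrency level. One way to check this before timing anything is to wait on the ready latch with a timeout instead of waiting forever. The following is only a sketch under that assumption; the class name StarvationCheck and the method canRunConcurrently are hypothetical and not part of the utility class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of ConcurrentExecutionActionTimer.
public class StarvationCheck {

    // Returns true if `executor` got `concurrency` workers running at the same
    // time within the timeout, false if some of them were still waiting to run.
    static boolean canRunConcurrently(Executor executor, int concurrency, long timeoutMillis) {
        final CountDownLatch ready = new CountDownLatch(concurrency);
        final CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < concurrency; i++) {
            executor.execute(() -> {
                ready.countDown(); // this worker has a thread
                try {
                    start.await(); // hold the thread until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            // Timed wait: gives up instead of blocking forever on a small pool.
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            start.countDown(); // release the workers either way so the pool can drain
        }
    }

    public static void main(String[] args) {
        ExecutorService small = Executors.newFixedThreadPool(2);
        ExecutorService large = Executors.newFixedThreadPool(5);
        try {
            System.out.println("pool of 2, concurrency 5: " + canRunConcurrently(small, 5, 200));
            System.out.println("pool of 5, concurrency 5: " + canRunConcurrently(large, 5, 2000));
        } finally {
            small.shutdown();
            large.shutdown();
        }
    }
}
```

With a pool of two threads and a concurrency level of five, only two workers can count down the ready latch, so the timed wait gives up and the check reports false rather than hanging.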




Now we can use the above utility class as follows:
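import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Worker implements Runnable {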
 @Override
 public void run() {
  System.out.println("Doing work.");
  for(int i=0; i<20; i++) {
   try {
    Thread.sleep(500);
   } catch (InterruptedException e) {
    //Restore the interrupt status and stop working.
    Thread.currentThread().interrupt();
    return;
   }
  }
  System.out.println("Finished.");
 }
}

public class TimerExample {
 public static void main(String[] args) {
  //An Executor replaces the common thread idiom (new Thread(r)).start() with e.execute(r).
  ExecutorService executor = Executors.newFixedThreadPool(10);
  Worker action = new Worker();
  int concurrency = 5;
  try {
   //long elapsedTime = ConcurrentExecutionActionTimer.elapsedTimeUsingCountDownLatch(executor, concurrency, action);
   long elapsedTime = ConcurrentExecutionActionTimer.elapsedTimeUsingCyclicBarrier(executor, concurrency, action);
   double seconds = elapsedTime / 1000000000.0;
   System.out.println("Time taken, approximately: " + seconds + " seconds.");
  } catch (InterruptedException | BrokenBarrierException e) {
   e.printStackTrace();
  } finally {
   //Shut the pool down so its non-daemon threads do not keep the JVM alive.
   executor.shutdown();
  }
 }
}
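
Each Worker sleeps 20 times for 500 ms, and the five workers run in parallel, so the reported time should come out at roughly 10 seconds plus scheduling overhead. The conversion from nanoseconds can also be written with TimeUnit, as in the small sketch below. The class name ElapsedFormat and the sample value are made up for illustration and are not measured results.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for presenting a System.nanoTime() difference.
public class ElapsedFormat {

    // Fractional seconds, same arithmetic as in the example above.
    static double toSeconds(long nanos) {
        return nanos / 1_000_000_000.0;
    }

    // Whole milliseconds; TimeUnit conversions truncate toward zero.
    static long toWholeMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    public static void main(String[] args) {
        long elapsed = 10_012_345_678L; // illustrative value only
        System.out.println(toWholeMillis(elapsed) + " ms");
        System.out.println(toSeconds(elapsed) + " s");
    }
}
```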

That is it. I hope you enjoyed this post.
