Tuesday, March 15, 2011

ConcurrentHashMap and CopyOnWriteArrayList


What is ConcurrentModificationException?

In Java, a ConcurrentModificationException is thrown when a collection is structurally modified while it is being iterated, other than through the iterator doing the iteration. A common case is several threads iterating over a shared collection while at least one of them modifies it, but the same exception can also occur within a single thread.

Let's take an ArrayList as an example. Two threads share the same ArrayList. One thread simply iterates over the list and prints each element. The other thread iterates over the list and deletes the element that matches the value passed to it.

As a result, the traversing thread fails with the unchecked ConcurrentModificationException at runtime and terminates. This fail-fast check is best effort only: ArrayList is not thread-safe, so the exception is likely but not guaranteed.
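The exception is not tied to threads as such. As a minimal single-threaded sketch (the class and method names are illustrative and not part of the example that follows), removing an element through the list while a for-each loop is iterating over it trips the same check, whereas removing through the iterator itself does not:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

class SingleThreadCmeSketch {

    /** Removes through the list while a for-each loop iterates it; returns true if the exception was thrown. */
    static boolean removeThroughList(List<String> fruits, String target) {
        try {
            for (String fruit : fruits) {
                if (fruit.equals(target)) {
                    fruits.remove(fruit); // structural change the iterator does not know about
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Removes through the iterator, which keeps the iterator's bookkeeping in step with the list. */
    static void removeThroughIterator(List<String> fruits, String target) {
        Iterator<String> iterator = fruits.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().equals(target)) {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<String>(Arrays.asList("Apple", "Banana", "Grapes", "Mango"));
        System.out.println("Exception thrown: " + removeThroughList(first, "Banana"));

        List<String> second = new ArrayList<String>(Arrays.asList("Apple", "Banana", "Grapes", "Mango"));
        removeThroughIterator(second, "Banana");
        System.out.println("After iterator.remove(): " + second);
    }
}
```

The check only fires on the next call to next(), so removing the second-to-last element through the list can slip past it unnoticed; that is one reason the exception should not be relied on for correctness.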

ConnModificationExample.java
package com.kunaal.concurrent.list;

import java.util.ArrayList;
import java.util.List;

/**
 * This example starts two threads iterating over a list 
 * One of the thread tries to remove a value 
 * Since both threads share the same data structure we 
 * end up having ConcurrentModification Exception
 *  
 * @author Kunaal A Trehan
 *
 */
public class ConnModificationExample {

 /**
  * @param args
  */
 public static void main(String[] args) {
  List<String> dataList=initializeList();
  DelRunImpl delImpl=new DelRunImpl(dataList, "Grapes");
  TraverseRunImpl traverseImpl=new TraverseRunImpl(dataList);

  Thread traverseThread=new Thread(traverseImpl);
  Thread delThread=new Thread(delImpl);
  
  traverseThread.start();
  delThread.start();
 }
 
 /**
  * Utility method returning list containing fruit names
  * @return
  */
 private static List<String> initializeList(){
  List<String> dataList=new ArrayList<String>();
  dataList.add("Apple");
  dataList.add("Banana");
  dataList.add("Water Melon");
  dataList.add("Musk Melon");
  dataList.add("Grapes");
  dataList.add("Strawberry");
  dataList.add("Mango");
  
  return dataList;
 }

}

DelRunImpl.java
package com.kunaal.concurrent.list;

import java.util.Iterator;
import java.util.List;

/**
 * Run implementation trying to delete data from the 
 * shared data list.
 * 
 * @author Kunaal A Trehan
 *
 */
public class DelRunImpl implements Runnable{

 private List<String> dataList;
 private String delData; 
 
 /**
  * Parameterized constructor taking the data list and the data to be deleted.
  * 
  * @param dataList
  * @param dataToDel
  */
 public DelRunImpl(List<String> dataList,String dataToDel) {
  this.dataList=dataList;
  this.delData=dataToDel;
 }

 /**
  * Overridden run implementation where we iterate through the 
  * list and check the contents 
  * If it matches with the data to be deleted 
  * Then current thread sleeps for 1000 milli seconds
  * before deleting the value
  */
 @Override
 public void run() {
  Iterator<String> iterator = dataList.iterator();
  
  while(iterator.hasNext()){
   String nextVal = iterator.next();
   
   if(nextVal.equals(delData)){
    try {
     Thread.sleep(1000); // sleep is static, so call it on the class
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    iterator.remove();
   }
   
  }
    
 }

}


TraverseRunImpl.java
package com.kunaal.concurrent.list;

import java.util.Iterator;
import java.util.List;

/**
 * Run implementation in which we traverse through  the elements
 * of the array list and print it out .
 * 
 * @author Kunaal A Trehan
 *
 */
public class TraverseRunImpl implements Runnable {

 private List<String> dataList;
 
 /**
  * Parameterized constructor 
  */
 public TraverseRunImpl(List<String> dataList) {
  this.dataList=dataList;  
 }

 /**
  * Overridden run method in which we iterate to next element 
  * In between thread sleeps for 1000 milli seconds
  */
 @Override
 public void run() {
  Iterator<String> iterator = dataList.iterator();
  
  while(iterator.hasNext()){
   System.out.println("Element is-"+ iterator.next());
   try {
    Thread.sleep(1000); // sleep is static, so call it on the class
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }

 }

}

Data Output
Element is-Apple
Exception in thread "Thread-0" java.util.ConcurrentModificationException
 at java.util.AbstractList$Itr.checkForComodification(Unknown Source)
 at java.util.AbstractList$Itr.next(Unknown Source)
 at com.kunaal.concurrent.list.TraverseRunImpl.run(TraverseRunImpl.java:33)
 at java.lang.Thread.run(Unknown Source)

This shows that ArrayList does not handle the situation gracefully when more than one thread uses the list and at least one of them modifies it.
We could use synchronization to handle such scenarios. However, it is not the ideal solution in most cases. Imagine a production system with frequent listing and modification routines: every traversal has to hold the lock for its whole duration, and the system will choke on that contention.
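As a rough sketch of what the synchronized approach looks like (the class name and the choice of Collections.synchronizedList are assumptions made for illustration, not part of the original example), the traversing thread has to hold the list's lock for the whole iteration, and every other thread waits during that time:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SynchronizedListSketch {

    /** Traverses and deletes from two threads; returns the list once both have finished. */
    static List<String> traverseAndDelete() {
        final List<String> fruits = Collections.synchronizedList(
                new ArrayList<String>(Arrays.asList("Apple", "Banana", "Grapes", "Mango")));

        Thread traverseThread = new Thread(() -> {
            // Iteration over a synchronized wrapper must be guarded manually;
            // the lock is held until the loop ends.
            synchronized (fruits) {
                for (String fruit : fruits) {
                    System.out.println("Element is-" + fruit);
                }
            }
        });

        // remove() takes the same lock, so it waits while a traversal is in progress.
        Thread delThread = new Thread(() -> fruits.remove("Grapes"));

        traverseThread.start();
        delThread.start();
        try {
            traverseThread.join();
            delThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the threads", e);
        }
        return fruits;
    }

    public static void main(String[] args) {
        System.out.println("Final list: " + traverseAndDelete());
    }
}
```

This avoids the exception, but only by making the two threads take turns. Whether the traversal prints Grapes depends on which thread gets the lock first.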

So Java 1.5 introduced ConcurrentHashMap, CopyOnWriteArrayList and other classes in java.util.concurrent, which use finer-grained locking or copy-on-write to reduce this contention. We will go through them in detail in the following sections.

ConcurrentHashMap

Normally, when we acquire a lock on a synchronized hash map (for example a Hashtable, or a HashMap wrapped with Collections.synchronizedMap), the full data set is locked. No other thread that needs the lock can proceed, even if the threads work on different regions of the map, so access to the data set is sequential, one thread at a time.
    A HashMap stores its data in buckets/regions chosen by the key's hash code. Java requires that if two objects are equal, their hash codes must be the same. However, equal hash codes do not mean that the objects are equal; they may or may not be.

    ConcurrentHashMap, on the contrary, does not use one lock for the full data set. It takes a fine-grained approach and uses one lock per region (in Java 5 to 7 a segment, i.e. a group of buckets), which allows the following:

    • Multiple reads can execute concurrently.
    • Reads and writes can execute concurrently.
    • Multiple writes can execute concurrently as long as they happen in different buckets/regions.

    ConcurrentHashMap, along with the other concurrent collections, further improves on the synchronized collection classes by providing iterators that do not throw ConcurrentModificationException, thus eliminating the need to lock the collection during iteration.
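To make the concurrent-writes point concrete, here is a small sketch (class and method names are hypothetical) in which several threads write distinct keys into one ConcurrentHashMap without any external locking. It only checks that no entry is lost; it does not measure how much the writers actually overlap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentWritersSketch {

    /** Starts writerCount threads that each put keysPerWriter distinct keys; returns the final size. */
    static int fillConcurrently(int writerCount, final int keysPerWriter) {
        final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<String, Long>();
        Thread[] writers = new Thread[writerCount];

        for (int w = 0; w < writerCount; w++) {
            final int writerId = w;
            writers[w] = new Thread(() -> {
                for (int k = 0; k < keysPerWriter; k++) {
                    // Keys differ per writer, so no two writers touch the same entry.
                    dataMap.put("writer-" + writerId + "-key-" + k, (long) k);
                }
            });
            writers[w].start();
        }

        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for writers", e);
            }
        }
        return dataMap.size();
    }

    public static void main(String[] args) {
        System.out.println("Entries written: " + fillConcurrently(4, 1000));
    }
}
```

With an unsynchronized HashMap in place of the ConcurrentHashMap, the same code gives no such guarantee: entries may be lost or the map may be corrupted.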


      When different threads iterate over, modify or add to the data set, ConcurrentHashMap does not throw ConcurrentModificationException, and it does not lose the modification either. Whether the modification is visible to another thread that is iterating depends on how far that iterator has already progressed, i.e. which region it is currently traversing.
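A minimal sketch of this behaviour, kept single-threaded so that it is easy to follow (the class name is an assumption for illustration): the map is modified in the middle of an iteration, no exception is thrown, and the modifications are in the map afterwards. Whether the loop itself visits Iceland or Lisbon is deliberately left open, because the iterator makes no promise either way.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class WeaklyConsistentIterationSketch {

    /** Modifies the map while iterating over its key set; returns the map afterwards. */
    static ConcurrentMap<String, Long> modifyWhileIterating() {
        ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<String, Long>();
        dataMap.put("NYC", 20000000L);
        dataMap.put("Tokyo", 34500000L);
        dataMap.put("Lisbon", 6500000L);

        for (String city : dataMap.keySet()) {
            // With a plain HashMap, changes like these during iteration would
            // normally end in ConcurrentModificationException.
            dataMap.putIfAbsent("Iceland", 100000L);
            dataMap.remove("Lisbon");
            System.out.println("Visited " + city);
        }
        return dataMap;
    }

    public static void main(String[] args) {
        System.out.println("Map after iteration: " + modifyWhileIterating());
    }
}
```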

      To understand it better, let's go through a few examples.

      Example :- In this example I have a ConcurrentHashMap with the city name as key and the population as value. I have created two threads: one iterates over the map and the other adds an entry to it. If we tweak the sleep interval in the thread that adds the entry, the thread traversing the data set may or may not see the new entry. It all depends on whether the add operation has finished by the time the traversing thread reaches that bucket.

      TraverseInfoRunImpl.java
      package com.kunaal.concurrent.map;
      
      import java.util.Iterator;
      import java.util.concurrent.ConcurrentMap;
      
      /**
       * Run implementation printing data set information.
       * 
       * @author Kunaal A Trehan
       *
       */
      public class TraverseInfoRunImpl implements Runnable{
       
       private ConcurrentMap<String,Long>  dataMap;
      
       @Override
       public void run() {
        Iterator<String>  iterator = dataMap.keySet().iterator();
        
        while(iterator.hasNext()){
         String ctryName = iterator.next();
         System.out.println(ctryName + " has population of " + dataMap.get(ctryName));
         try {
          Thread.sleep(1000); // sleep is static, so call it on the class
         } catch (InterruptedException e) {
          e.printStackTrace();
         }
        }
       }
      
       /**
        * @param dataMap
        */
       public TraverseInfoRunImpl(ConcurrentMap<String,Long>  dataMap) {
        this.dataMap = dataMap;
       }
      
      }
      


      AddInfoRunImpl.java
      package com.kunaal.concurrent.map;
      
      import java.util.concurrent.ConcurrentMap;
      
      /**
       * Run implementation adding new data element to 
       * concurrent hash map.
       * 
       * @author Kunaal A Trehan
       *
       */
      public class AddInfoRunImpl implements Runnable {
       
       private ConcurrentMap<String,Long>  dataMap;
      
       /**
        * Parameterized constructor
        * @param dataMap
        */
       public AddInfoRunImpl(ConcurrentMap<String,Long>  dataMap) {
        this.dataMap = dataMap;
       }
      
       /**
        * Over ridden run() method
        */
       @Override
       public void run() {
        try {
         Thread.sleep(1000);//Newly added entry gets reflected in other thread
         //Thread.sleep(5000);//Newly added entry does not get reflected in other thread
        } catch (InterruptedException e) {
         e.printStackTrace();
        }
        dataMap.putIfAbsent("Iceland", Long.valueOf(100000L));
        System.out.println("Iceland added");
       }
      
      }
      

      ConcurrentMapExample.java
      package com.kunaal.concurrent.map;
      
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ConcurrentMap;
      
      /**
       * Concurrent HashMap example in which there are two threads
       * In one thread we are iterating the data set 
       * In another thread we are adding the data
       * 
       * By tweaking the sleep interval in AddInfoRunImpl,
       * newly added data element can be reflected in other concurrently 
       * executing thread.
       * @author Kunaal A Trehan
       *
       */
      public class ConcurrentMapExample {
      
       /**
        * @param args
        */
       public static void main(String[] args) {
        ConcurrentMap<String,Long> dataMap=createDataMap();
        AddInfoRunImpl addImpl=new AddInfoRunImpl(dataMap);
        TraverseInfoRunImpl traverseImpl=new TraverseInfoRunImpl(dataMap);
        
        Thread traverseThread=new Thread(traverseImpl);
        Thread addThread=new Thread(addImpl);
        
        traverseThread.start();
        addThread.start();
       }
      
      
       /**
        * Utility method returning concurrent map containing 
        * information -name of the city and corresponding population.
        * 
        * @return
        */
       private static ConcurrentMap<String,Long> createDataMap(){
        ConcurrentMap<String,Long> dataMap=new ConcurrentHashMap<String, Long>();
        dataMap.putIfAbsent("NYC", Long.valueOf(20000000L));
        dataMap.putIfAbsent("SFO", Long.valueOf(17500000L));
        dataMap.putIfAbsent("Chicago", Long.valueOf(16500000L));
        dataMap.putIfAbsent("London", Long.valueOf(24000000L));
        dataMap.putIfAbsent("Tokyo", Long.valueOf(34500000L));
        dataMap.putIfAbsent("New Delhi", Long.valueOf(10000000L));
        dataMap.putIfAbsent("Bangalore", Long.valueOf(9500000L));
        dataMap.putIfAbsent("Lisbon", Long.valueOf(6500000L));
        
        return dataMap;
       }
      }
      

      Output when the add thread slept for 1000 ms. In this case the other thread was able to see the newly added entry.
      Bangalore has population of 9500000
      Iceland added
      Tokyo has population of 34500000
      SFO has population of 17500000
      Lisbon has population of 6500000
      London has population of 24000000
      Iceland has population of 100000
      NYC has population of 20000000
      New Delhi has population of 10000000
      Chicago has population of 16500000
      

      Output when the add thread slept for 5000 ms. In this case the other thread was not able to see the newly added entry.
      Bangalore has population of 9500000
      Tokyo has population of 34500000
      SFO has population of 17500000
      Lisbon has population of 6500000
      London has population of 24000000
      NYC has population of 20000000
      Iceland added
      New Delhi has population of 10000000
      Chicago has population of 16500000
      


      Example :- In this example I have the same ConcurrentHashMap with the city name as key and the population as value. I have created two threads: one iterates over the map and the other deletes a particular entry. If we tweak the sleep interval in the thread that deletes the entry, the thread traversing the data set may or may not still see it. It all depends on whether the delete operation has finished by the time the traversing thread reaches that bucket.

      ConcurrentMapDelExample.java
      package com.kunaal.concurrent.map;
      
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ConcurrentMap;
      
      /**
       * Concurrent HashMap example in which there are two threads
       * In one thread we are iterating the data set 
       * In another thread we are deleting the data
       * 
       * @author Kunaal A Trehan
       */
      public class ConcurrentMapDelExample {
      
       /**
        * @param args
        */
       public static void main(String[] args) {
        ConcurrentMap<String,Long> dataMap=createDataMap();
        DelRunImpl delImpl=new DelRunImpl(dataMap,"Lisbon");
        TraverseInfoRunImpl traverseImpl=new TraverseInfoRunImpl(dataMap);
        
        Thread traverseThread=new Thread(traverseImpl);
        Thread delThread=new Thread(delImpl);
        
        traverseThread.start();
        delThread.start();
       }
      
      
       /**
        * Utility method returning concurrent map containing 
        * information -name of the city and corresponding population.
        * 
        * @return
        */
       private static ConcurrentMap<String,Long> createDataMap(){
        ConcurrentMap<String,Long> dataMap=new ConcurrentHashMap<String, Long>();
        dataMap.putIfAbsent("NYC", Long.valueOf(20000000L));
        dataMap.putIfAbsent("SFO", Long.valueOf(17500000L));
        dataMap.putIfAbsent("Chicago", Long.valueOf(16500000L));
        dataMap.putIfAbsent("London", Long.valueOf(24000000L));
        dataMap.putIfAbsent("Tokyo", Long.valueOf(34500000L));
        dataMap.putIfAbsent("New Delhi", Long.valueOf(10000000L));
        dataMap.putIfAbsent("Bangalore", Long.valueOf(9500000L));
        dataMap.putIfAbsent("Lisbon", Long.valueOf(6500000L));
        
        return dataMap;
       }
      
      }
      

      DelRunImpl.java
      package com.kunaal.concurrent.map;
      
      import java.util.Iterator;
      import java.util.concurrent.ConcurrentMap;
      
      /**
       * Run implementation deleting a particular key.
       * 
       * @author Kunaal A Trehan
       *
       */
      public class DelRunImpl implements Runnable {
      
       private ConcurrentMap<String,Long>  dataMap;
       
       private String keyToBeDel;
       
       /**
        * @param dataMap
        * @param keyToBeDel
        */
       public DelRunImpl(ConcurrentMap<String, Long> dataMap, String keyToBeDel) {
        this.dataMap = dataMap;
        this.keyToBeDel = keyToBeDel;
       }
      
       @Override
       public void run() {
        Iterator<String> iterator = dataMap.keySet().iterator();
        
        while(iterator.hasNext()){
         String next = iterator.next();
         
         if(next.equals(keyToBeDel)){
          try {
           Thread.sleep(1000);
          } catch (InterruptedException e) {
           e.printStackTrace();
          }
          dataMap.remove(next);
          //Message shown in the output below
          System.out.println("Data corresponding to key-" + next + " deleted from map");
          break;
         }   
        }
      
       }
      
      }
      

      Output showing that the deletion is reflected in the other thread while it traverses the data set: Lisbon is never printed.
      Bangalore has population of 9500000
      Tokyo has population of 34500000
      Data corresponding to key-Lisbon deleted from map
      SFO has population of 17500000
      London has population of 24000000
      NYC has population of 20000000
      New Delhi has population of 10000000
      Chicago has population of 16500000
      


      Some important points regarding ConcurrentHashMap
      • Iterators provided by ConcurrentHashMap do not throw ConcurrentModificationException, eliminating the need to lock the collection while iterating over the data set.
      • Since ConcurrentHashMap uses fine-grained locks instead of a single lock, it allows multiple write operations to happen concurrently as long as they affect different regions of the map.
      • ConcurrentHashMap provides atomic methods for compound actions: putIfAbsent, conditional remove and conditional replace.
      • In the implementation current at the time of writing, a read operation uses a lock in only one case: when the value it reads is null. This can occur when an entry is being added while its initialization is still underway.
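The compound actions mentioned above can be sketched as follows. The class name and the sample values are assumptions for illustration; the methods themselves (putIfAbsent, the three-argument replace and the two-argument remove) are part of the ConcurrentMap interface. Each call performs its check and its update as one atomic step, so no external lock is needed around it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CompoundActionsSketch {

    /** Runs the compound actions in order and returns what each call returned. */
    static List<Object> runCompoundActions() {
        ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<String, Long>();
        List<Object> results = new ArrayList<Object>();

        // putIfAbsent inserts only when the key is missing and returns the previous value.
        results.add(dataMap.putIfAbsent("NYC", 20000000L)); // null: entry was added
        results.add(dataMap.putIfAbsent("NYC", 1L));        // 20000000: map left unchanged

        // Conditional replace succeeds only if the current value equals the expected one.
        results.add(dataMap.replace("NYC", 20000000L, 21000000L)); // true
        results.add(dataMap.replace("NYC", 20000000L, 22000000L)); // false: value is now 21000000

        // Conditional remove deletes only if the key is still mapped to the given value.
        results.add(dataMap.remove("NYC", 20000000L)); // false: stale expected value
        results.add(dataMap.remove("NYC", 21000000L)); // true: entry removed
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runCompoundActions());
    }
}
```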

      CopyOnWriteArrayList

      This class was added in the java.util.concurrent package to support concurrent access by multiple threads. It is an alternative to a synchronized list implementation.

      Whenever a thread modifies the data set, a new copy of the underlying array is created and the thread applies its change to that copy. Any active iterators still point to the old array. Once the modification completes, the list switches to the new array, so all subsequent reads and newly created iterators see it.

      Like ConcurrentHashMap, CopyOnWriteArrayList does not throw ConcurrentModificationException. Iterators created by a CopyOnWriteArrayList cannot change the underlying array. Although these iterators have the methods for changing the collection, their implementations throw UnsupportedOperationException instead of modifying it.
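Both properties can be seen without any threads. The sketch below (class and method names are hypothetical) creates an iterator, adds an element afterwards, and counts what the old iterator still sees; it then shows that remove() on the iterator is rejected.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIteratorSketch {

    /** Counts the elements an iterator sees when the list grows after the iterator was created. */
    static int countSeenByOldIterator(List<String> fruits) {
        Iterator<String> snapshot = fruits.iterator(); // bound to the array as it is right now
        fruits.add("Pineapple");                        // the write goes to a fresh copy

        int seen = 0;
        while (snapshot.hasNext()) {
            snapshot.next();
            seen++;
        }
        return seen;
    }

    /** Returns true if removing through the iterator is rejected. */
    static boolean iteratorRemoveIsRejected(List<String> fruits) {
        Iterator<String> iterator = fruits.iterator();
        iterator.next();
        try {
            iterator.remove();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> fruits = new CopyOnWriteArrayList<String>(Arrays.asList("Apple", "Banana", "Grapes"));
        int seen = countSeenByOldIterator(fruits);
        System.out.println("Old iterator saw " + seen + " elements, list now has " + fruits.size());
        System.out.println("iterator.remove() rejected: " + iteratorRemoveIsRejected(fruits));
    }
}
```

Because every write copies the whole array, this trade-off tends to suit lists that are read far more often than they are modified.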

      To understand it better, let's go through a few examples.

      Example :- In this example I have created a CopyOnWriteArrayList containing names of fruits. I have created two threads: one traverses the list using an iterator, and the other tries to delete an entry from the list through its iterator. Since the iterator returned by CopyOnWriteArrayList does not allow any modification, it throws UnsupportedOperationException.


      TraverseRunImpl.java

      package com.kunaal.concurrent.list;
      
      import java.util.Iterator;
      import java.util.List;
      
      /**
       * Run implementation in which we traverse through  the elements
       * of the array list and print it out .
       * 
       * @author Kunaal A Trehan
       *
       */
      public class TraverseRunImpl implements Runnable {
      
       private List<String> dataList;
       
       /**
        * Parameterized constructor 
        */
       public TraverseRunImpl(List<String> dataList) {
        this.dataList=dataList;  
       }
      
       /**
        * Overriden run method in which we iterate to next element 
        * In between thread sleeps for 1000 milli seconds
        */
       @Override
       public void run() {
        Iterator<String> iterator = dataList.iterator();
        
        while(iterator.hasNext()){
         System.out.println("Element is-"+ iterator.next());
         try {
          Thread.sleep(1000); // sleep is static, so call it on the class
         } catch (InterruptedException e) {
          e.printStackTrace();
         }
        }
      
       }
      
      }
      

      DelRunImpl.java
      package com.kunaal.concurrent.list;
      
      import java.util.Iterator;
      import java.util.List;
      
      /**
       * Run implementation trying to delete data from the 
       * shared data list.
       * 
       * @author Kunaal A Trehan
       *
       */
      public class DelRunImpl implements Runnable{
      
       private List<String> dataList;
       private String delData; 
       
       /**
        * Parameterized constructor taking the data list and the data to be deleted.
        * 
        * @param dataList
        * @param dataToDel
        */
       public DelRunImpl(List<String> dataList,String dataToDel) {
        this.dataList=dataList;
        this.delData=dataToDel;
       }
      
       /**
        * Overridden run implementation where we iterate through the 
        * list and check the contents 
        * If it matches with the data to be deleted 
        * Then current thread sleeps for 1000 milli seconds
        * before deleting the value
        */
       @Override
       public void run() {
        Iterator<String> iterator = dataList.iterator();
        
        while(iterator.hasNext()){
         String nextVal = iterator.next();
         
         if(nextVal.equals(delData)){
          try {
           Thread.sleep(1000); // sleep is static, so call it on the class
          } catch (InterruptedException e) {
           e.printStackTrace();
          }
          iterator.remove();
         }
         
        }
          
       }
      
      }
      

      AddRemoveExample.java
      package com.kunaal.concurrent.list;
      
      import java.util.List;
      import java.util.concurrent.CopyOnWriteArrayList;
      
      /**
       * This example uses CopyOnWriteArrayList
       * Two threads will be traversing the list 
       * and one of the threads will try to remove an element
       * 
       * @author Kunaal A Trehan
       *
       */
      public class AddRemoveExample {
      
       /**
        * @param args
        */
       public static void main(String[] args) {
        List<String> dataList=initializeList();
        DelRunImpl delRunImpl=new DelRunImpl(dataList, "Grapes");  
        TraverseRunImpl traverseRunImpl=new TraverseRunImpl(dataList);
        
        Thread traverseThread=new Thread(traverseRunImpl,"TraverseThread");
        Thread delThread=new Thread(delRunImpl,"DeleteThread");
        
        traverseThread.start();
        delThread.start();  
       }
       
       /**
        * Utility method returning list containing fruit names
        * @return
        */
       private static List<String> initializeList(){
        List<String> dataList=new CopyOnWriteArrayList<String>();
        dataList.add("Apple");
        dataList.add("Banana");
        dataList.add("Water Melon");
        dataList.add("Musk Melon");
        dataList.add("Grapes");
        dataList.add("Strawberry");
        dataList.add("Mango");
        
        return dataList;
       }
      
      }
      

      Output showing UnsupportedOperationException for DeleteThread. However, the other thread keeps on running as if nothing happened.
      Element is-Apple
      Exception in thread "DeleteThread" java.lang.UnsupportedOperationException
       at java.util.concurrent.CopyOnWriteArrayList$COWIterator.remove(Unknown Source)
       at com.kunaal.concurrent.list.DelRunImpl.run(DelRunImpl.java:49)
       at java.lang.Thread.run(Unknown Source)
      Element is-Banana
      Element is-Water Melon
      Element is-Musk Melon
      Element is-Grapes
      Element is-Strawberry
      Element is-Mango
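If the delete thread is actually meant to succeed, one option is to call remove on the list itself rather than on the iterator. The following is only a sketch of that idea; removeWhileIterating is a hypothetical helper, not part of the example above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowRemoveSketch {

    /** Iterates over a snapshot and deletes matches through the list, not through the iterator. */
    static void removeWhileIterating(List<String> dataList, String delData) {
        for (String value : dataList) {
            if (value.equals(delData)) {
                // Allowed on CopyOnWriteArrayList: the loop keeps reading its old snapshot
                // while the list switches to a new array without the element.
                dataList.remove(value);
            }
        }
    }

    public static void main(String[] args) {
        List<String> dataList = new CopyOnWriteArrayList<String>(Arrays.asList(
                "Apple", "Banana", "Water Melon", "Musk Melon", "Grapes", "Strawberry", "Mango"));
        removeWhileIterating(dataList, "Grapes");
        System.out.println("After removal: " + dataList);
    }
}
```

The same loop on an ArrayList would typically fail with ConcurrentModificationException; here it works because the iterator never looks at the modified array.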
      

      Example :- In this example I have created two threads. One thread traverses the list. The other thread adds an entry to the underlying data list. Once both threads have finished their work, the list is iterated again in the main thread, showing that the newly added entry is reflected and that the list now points to the newly created array.

      ConnListExample.java
      package com.kunaal.concurrent.list;
      
      import java.util.Iterator;
      import java.util.List;
      import java.util.concurrent.CopyOnWriteArrayList;
      
      /**
       * This example uses CopyOnWriteArrayList
       * Here one thread iterates over the list 
       * and another thread tries to add an element.
       * 
       * @author Kunaal A Trehan
       *
       */
      public class ConnListExample {
      
       /**
        * @param args
        */
       public static void main(String[] args) {
        List<String> dataList=initializeList();
        TraverseRunImpl traverseImpl=new TraverseRunImpl(dataList);
        AddRunImpl addRunImpl=new AddRunImpl(dataList);
      
        Thread traverseThread=new Thread(traverseImpl,"TraverseThread");
        Thread addThread=new Thread(addRunImpl,"AddThread");
        
        traverseThread.start();
        addThread.start();
        
        try {
         traverseThread.join();
         addThread.join();
        } catch (InterruptedException e) {
         e.printStackTrace();
        }
        
        System.out.println("--------------Results after thread finished work--------------");
        Iterator<String> itr=dataList.iterator();
        while(itr.hasNext()){
         System.out.println(itr.next());
        }
         
       }
       
       /**
        * Utility method returning list containing fruit names
        * @return
        */
       private static List<String> initializeList(){
        List<String> dataList=new CopyOnWriteArrayList<String>();
        dataList.add("Apple");
        dataList.add("Banana");
        dataList.add("Water Melon");
        dataList.add("Musk Melon");
        dataList.add("Grapes");
        dataList.add("Strawberry");
        dataList.add("Mango");
        
        return dataList;
       }
      
      }
      


      AddRunImpl.java
      package com.kunaal.concurrent.list;
      
      import java.util.List;
      
      /**
       * Run implementation trying to add data to the shared data list.
       * @author Kunaal A Trehan
       *
       */
      public class AddRunImpl implements Runnable{
       
       private List<String> dataList;
      
       /**
        * Overriden run method where we add one element in the data list.
        */
       @Override
       public void run() {
        try {
         Thread.sleep(1000); // sleep is static, so call it on the class
        } catch (InterruptedException e) {
         e.printStackTrace();
        }
        dataList.add("Pineapple");
        System.out.println("Pineapple added by "+ Thread.currentThread().getName());
       }
      
       /**
        * @param dataList
        */
       public AddRunImpl(List<String> dataList) {
        super();
        this.dataList = dataList;
       }
      
      }
      

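      Output showing that, once both threads have finished, a new iteration sees the newly created list containing 'Pineapple'. The traversing thread, whose iterator was created before the add, never prints it.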
      Element is-Apple
      Element is-Banana
      Pineapple added by AddThread
      Element is-Water Melon
      Element is-Musk Melon
      Element is-Grapes
      Element is-Strawberry
      Element is-Mango
      --------------Results after thread finished work--------------
      Apple
      Banana
      Water Melon
      Musk Melon
      Grapes
      Strawberry
      Mango
      Pineapple
      

      Some important points regarding CopyOnWriteArrayList
      • Iterators returned by CopyOnWriteArrayList do not allow any modification of the data set.
        Attempting one throws UnsupportedOperationException.
      • Whenever any thread modifies the data set, a new copy of the underlying array is created and the modification is applied to that copy. The list then switches to the new array, so later reads and new iterators see it, while iterators that already exist keep their old snapshot.







      Tuesday, March 1, 2011

      Callable, Futures, Executors and others...

      In this post we will go through how the problems of unbounded thread creation lead to the design of thread pools, which the java.util.concurrent package takes further by providing Executor, ExecutorService, Callable, Future and others.

      Problem with unbounded thread creation

      Most of us will have observed that when writing multithreaded code we rarely think about the cost of thread creation, manageability and so on. Most of our energy goes into synchronization and preventing concurrent modification of shared state.

      Imagine a use case where, as soon as your salary arrives in your bank account, you want to pay your electricity bill, car EMI, home loan and so on. One way is to execute the payment transactions serially, one by one. The other way is to spawn a thread for each payment, so that there are no dependencies and a failure in one transaction does not affect the others in the queue. So we could end up with code as follows:-

      PaymentProcessor.java  having methods for doing payment
      package com.kunaal.parallel.payment;
      
      import com.kunaal.parallel.domain.PaymentInfo;
      
      /**
       * Payment processor class which has logic for doing payment
       * Currently it just prints to System.out; a real implementation 
       * can be plugged in for actual scenarios 
       * @author Kunaal A Trehan
       *
       */
      public class PaymentProcessor {
       /**
        * Thread Local variable containing payment information details
        */
       private static ThreadLocal<PaymentInfo> PAYMENT_INFO=new ThreadLocal<PaymentInfo>(){
         protected PaymentInfo initialValue() {
                return new PaymentInfo();
            }
       };
       
       /**
        * Setter method for paymentInfo
        * @param pymtInfo
        */
       public void setPaymentInfo(PaymentInfo pymtInfo){
        PAYMENT_INFO.set(pymtInfo);
       }
       
       /**
        * Cleanup paymentInfo thread local variable
        */
       public void removePaymentInfo(){
        PAYMENT_INFO.remove();
       }
      
       /**
        * Dummy implementation for making payment
        */
       public void makePayment(){
        System.out.println("Payment details are:- "+ PAYMENT_INFO.get());
       }
       
       /**
        * Dummy implementation for making payment and getting acknowledgement 
        * for the same.
        * @return
        */
       public String makePaymentGetAck(){
        PaymentInfo paymentInfo = PAYMENT_INFO.get();
        StringBuilder buffer=new StringBuilder(); //local variable, so no synchronization is needed
        buffer.append("ACK-")
         .append(paymentInfo.getAcctNum())
         .append("-")
         .append(paymentInfo.getAmt())
         .append("-")
         .append("Y");
        return buffer.toString();
       }
       
      
       /**
        * Default Constructor
        */
       public PaymentProcessor() {
        super();
       }
      }
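The ThreadLocal field is what allows a single PaymentProcessor to be shared by all payment threads: each thread reads back only the value it set itself. Here is a minimal sketch of that behaviour, using a plain String instead of PaymentInfo (the class, field and method names are assumptions made for illustration):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadLocalSketch {

    /** One slot per thread; threads never see each other's value. */
    private static final ThreadLocal<String> CURRENT_PAYMENT = new ThreadLocal<String>();

    /** Starts one thread per name; each stores its own value and records what it reads back. */
    static Map<String, String> runThreads(String... threadNames) {
        final Map<String, String> seenByThread = new ConcurrentHashMap<String, String>();
        Thread[] threads = new Thread[threadNames.length];

        for (int i = 0; i < threadNames.length; i++) {
            threads[i] = new Thread(() -> {
                String name = Thread.currentThread().getName();
                CURRENT_PAYMENT.set("payment-for-" + name);
                try {
                    seenByThread.put(name, CURRENT_PAYMENT.get());
                } finally {
                    CURRENT_PAYMENT.remove(); // clean up, as removePaymentInfo() does
                }
            }, threadNames[i]);
            threads[i].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the threads", e);
            }
        }
        return seenByThread;
    }

    public static void main(String[] args) {
        System.out.println(runThreads("CarLoanEmi", "ElecBill", "WaterBill"));
    }
}
```

Removing the value in a finally block matters most once threads are reused, for example in a pool, because a leftover value would otherwise be visible to the next task that runs on the same thread.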
      


      RunImpl.java setting threadLocal  variable containing payment information and invoking payment processor
      package com.kunaal.parallel.runnable;
      
      import com.kunaal.parallel.Constants;
      import com.kunaal.parallel.domain.PaymentInfo;
      import com.kunaal.parallel.payment.PaymentProcessor;
      
      /**
       * Run implementation for thread
       * @author Kunaal A Trehan
       *
       */
      public class RunImpl implements Runnable {
       
       private PaymentProcessor pymtProcessor;
      
       /* 
        * Run implementation which creates a payment information details
        * depending upon thread name.
        */
       @Override
       public void run() {
        String name = Thread.currentThread().getName();
        PaymentInfo pymtInfo=null;
        
        if (name.equals(Constants.CAR_LOAN_EMI)){
         pymtInfo=new PaymentInfo(new Double(650), "Acct-1", "Bank-1", "City-1");
        }else if (name.equals(Constants.ELEC_BILL)){
         pymtInfo=new PaymentInfo(new Double(50), "Acct-2", "Bank-2", "City-2");
        }else if (name.equals(Constants.EXECUTIVE_MBA)){
         pymtInfo=new PaymentInfo(new Double(150), "Acct-3", "Bank-3", "City-3");
        }else if (name.equals(Constants.HOME_LOAN_EMI)){
         pymtInfo=new PaymentInfo(new Double(1050), "Acct-4", "Bank-4", "City-4");
        }else if (name.equals(Constants.KID_SCHOOL_FEES)){
         pymtInfo=new PaymentInfo(new Double(70), "Acct-5", "Bank-5", "City-5");
        }else if (name.equals(Constants.WATER_BILL)){
         pymtInfo=new PaymentInfo(new Double(30), "Acct-6", "Bank-6", "City-6");
        }else if (name.equals(Constants.FUND_FOR_OLD_AGE_HOME)){
         pymtInfo=new PaymentInfo(new Double(20), "Acct-7", "Bank-7", "City-7");
        }else if (name.equals(Constants.FUND_FOR_ORPHANAGE)){
         pymtInfo=new PaymentInfo(new Double(30), "Acct-8", "Bank-8", "City-8");
        }else{
         System.out.println("Nothing to pay-no info provided");
        }
        
        //Store the payment info in the processor's ThreadLocal,
        //perform the payment, and then clear the ThreadLocal
        //so the value does not linger on this thread
        pymtProcessor.setPaymentInfo(pymtInfo);
        pymtProcessor.makePayment();
        pymtProcessor.removePaymentInfo();
        
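        try {
         //sleep() is static and always pauses the current thread
         Thread.sleep(10*1000);
        } catch (InterruptedException e) {
         //Restore the interrupt flag so the caller can react to it
         Thread.currentThread().interrupt();
        }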
       }
      
       /**
        * @param pymtProcessor
        */
       public RunImpl(PaymentProcessor pymtProcessor) {
        super();
        this.pymtProcessor = pymtProcessor;
       }
      
       /**
        * @return the pymtProcessor
        */
       public PaymentProcessor getPymtProcessor() {
        return pymtProcessor;
       }
      
       /**
        * @param pymtProcessor the pymtProcessor to set
        */
       public void setPymtProcessor(PaymentProcessor pymtProcessor) {
        this.pymtProcessor = pymtProcessor;
       }
      
      }
      

      ThreadExample.java spawning new thread for each payment instruction
      package com.kunaal.parallel;
      
      import com.kunaal.parallel.payment.PaymentProcessor;
      import com.kunaal.parallel.runnable.RunImpl;
      
      /**
       * Code demonstrating thread spawned per activity use case 
       * There is a thread spawned for paying 
       * -Car Loan 
       * -Electricity Bill
       * -Executive MBA
       * -Old Age home fund
       * -Orphanage fund
       * -Home loan fund
       * -Kid school fees
       * -Water bill 
       * 
       * @author Kunaal A Trehan
       *
       */
      public class ThreadExample {
      
       /**
        * @param args
        */
       public static void main(String[] args) {
        PaymentProcessor pymtProcessor=new PaymentProcessor();
        RunImpl runImpl=new RunImpl(pymtProcessor);
        
        Thread carLoanThread=new Thread(runImpl,Constants.CAR_LOAN_EMI);
        Thread elecBillThread=new Thread(runImpl,Constants.ELEC_BILL);
        Thread executiveMbaThread=new Thread(runImpl,Constants.EXECUTIVE_MBA);
        Thread oldAgeHomeThread=new Thread(runImpl,Constants.FUND_FOR_OLD_AGE_HOME);
        Thread orphanageThread=new Thread(runImpl,Constants.FUND_FOR_ORPHANAGE);
        Thread homeLoanThread=new Thread(runImpl,Constants.HOME_LOAN_EMI);
        Thread kidFeesThread=new Thread(runImpl,Constants.KID_SCHOOL_FEES);
        Thread waterThread=new Thread(runImpl,Constants.WATER_BILL);
        
        long startTime = System.currentTimeMillis();
        System.out.println("Before starting the threads:-" + startTime);
        
        carLoanThread.start();
        elecBillThread.start();
        executiveMbaThread.start();
        oldAgeHomeThread.start();
        orphanageThread.start();
        homeLoanThread.start();
        kidFeesThread.start();
        waterThread.start();
        
        long timeAfterStarting = System.currentTimeMillis();
        
        System.out.println("After starting the threads:-" + timeAfterStarting);
        try {
         carLoanThread.join();
         elecBillThread.join();
         executiveMbaThread.join();
         oldAgeHomeThread.join();
         orphanageThread.join();
         homeLoanThread.join();
         kidFeesThread.join();
         waterThread.join();
        } catch (InterruptedException e) {
         e.printStackTrace();
        }
        long timeEnd = System.currentTimeMillis();
        
        System.out.println("Time taken for threads to finish-"+ ((timeEnd-timeAfterStarting)/1000) + " seconds");
       }
      }
      

      Constants.java
      package com.kunaal.parallel;
      
      /**
       * Constants class
       * @author Kunaal A Trehan
       *
       */
      public class Constants {
       
       public static final String ELEC_BILL="Electricity Bill";
       
       public static final String WATER_BILL="Water Bill";
       
       public static final String KID_SCHOOL_FEES="Kid School Fees";
       
       public static final String HOME_LOAN_EMI="Home Loan EMI";
       
       public static final String CAR_LOAN_EMI="Car Loan EMI";
       
       public static final String EXECUTIVE_MBA="Executive MBA fees";
       
       public static final String FUND_FOR_ORPHANAGE="Orphanage fund";
       
       public static final String FUND_FOR_OLD_AGE_HOME="Old Age Home fund";
      
      }
      

      Results of ThreadExample.java
      Before starting the threads:-1298979143734
      After starting the threads:-1298979143734
      Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-8, bankName=Bank-8, city=City-8]
      Payment details are:- PaymentInfo [amt=150.0, acctNum=Acct-3, bankName=Bank-3, city=City-3]
      Payment details are:- PaymentInfo [amt=650.0, acctNum=Acct-1, bankName=Bank-1, city=City-1]
      Payment details are:- PaymentInfo [amt=50.0, acctNum=Acct-2, bankName=Bank-2, city=City-2]
      Payment details are:- PaymentInfo [amt=70.0, acctNum=Acct-5, bankName=Bank-5, city=City-5]
      Payment details are:- PaymentInfo [amt=1050.0, acctNum=Acct-4, bankName=Bank-4, city=City-4]
      Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-6, bankName=Bank-6, city=City-6]
      Payment details are:- PaymentInfo [amt=20.0, acctNum=Acct-7, bankName=Bank-7, city=City-7]
      Time taken for threads to finish-10 seconds
      


      The example above spawns one thread per activity and looks neat. However, it has the following issues:
      • A thread is created for each activity. Thread creation is expensive: with 100 activities we would end up with 100 threads, and managing that many threads and the context switching between them could become problematic.
      • The main thread has to keep track of the threads it spawned. As the example shows, it needs some mechanism (join(), in this case) to make sure every thread has finished its job before the main thread moves on.
      • Suppose you want to fetch the result of each activity and perform some computation on those results. That is difficult and messy, because run() does not return anything, so we have to plug in extra code that may not be clean.
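
      The last point can be illustrated with a small sketch. It is not part of the payment example: the class name RunnableResultSketch, the payAll() helper and the "ACK-" strings are made up for illustration, and the code uses Java 8 lambda syntax. Because run() returns nothing, every thread has to write its result into a shared thread-safe map, and the caller still has to join each thread by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RunnableResultSketch {

    // Runs one thread per activity name. Because Runnable.run() returns nothing,
    // each thread has to write its "result" into a shared, thread-safe map.
    static Map<String, String> payAll(List<String> activities) {
        Map<String, String> results = new ConcurrentHashMap<String, String>();
        List<Thread> threads = new ArrayList<Thread>();

        for (String activity : activities) {
            Thread t = new Thread(() -> results.put(activity, "ACK-" + activity), activity);
            threads.add(t);
            t.start();
        }

        // The caller must also track and join every thread by hand.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(payAll(Arrays.asList("Water Bill", "Electricity Bill")));
    }
}
```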

      ThreadPool
      We have seen the drawbacks of unbounded thread creation. This brings us to the thread pool.

      A thread pool is a collection of threads that are handed chunks of work to perform. Because the threads are already initialized, the cost of creating a thread for every task is avoided. Moreover, once a thread has finished the task assigned to it, it can pick up another task from the queue. How many threads a pool should have depends on the type of task and on the load that the pool needs to handle.
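
      To make the idea concrete, here is a minimal hand-rolled sketch of a pool: a fixed set of worker threads that keep taking tasks from a shared queue. The names TinyPoolSketch, runTasks() and the STOP marker are hypothetical, and the sketch leans on BlockingQueue from java.util.concurrent for brevity. It only illustrates the mechanism and is not a substitute for the JDK thread pools.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class TinyPoolSketch {

    // Marker task that tells a worker to stop (a convention made up for this sketch).
    private static final Runnable STOP = new Runnable() {
        @Override
        public void run() { }
    };

    // Starts `workers` threads, feeds them `tasks` tasks and returns how many tasks ran.
    static int runTasks(int workers, int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        AtomicInteger completed = new AtomicInteger();
        Thread[] pool = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            pool[i] = new Thread(() -> {
                try {
                    // Each worker keeps taking tasks until it sees the stop marker.
                    for (Runnable task = queue.take(); task != STOP; task = queue.take()) {
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            pool[i].start();
        }

        // More tasks than threads: the extra tasks simply wait in the queue.
        for (int i = 0; i < tasks; i++) {
            queue.add(() -> completed.incrementAndGet());
        }
        // One stop marker per worker, queued after all the real tasks.
        for (int i = 0; i < workers; i++) {
            queue.add(STOP);
        }

        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks completed: " + runTasks(4, 8));
    }
}
```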

      Prior to JDK 1.5, developers used to code their own thread pool implementations. With the introduction of the java.util.concurrent package, Java provides utility classes for thread pools, which are discussed in the next section.

      Executor, ExecutorService and Executors

      Executor is an interface with a single method, execute(Runnable), for executing runnables.

      ExecutorService is a sub-interface of Executor that adds methods for shutting down, submitting callables, invoking a collection of callables, and more.

      Executors is a utility class with factory methods for thread pools, executor services, and more.
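
      The sketch below shows how the three types fit together. The class ExecutorTypesSketch and its sumOfSquares() method are illustrative assumptions, not part of the payment example. Executors builds the pool, the Executor view only accepts runnables, and the ExecutorService view also runs a collection of callables through invokeAll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorTypesSketch {

    static int sumOfSquares(int upTo) {
        // Executors: the factory class that builds the pool.
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            // Executor: the narrowest view, it can only execute Runnables.
            Executor executor = service;
            executor.execute(() -> System.out.println("Runnable executed"));

            // ExecutorService: can also run a whole collection of Callables.
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (int i = 1; i <= upTo; i++) {
                final int n = i;
                tasks.add(() -> n * n);
            }

            int sum = 0;
            // invokeAll() blocks until every task in the collection has completed.
            for (Future<Integer> f : service.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // ExecutorService also owns the lifecycle of the pool.
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum of squares up to 3: " + sumOfSquares(3));
    }
}
```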

      Let's revisit the same use case, this time using a thread pool.

      ThreadPoolExecutorEx.java using Executors to create the thread pool and passing runnable implementations to the thread pool
      package com.kunaal.parallel;
      
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      import com.kunaal.parallel.payment.PaymentProcessor;
      import com.kunaal.parallel.runnable.ExecutorRunImpl;
      
      /**
       * @author Kunaal A Trehan
       *
       */
      public class ThreadPoolExecutorEx {
      
       /**
        * @param args
        */
       public static void main(String[] args) {
        
        ExecutorService threadPoolExecutor=Executors.newFixedThreadPool(4);
        PaymentProcessor pymtProcessor=new  PaymentProcessor();
        
        ExecutorRunImpl carLoanExecutor=new ExecutorRunImpl(Constants.CAR_LOAN_EMI, pymtProcessor);
        ExecutorRunImpl elecBillExecutor=new ExecutorRunImpl(Constants.ELEC_BILL, pymtProcessor);
        ExecutorRunImpl mbaExecutor=new ExecutorRunImpl(Constants.EXECUTIVE_MBA, pymtProcessor);
        ExecutorRunImpl oldAgeExecutor=new ExecutorRunImpl(Constants.FUND_FOR_OLD_AGE_HOME, pymtProcessor);
        ExecutorRunImpl orphanageExecutor=new ExecutorRunImpl(Constants.FUND_FOR_ORPHANAGE, pymtProcessor);
        ExecutorRunImpl homeLoanExecutor=new ExecutorRunImpl(Constants.HOME_LOAN_EMI, pymtProcessor);
        ExecutorRunImpl kidSchoolExecutor=new ExecutorRunImpl(Constants.KID_SCHOOL_FEES, pymtProcessor);
        ExecutorRunImpl waterBillExecutor=new ExecutorRunImpl(Constants.WATER_BILL, pymtProcessor);
        
        long startTime = System.currentTimeMillis();
        System.out.println("Before starting the threads:-" + startTime);
        
        threadPoolExecutor.execute(carLoanExecutor);
        threadPoolExecutor.execute(elecBillExecutor);
        threadPoolExecutor.execute(mbaExecutor);
        threadPoolExecutor.execute(oldAgeExecutor);
        threadPoolExecutor.execute(orphanageExecutor);
        threadPoolExecutor.execute(homeLoanExecutor);
        threadPoolExecutor.execute(kidSchoolExecutor);
        threadPoolExecutor.execute(waterBillExecutor);
        
        long timeAfterStarting = System.currentTimeMillis();  
        System.out.println("After starting the threads:-" + timeAfterStarting);
        
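        threadPoolExecutor.shutdown();
        try {
         //Block until all tasks finish instead of spinning on isTerminated();
         //the 5 minute limit is an arbitrary upper bound
         threadPoolExecutor.awaitTermination(5, java.util.concurrent.TimeUnit.MINUTES);
        } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
        }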
        long timeEnd = System.currentTimeMillis();  
        System.out.println("Time taken for threads to finish-"+ ((timeEnd-timeAfterStarting)/1000) + " seconds");
       }
      
      }
      


      ExecutorRunImpl.java overriding run() method
      package com.kunaal.parallel.runnable;
      
      import com.kunaal.parallel.Constants;
      import com.kunaal.parallel.domain.PaymentInfo;
      import com.kunaal.parallel.payment.PaymentProcessor;
      
      /**
       * @author Kunaal A Trehan
       *
       */
      public class ExecutorRunImpl implements Runnable {
       
       private String info;
       private PaymentProcessor pymtProcessor;
      
       /**
        * @param info
        */
       public ExecutorRunImpl(String info,PaymentProcessor pymtProcessor) {
        this.info = info;
        this.pymtProcessor=pymtProcessor;
       }
      
       /* (non-Javadoc)
        * @see java.lang.Runnable#run()
        */
       @Override
       public void run() {
        PaymentInfo pymtInfo=null;
        
        if (info.equals(Constants.CAR_LOAN_EMI)){
         pymtInfo=new PaymentInfo(new Double(650), "Acct-1", "Bank-1", "City-1");
        }else if (info.equals(Constants.ELEC_BILL)){
         pymtInfo=new PaymentInfo(new Double(50), "Acct-2", "Bank-2", "City-2");
        }else if (info.equals(Constants.EXECUTIVE_MBA)){
         pymtInfo=new PaymentInfo(new Double(150), "Acct-3", "Bank-3", "City-3");
        }else if (info.equals(Constants.HOME_LOAN_EMI)){
         pymtInfo=new PaymentInfo(new Double(1050), "Acct-4", "Bank-4", "City-4");
        }else if (info.equals(Constants.KID_SCHOOL_FEES)){
         pymtInfo=new PaymentInfo(new Double(70), "Acct-5", "Bank-5", "City-5");
        }else if (info.equals(Constants.WATER_BILL)){
         pymtInfo=new PaymentInfo(new Double(30), "Acct-6", "Bank-6", "City-6");
        }else if (info.equals(Constants.FUND_FOR_OLD_AGE_HOME)){
         pymtInfo=new PaymentInfo(new Double(20), "Acct-7", "Bank-7", "City-7");
        }else if (info.equals(Constants.FUND_FOR_ORPHANAGE)){
         pymtInfo=new PaymentInfo(new Double(30), "Acct-8", "Bank-8", "City-8");
        }else{
         System.out.println("Nothing to pay-no info provided");
        }
        
        pymtProcessor.setPaymentInfo(pymtInfo);
        pymtProcessor.makePayment();
        pymtProcessor.removePaymentInfo();
        
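        try {
         //sleep() is static and always pauses the current thread
         Thread.sleep(10*1000);
        } catch (InterruptedException e) {
         //Restore the interrupt flag so the pool can react to it
         Thread.currentThread().interrupt();
        }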
       }
      
       /**
        * @return the info
        */
       public String getInfo() {
        return info;
       }
      
       /**
        * @param info the info to set
        */
       public void setInfo(String info) {
        this.info = info;
       }
      
      }
      

      Results of ThreadPoolExecutorEx.java
      Before starting the threads:-1298978911765
      After starting the threads:-1298978911765
      Payment details are:- PaymentInfo [amt=650.0, acctNum=Acct-1, bankName=Bank-1, city=City-1]
      Payment details are:- PaymentInfo [amt=150.0, acctNum=Acct-3, bankName=Bank-3, city=City-3]
      Payment details are:- PaymentInfo [amt=50.0, acctNum=Acct-2, bankName=Bank-2, city=City-2]
      Payment details are:- PaymentInfo [amt=20.0, acctNum=Acct-7, bankName=Bank-7, city=City-7]
      Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-8, bankName=Bank-8, city=City-8]
      Payment details are:- PaymentInfo [amt=1050.0, acctNum=Acct-4, bankName=Bank-4, city=City-4]
      Payment details are:- PaymentInfo [amt=70.0, acctNum=Acct-5, bankName=Bank-5, city=City-5]
      Payment details are:- PaymentInfo [amt=30.0, acctNum=Acct-6, bankName=Bank-6, city=City-6]
      Time taken for threads to finish-20 seconds
      

      This implementation avoids creating a thread per task and reuses existing threads once their assigned job is finished. However, it suffers from one issue: there is no easy way to track results from a thread, because the run() method has no return value.

      This brings us to callables and futures.

      Before moving on to callables and futures, let's look at the thread pool variants that Executors provides:
      • FixedThreadPool
        In this thread pool we define the number of threads in the pool up front. When there are more tasks than threads, the extra tasks wait in the queue until a thread is free to pick them up.

        Executors provides the factory methods newFixedThreadPool(int nThreads) and newFixedThreadPool(int nThreads, ThreadFactory threadFactory) for creating it.
      • CachedThreadPool
        This thread pool creates as many threads as needed for parallel processing. If an idle thread is available, it reuses that thread rather than creating a new one. A thread that has been idle for 60 seconds is terminated and removed from the cache.

        Executors provides the factory methods newCachedThreadPool() and newCachedThreadPool(ThreadFactory threadFactory) for creating it.

      • SingleThreadPool
        This thread pool contains only one thread, so tasks run one at a time in the order they were submitted. It is created with Executors.newSingleThreadExecutor().

      • ScheduledThreadPool
        This thread pool schedules commands to run after a given delay or to execute periodically. It is created with Executors.newScheduledThreadPool(int corePoolSize).
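
      As a rough illustration of the last two variants, the sketch below uses Executors.newSingleThreadExecutor() and Executors.newScheduledThreadPool(int). The class PoolVariantsSketch and its helper methods are hypothetical names chosen for this example. The first method records the order in which a single-thread executor runs its tasks. The second measures how long a scheduled task actually waited before running.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PoolVariantsSketch {

    // A single-thread executor runs tasks one after another, in submission order.
    static List<Integer> singleThreadOrder(int tasks) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown();
        try {
            // Wait for the queued tasks to drain before reading the list.
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    // A scheduled pool runs a task after a delay; returns the observed delay in ms.
    static long scheduledDelayMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        final long start = System.nanoTime();
        Callable<Long> probe = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        ScheduledFuture<Long> elapsed = scheduler.schedule(probe, delayMillis, TimeUnit.MILLISECONDS);
        try {
            return elapsed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Single-thread order: " + singleThreadOrder(5));
        System.out.println("Scheduled task ran after " + scheduledDelayMillis(200) + " ms");
    }
}
```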


      What is Callable and Future?
      Callable is similar to the Runnable interface, except that its call() method can return a value and throw a checked exception. Future is used together with Callable to hold the result of a task that runs asynchronously. Callable was introduced in JDK 1.5 to provide an alternative mechanism for cases where we want to return something from a thread's execution.
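
      A minimal sketch of the pairing, independent of the payment classes: the class CallableFutureSketch, the fetchAck() method and the 5 second timeout are assumptions made for illustration. The Callable produces a value on a pool thread, and the Future returned by submit() hands that value back to the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallableFutureSketch {

    static String fetchAck(String acctNum) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Unlike Runnable.run(), Callable.call() returns a value.
            Callable<String> task = () -> "ACK-" + acctNum + "-Y";

            // submit() returns immediately; the Future is a handle to the pending result.
            Future<String> future = pool.submit(task);

            // get() blocks until the result is ready (here for at most 5 seconds).
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            // ExecutionException wraps anything thrown inside call().
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAck("Acct-1"));
    }
}
```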


      Let's revisit the same funds transfer use case, but this time we will use Callable and Future to hold the acknowledgement.


      CallableImpl.java returning the payment acknowledgement from call()
      package com.kunaal.parallel.callable;
      
      import java.util.concurrent.Callable;
      
      import com.kunaal.parallel.Constants;
      import com.kunaal.parallel.domain.PaymentInfo;
      import com.kunaal.parallel.payment.PaymentProcessor;
      
      /**
       * Callable implementation with String as the return value
       * String contains acknowledgement of the payment transaction.
       * 
       * @author Kunaal A Trehan
       *
       */
      public class CallableImpl implements Callable<String> {
       private String info;
       private PaymentProcessor pymtProcessor;
      
       /**
        * @param info
        */
       public CallableImpl(String info,PaymentProcessor pymtProcessor) {
        this.info = info;
        this.pymtProcessor=pymtProcessor;
       }
      
       /**
        * Overridden call method with String as return type
        */
       @Override
       public String call() throws Exception {
        PaymentInfo pymtInfo=null;
        String acknowledgement=null;
        
        if (info.equals(Constants.CAR_LOAN_EMI)){
         pymtInfo=new PaymentInfo(new Double(650), "Acct-1", "Bank-1", "City-1");
        }else if (info.equals(Constants.ELEC_BILL)){
         pymtInfo=new PaymentInfo(new Double(50), "Acct-2", "Bank-2", "City-2");
        }else if (info.equals(Constants.EXECUTIVE_MBA)){
         pymtInfo=new PaymentInfo(new Double(150), "Acct-3", "Bank-3", "City-3");
        }else if (info.equals(Constants.HOME_LOAN_EMI)){
         pymtInfo=new PaymentInfo(new Double(1050), "Acct-4", "Bank-4", "City-4");
        }else if (info.equals(Constants.KID_SCHOOL_FEES)){
         pymtInfo=new PaymentInfo(new Double(70), "Acct-5", "Bank-5", "City-5");
        }else if (info.equals(Constants.WATER_BILL)){
         pymtInfo=new PaymentInfo(new Double(30), "Acct-6", "Bank-6", "City-6");
        }else if (info.equals(Constants.FUND_FOR_OLD_AGE_HOME)){
         pymtInfo=new PaymentInfo(new Double(20), "Acct-7", "Bank-7", "City-7");
        }else if (info.equals(Constants.FUND_FOR_ORPHANAGE)){
         pymtInfo=new PaymentInfo(new Double(30), "Acct-8", "Bank-8", "City-8");
        }else{
         System.out.println("Nothing to pay-no info provided");
        }
        
        pymtProcessor.setPaymentInfo(pymtInfo);
        acknowledgement=pymtProcessor.makePaymentGetAck();
        pymtProcessor.removePaymentInfo();
        
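        try {
         //sleep() is static and always pauses the current thread
         Thread.sleep(10*1000);
        } catch (InterruptedException e) {
         //Restore the interrupt flag so the pool can react to it
         Thread.currentThread().interrupt();
        }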
      
        return acknowledgement;
       }
      
      }
      


      CallableExecutor.java using callable for the execution and future for displaying the acknowledgements
      package com.kunaal.parallel;
      
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      
      import com.kunaal.parallel.callable.CallableImpl;
      import com.kunaal.parallel.payment.PaymentProcessor;
      
      /**
       * Usecase with callable and future implementation
       * Here we submit callable references in the thread pool 
       * which executes them and stores each result in a Future
       * 
       * @author Kunaal A Trehan
       *
       */
      public class CallableExecutor {
      
       /**
        * @param args
        * @throws ExecutionException 
        * @throws InterruptedException 
        */
       public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPoolExecutor=Executors.newFixedThreadPool(4);
        PaymentProcessor pymtProcessor=new  PaymentProcessor();
        List<Future<String>> futureList= new ArrayList<Future<String>>();
        
        CallableImpl carLoanExecutor=new CallableImpl(Constants.CAR_LOAN_EMI, pymtProcessor);
        CallableImpl elecBillExecutor=new CallableImpl(Constants.ELEC_BILL, pymtProcessor);
        CallableImpl mbaExecutor=new CallableImpl(Constants.EXECUTIVE_MBA, pymtProcessor);
        CallableImpl oldAgeExecutor=new CallableImpl(Constants.FUND_FOR_OLD_AGE_HOME, pymtProcessor);
        CallableImpl orphanageExecutor=new CallableImpl(Constants.FUND_FOR_ORPHANAGE, pymtProcessor);
        CallableImpl homeLoanExecutor=new CallableImpl(Constants.HOME_LOAN_EMI, pymtProcessor);
        CallableImpl kidSchoolExecutor=new CallableImpl(Constants.KID_SCHOOL_FEES, pymtProcessor);
        CallableImpl waterBillExecutor=new CallableImpl(Constants.WATER_BILL, pymtProcessor);
        
        futureList.add(threadPoolExecutor.submit(carLoanExecutor));
        futureList.add(threadPoolExecutor.submit(elecBillExecutor));
        futureList.add(threadPoolExecutor.submit(mbaExecutor));
        futureList.add(threadPoolExecutor.submit(oldAgeExecutor));
        futureList.add(threadPoolExecutor.submit(orphanageExecutor));
        futureList.add(threadPoolExecutor.submit(homeLoanExecutor));
        futureList.add(threadPoolExecutor.submit(kidSchoolExecutor));
        futureList.add(threadPoolExecutor.submit(waterBillExecutor));
        
        threadPoolExecutor.shutdown();
        
        for(Future<String> future:futureList){
         System.out.println(future.get());
        }
       }
      
      }
      

      CallableExecutor.java results
      ACK-Acct-1-650.0-Y
      ACK-Acct-2-50.0-Y
      ACK-Acct-3-150.0-Y
      ACK-Acct-7-20.0-Y
      ACK-Acct-8-30.0-Y
      ACK-Acct-4-1050.0-Y
      ACK-Acct-5-70.0-Y
      ACK-Acct-6-30.0-Y
      


      In the code above we used Callable and Future to capture the results. However, it can still be improved. So where is the problem?

      The problem lies in iterating over futureList to print the acknowledgements, because Future.get() blocks until its task has completed.
      Imagine that the first task takes a long time and the other tasks finish in the meantime. We still cannot see the results of those tasks until the first task completes.

      To overcome this, Java provides CompletionService, which we discuss in the next section.
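
      The blocking behaviour described above can be reproduced with a small sketch. The class FutureOrderSketch, the "slow" and "fast" labels and the sleep times are made up for illustration. A slow task is submitted first and a fast one second, and the futures are then read in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureOrderSketch {

    // Submits a slow task first and a fast task second, then reads the futures
    // in submission order, the same way a loop over a list of futures does.
    static List<String> retrievalOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        futures.add(pool.submit(sleepThenReturn(300, "slow")));
        futures.add(pool.submit(sleepThenReturn(50, "fast")));
        pool.shutdown();

        List<String> seen = new ArrayList<String>();
        try {
            for (Future<String> f : futures) {
                // get() waits for "slow" first, although "fast" finishes earlier.
                seen.add(f.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    // Builds a task that sleeps for the given time and then returns its label.
    private static Callable<String> sleepThenReturn(long millis, String label) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    public static void main(String[] args) {
        System.out.println(retrievalOrder());
    }
}
```

      The results come back as slow, then fast: the retrieval order follows the list, not the order in which the tasks finished.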

      CompletionService
      CompletionService is an interface in the java.util.concurrent package that decouples the producer, which submits tasks, from the consumer, which retrieves the results. The consumer takes results in the order in which the tasks complete, irrespective of the order in which they were submitted.

      ExecutorCompletionService is a concrete implementation of CompletionService.
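
      The idea behind such an implementation can be sketched by hand: each task is wrapped in a FutureTask whose done() hook puts the finished future on a BlockingQueue, and the consumer takes futures from that queue. This is a simplified sketch of the mechanism, not the JDK source. The class CompletionQueueSketch, the completionOrder() method and the delays are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class CompletionQueueSketch {

    // Runs one sleeping task per delay and returns the results in completion order.
    static List<String> completionOrder(long... delaysMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        BlockingQueue<Future<String>> finished = new LinkedBlockingQueue<Future<String>>();
        try {
            for (long delay : delaysMillis) {
                Callable<String> task = () -> {
                    Thread.sleep(delay);
                    return delay + " ms";
                };
                // FutureTask calls done() once the task has finished, so the
                // future is queued at completion time rather than at submission time.
                pool.execute(new FutureTask<String>(task) {
                    @Override
                    protected void done() {
                        finished.add(this);
                    }
                });
            }

            List<String> results = new ArrayList<String>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks only until the next task, whichever it is, completes.
                results.add(finished.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder(300, 50, 150));
    }
}
```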

      CompletionExecutor.java showing implementation of CompletionService
      package com.kunaal.parallel;
      
      import java.util.concurrent.Callable;
      import java.util.concurrent.CompletionService;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorCompletionService;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      /**
       * CompletionExecutor showing the use of CompletionService
       * which adds the result as it comes from the thread rather than 
       * maintaining the order in which threads were spawned
       * 
       * @author Kunaal A Trehan
       *
       */
      public class CompletionExecutor {
      
       /**
        * @param args
        * @throws ExecutionException 
        * @throws InterruptedException 
        */
       public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPoolExecutor=Executors.newFixedThreadPool(4);
        CompletionService<String> completionService=new ExecutorCompletionService<String>(threadPoolExecutor);
        
        CallImpl impl1=new CallImpl(20);
        CallImpl impl2=new CallImpl(10);
        CallImpl impl3=new CallImpl(5);
        CallImpl impl4=new CallImpl(3);
        
        completionService.submit(impl1);
        completionService.submit(impl2);
        completionService.submit(impl3);
        completionService.submit(impl4);
        
        threadPoolExecutor.shutdown();
        
        for(int i=0;i <4;i++){
         String result = completionService.take().get();
         System.out.println(result);
        }
       }
      
      }
      
      /**
       * Package-private Callable implementation (declared in the same file)
       * whose task pauses for n seconds before completing the 
       * execution block.
       * 
       * @author Kunaal A Trehan
       *
       */
      class CallImpl implements Callable<String>{
      
       private int waitTime;
       
       /**
        * @param waitTime
        */
       public CallImpl(int waitTime) {
        super();
        this.waitTime = waitTime;
       }
      
       @Override
       public String call() throws Exception {
        //Sleep instead of busy-waiting so the thread does not burn CPU while paused
        Thread.sleep(waitTime*1000L);
        return "Thread paused for " + waitTime + " seconds";
       }
       
      }
      


      Program console output showing that the completion service returns the results of the callables in the order in which they complete
      Thread paused for 3 seconds
      Thread paused for 5 seconds
      Thread paused for 10 seconds
      Thread paused for 20 seconds