Tuesday, 7 July 2015

Inter-thread communication using wait, notify and notifyAll - Java Multithreading

Java provides inter-thread communication using the following methods of the Object class.

  • wait()
  • notify()
  • notifyAll()

These methods are implemented as final method in the Object class thus available to all the classes, as Object class is the super class of all the classes in Java.

Another important point about these methods is that they can only be called from a synchronized context, as these methods are about releasing the monitor and acquiring it again. Threads acquire monitor(lock) when entering a synchronized method (or block) so it makes sense to call them from synchronized context.

wait method

Wait method tells the current thread (thread which is executing code inside a synchronized method or block) to give up monitor and go to sleep, until another thread invokes the notify() or notifyAll() method for this object.

General form of wait method

public final void wait() throws InterruptedException

There are two more overloaded wait methods

public final void wait(long timeout) throws InterruptedException

Causes the current thread to wait until either another thread invokes the notify() method or the notifyAll() method for this object, or a specified amount of time has elapsed.

public final void wait(long timeout, int nanos) throws InterruptedException

Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed.

Notify method

Wakes up a single thread that is waiting on this object's monitor. If more than one threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation.

Note that the thread which comes out of waiting because of the notify() method will not be able to proceed until the current thread relinquishes the lock on this object. The awakened thread just changes to the runnable state and it is ready to be scheduled again. The awakened thread will compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for example, the awakened thread enjoys no reliable privilege or disadvantage in being the next thread to lock this object.

Once awakened thread(thread which has come out of waiting because of notify() method) has gained control of the object, all its synchronization claims on the object are restored to the situation as of the time that the wait method was invoked that is on return from the wait method, the synchronization state of the object and of thread is exactly as it was when the wait method was invoked.

notifyAll method

wakes up all the threads that called wait() on the same object. As explained in notify() any one of the threads will be granted access to the object.

Spurious wakeup

Once wait is called on an object the thread that is currently executing with in the synchronized context waits until notify() or notifyAll() method is called. But there is a possibility that a waiting thread resumes again even when notify() or notifyAll() are not called (this will rarely occur in practice). This is known as spurious wakeup.

To guard against spurious wakeup the recommendation is that call to wait() method should be with in a loop that checks the condition on which the thread is waiting.

synchronized (obj) {
    while (condition does not hold)
        obj.wait(timeout);
        ... // Perform action appropriate to condition
}

Read more about spurious wakeup in the wait method of the Java docs of Object class

http://docs.oracle.com/javase/8/docs/api/java/lang/Object.html

Producer-Consumer example using wait, notify

Let's see one of the oft mentioned example of producer and consumer implemented with two threads, where producer thread produces a number, puts it in a list and then consumer thread gets that number out of the list. Since a shared list is used between these two threads so to make sure that both threads work in tandem wait/notify mechanism is used for inter-thread communication.

class Producer implements Runnable{
    List<Integer> sharedListObj;
    Producer(List<Integer> sharedListObj){
        this.sharedListObj = sharedListObj;
    }
    @Override
    public void run() {
        int i = 0;
        while(true){
            synchronized (sharedListObj) {
                // While condition as mandated to avoid spurious wakeup
                while(sharedListObj.size() >= 1){
                    try {
                        sharedListObj.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                // Putting value in the list
                System.out.println("Adding to queue - " + Thread.currentThread().getName() + " " + ++i);
                sharedListObj.add(i);
                sharedListObj.notify();    
                // To get out of while(true) loop, putting
                // only 5 values
                if(i > 4) break;
            }
        }
    }            
}

class Consumer implements Runnable{
    List<Integer> sharedListObj;
    Consumer(List<Integer> sharedListObj){
        this.sharedListObj = sharedListObj;
    }
    @Override
    public void run() {    
        while(true){
            synchronized (sharedListObj) {
                while(sharedListObj.size() < 1){
                    try {
                        sharedListObj.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }                    
                }
                // Getting value from the list
                System.out.println("Getting from queue " + Thread.currentThread().getName() + " " + sharedListObj.get(0));
                // To get out of while(true) loop
                if(sharedListObj.get(0) == 5) break;
                sharedListObj.remove(0);
                sharedListObj.notify();        
                
            }
        }
    }
}


public class InterThreadDemo {
    public static void main(String[] args) {
        // This is the shared list shared between producer
        // and consumer
        List<Integer> sharedListObj = new ArrayList<Integer>();
        Thread t1 = new Thread(new Producer(sharedListObj), "Producer");
        Thread t2 = new Thread(new Consumer(sharedListObj), "Consumer");
        t1.start();
        t2.start();    
    }

}

Output

Adding to queue - Producer 1
Getting from queue Consumer 1
Adding to queue - Producer 2
Getting from queue Consumer 2
Adding to queue - Producer 3
Getting from queue Consumer 3
Adding to queue - Producer 4
Getting from queue Consumer 4
Adding to queue - Producer 5
Getting from queue Consumer 5

If wait/notify related code is commented though the access is synchronized nothing stops producer thread to keep producing numbers or consumer for keep on consuming numbers. Since I have used a list here and trying to get 0th element from the list it may even lead to ArrayIndexOutofBoundsException.

class Producer implements Runnable{
    List<Integer> sharedListObj;
    Producer(List<Integer> sharedListObj){
        this.sharedListObj = sharedListObj;
    }
    @Override
    public void run() {
        int i = 0;
        while(true){
            synchronized (sharedListObj) {
                // While condition as mandated to avoid spurious wakeup
                /*while(sharedListObj.size() >= 1){
                    try {
                        sharedListObj.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }*/
                // Putting value in the list
                System.out.println("Adding to queue - " + Thread.currentThread().getName() + " " + ++i);
                sharedListObj.add(i);
                //sharedListObj.notify();    
                // To get out of while(true) loop
                if(i > 4) break;
            }
        }
    }            
}

class Consumer implements Runnable{
    List<Integer> sharedListObj;
    Consumer(List<Integer> sharedListObj){
        this.sharedListObj = sharedListObj;
    }
    @Override
    public void run() {    
        while(true){
            synchronized (sharedListObj) {
                /*while(sharedListObj.size() < 1){
                    try {
                        sharedListObj.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }                    
                }*/
                // Getting value from the list
                System.out.println("Getting from queue " + Thread.currentThread().getName() + " " + sharedListObj.get(0));
                // To get out of while(true) loop
                if(sharedListObj.get(0) == 5) break;
                sharedListObj.remove(0);
                //sharedListObj.notify();        
                
            }
        }
    }
}


public class InterThreadDemo {
    public static void main(String[] args) {
        // This is the shared list shared between producer
        // and consumer
        List<Integer> sharedListObj = new ArrayList<Integer>();
        Thread t1 = new Thread(new Producer(sharedListObj), "Producer");
        Thread t2 = new Thread(new Consumer(sharedListObj), "Consumer");
        t1.start();
        t2.start();    
    }

}

Output

Adding to queue - Producer 1
Adding to queue - Producer 2
Adding to queue - Producer 3
Adding to queue - Producer 4Exception in thread "Consumer" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
Getting from queue Consumer 1
Getting from queue Consumer 2
Getting from queue Consumer 3
Getting from queue Consumer 4
Adding to queue - Producer 5

 at java.util.ArrayList.rangeCheck(ArrayList.java:653)
 at java.util.ArrayList.get(ArrayList.java:429)
 at org.netjs.examples.Consumer.run(InterThreadDemo.java:55)
 at java.lang.Thread.run(Thread.java:745)

It can be seen, how, in absence of proper inter-thread communication nothing stops producer thread from keep on producing numbers and when the consumer thread gets hold of the synchronized block code it keeps on consuming numbers.

That's all for this topic Inter-thread communication using wait, notify and notifyAll. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Synchronization in Java multithreading
  2. Deadlock in Java multi-threading
  3. Can we start the same thread twice in Java
  4. Race condition in Java multi-threading
  5. ThreadLocal class in Java
  6. Java Multi-threading interview questions

You may also like -

No comments:

Post a Comment