Java inter thread communication

The preceding examples (in the previous chapter) unconditionally blocked other threads from asynchronous access to certain methods. This use of the implicit monitors in Java objects is powerful, but you can achieve a more subtle level of control through interthread communication.

As you know from the earlier discussion, multithreading replaces event loop programming by dividing your tasks into separate, logical units. Threads also provide a secondary benefit: they do away with polling.

Polling is generally implemented by a loop that checks some condition repeatedly. Once the condition becomes true, appropriate action is taken. This wastes CPU time. For instance, consider the classic queuing problem, where one thread is producing some data and another is consuming it. To make the problem more interesting, suppose that the producer has to wait until the consumer is finished before it generates more data. In a polling system, the consumer would waste many CPU cycles while it waited for the producer to produce. Once the producer was finished, it would start polling, wasting more CPU cycles while waiting for the consumer to finish, and so on. Clearly, this situation is undesirable.
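To make the cost concrete, here is a minimal sketch of a polling consumer. The class name PollingSketch, the ready flag, and the check counter are illustrative assumptions and are not part of this chapter's programs. The consumer does nothing useful while it spins; it only counts how often it had to look at the flag.

```java
class PollingSketch {
    // volatile so the polling thread is guaranteed to see the producer's write
    static volatile boolean ready = false;

    // Returns how many times the consumer checked the flag until it was true.
    static long pollUntilReady(long producerDelayMillis) {
        ready = false;
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(producerDelayMillis); // pretend to produce something
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ready = true; // publish the result
        }, "Producer");
        producer.start();

        long checks = 0;
        do {
            checks++;          // every pass through this loop is wasted CPU time
        } while (!ready);
        return checks;
    }

    public static void main(String[] args) {
        System.out.println("Checks before data was ready: " + pollUntilReady(50));
    }
}
```

The exact count depends on the machine and its load, but on typical hardware it is very large for even a short delay, which is the waste that wait() and notify() are meant to remove.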

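Thus, to avoid polling, Java includes an elegant interthread communication mechanism via the methods named wait(), notify(), and notifyAll(). These methods are implemented as final methods in Object, so all classes have them. All three methods can be called only from inside a synchronized context; that is, the calling thread must hold the monitor of the object on which the method is invoked. Although conceptually advanced from a computer science view, the rules for using these methods are actually quite simple:

wait() tells the calling thread to give up the monitor and go to sleep until some other thread enters the same monitor and calls notify() or notifyAll().
notify() wakes up one thread that called wait() on the same object.
notifyAll() wakes up all the threads that called wait() on the same object. One of them will be granted access to the monitor first.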

These methods are declared inside Object, as shown below:

final void wait() throws InterruptedException
final void notify()
final void notifyAll()
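The requirement that these methods be called from a synchronized context can be checked directly. The following is a small sketch; the class and method names are hypothetical. Calling notify() on an object whose monitor the current thread does not hold throws IllegalMonitorStateException, while the same call inside a synchronized block on that object succeeds.

```java
class MonitorRuleSketch {
    // Returns true if notify() outside a synchronized block is rejected.
    static boolean notifyWithoutMonitorThrows() {
        Object lock = new Object();
        try {
            lock.notify(); // the current thread does not own lock's monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true if notify() inside synchronized (lock) completes normally.
    static boolean notifyWithMonitorSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notify(); // allowed: the monitor is held; no thread is waiting, so nothing happens
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Rejected without monitor: " + notifyWithoutMonitorThrows());
        System.out.println("Accepted with monitor: " + notifyWithMonitorSucceeds());
    }
}
```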

Additional forms of wait() exist that allow you to specify a period of time to wait.
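One of those forms is wait(long timeout), which takes a timeout in milliseconds. The sketch below shows one way it might be used; the TimedGate class and its methods are hypothetical names chosen for illustration. Note that wait(0) means "wait with no time limit", so the code stops before it would ever pass zero.

```java
class TimedGate {
    private boolean open = false;

    synchronized void open() {
        open = true;
        notifyAll(); // wake every thread waiting for the gate
    }

    // Waits up to timeoutMillis. Returns true if the gate opened, false on timeout.
    synchronized boolean awaitOpen(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!open) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return false; // time is up; wait(0) would block with no limit
            }
            wait(remainingMillis); // may return early, so the loop re-checks
        }
        return true;
    }

    // Demo wrapper (an assumption of this sketch): treats interruption as failure.
    static boolean tryAwait(TimedGate gate, long timeoutMillis) {
        try {
            return gate.awaitOpen(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        TimedGate gate = new TimedGate();
        System.out.println("Opened in time: " + tryAwait(gate, 100));
        gate.open();
        System.out.println("Opened in time: " + tryAwait(gate, 100));
    }
}
```

The deadline is computed once, so repeated wakeups do not extend the total waiting time.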

Before working through an example that illustrates interthread communication, an important point needs to be made. Even though wait() normally waits until notify() or notifyAll() is called, there is a possibility that in very rare cases the waiting thread could be awakened due to a spurious wakeup. In this case, a waiting thread resumes without notify() or notifyAll() having been called. In essence, the thread resumes for no apparent reason. Because of this remote possibility, Oracle recommends that calls to wait() take place within a loop that checks the condition on which the thread is waiting. You will see this technique in the upcoming example program.
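In isolation, the recommended pattern looks roughly like the sketch below. ReadySignal and its members are hypothetical names used only for this illustration. The key detail is that the condition is tested with while rather than if, so a thread that wakes up for any reason goes back to waiting unless the condition really holds.

```java
class ReadySignal {
    private boolean ready = false;

    synchronized void awaitReady() throws InterruptedException {
        while (!ready) { // re-check the condition after every wakeup
            wait();
        }
    }

    synchronized void signalReady() {
        ready = true;   // change the condition first
        notifyAll();    // then wake the waiting threads
    }

    // Demo helper: returns true if the waiter got past awaitReady().
    static boolean demo() {
        ReadySignal signal = new ReadySignal();
        boolean[] passed = { false };
        Thread waiter = new Thread(() -> {
            try {
                signal.awaitReady();
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Waiter");
        waiter.start();
        signal.signalReady();
        try {
            waiter.join(); // join() makes the waiter's write to passed[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }

    public static void main(String[] args) {
        System.out.println("Waiter released: " + demo());
    }
}
```

Because the waiter tests the flag before it ever calls wait(), the demo works whether signalReady() runs before or after the waiter starts waiting.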

Java inter thread communication Example

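Let's now work through an example that uses the wait() and notify() methods. To start, consider the sample program below, which incorrectly implements a simple form of the producer/consumer problem. It consists of four classes: Q, the queue that you are trying to synchronize; Producer, the threaded object that produces queue entries; Consumer, the threaded object that consumes queue entries; and PC, the small class that creates the single Q, Producer, and Consumer.

Here is the Java example source code (incorrect implementation):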

/* Java Program Example - Java inter thread communication
 * An incorrect implementation of producer and consumer */
 
class Q
{
    int num;

    synchronized int get()
    {
        System.out.println("Got : " + num);
        return num;
    }

    synchronized void put(int num)
    {
        this.num = num;
        System.out.println("Put : " + num);
    }
}

class Producer implements Runnable
{
    Q que;

    Producer(Q que)
    {
        this.que = que;
        new Thread(this, "Producer").start();
    }

    public void run()
    {
        int n = 0;

        while(true)
        {
            que.put(n++);
        }
    }
}

class Consumer implements Runnable
{
    Q que;
    
    Consumer(Q que)
    {
        this.que = que;
        new Thread(this, "Consumer").start();
    }
    
    public void run()
    {
        while(true)
        {
            que.get();
        }
    }
}

class PC
{
    public static void main(String args[])
    {
        
        Q que = new Q();
        new Producer(que);
        new Consumer(que);
        
        System.out.println("Press Control-C to stop...");
        
    }
}

Although the put() and get() methods on Q are synchronized, nothing stops the producer from overrunning the consumer, nor will anything stop the consumer from consuming the same queue value twice. Therefore, you get the erroneous output shown below (the exact output will vary with the processor speed and the task load):

Put : 1
Got : 1
Got : 1
Got : 1
Got : 1
Got : 1
Put : 2
Put : 3
Put : 4
Put : 5
Put : 6
Put : 7
Got : 7

As you can see, after the producer put 1, the consumer started and got the same 1 five times in a row. Then, the producer resumed and produced 2 through 7 without giving the consumer a chance to consume them.

The proper way to write this program in Java is to use the methods named wait() and notify() to signal in both directions, as shown below:

/* Java Program Example - Java inter thread communication
 * A correct implementation of producer and consumer */
 
class Q
{
    int num;
    boolean valueSet = false;   // true while num holds a value not yet consumed

    synchronized int get()
    {
        // Wait in a loop so that a spurious wakeup cannot let get() run early.
        while(!valueSet)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e)
            {
                System.out.println("InterruptedException caught..!!");
            }
        }
        System.out.println("Got : " + num);
        valueSet = false;
        notify();   // tell the producer that the value has been taken
        return num;
    }

    synchronized void put(int num)
    {
        // Wait until the consumer has taken the previous value.
        while(valueSet)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e)
            {
                System.out.println("InterruptedException caught..!!");
            }
        }
        this.num = num;
        valueSet = true;
        System.out.println("Put : " + num);
        notify();   // tell the consumer that a value is ready
    }
}

class Producer implements Runnable
{
    Q que;

    Producer(Q que)
    {
        this.que = que;
        new Thread(this, "Producer").start();
    }

    public void run()
    {
        int n = 0;

        while(true)
        {
            que.put(n++);
        }
    }
}

class Consumer implements Runnable
{
    Q que;
    
    Consumer(Q que)
    {
        this.que = que;
        new Thread(this, "Consumer").start();
    }
    
    public void run()
    {
        while(true)
        {
            que.get();
        }
    }
}

class PCFixed
{
    public static void main(String args[])
    {
        
        Q que = new Q();
        new Producer(que);
        new Consumer(que);
        
        System.out.println("Press Control-C to stop...");
        
    }
}

Inside get(), wait() is called. This suspends its execution until Producer notifies it that some data is ready. When this happens, execution inside get() resumes. After the data has been obtained, get() calls notify(). This tells Producer that it is fine to put more data in the queue. Inside put(), wait() suspends execution until Consumer has removed the item from the queue. When execution resumes, the next item of data is put in the queue, and notify() is called. This tells Consumer that it should now remove it.

Below is some output from this program, which shows the clean synchronous behavior:

Put : 1
Got : 1
Put : 2
Got : 2
Put : 3
Got : 3
Put : 4
Got : 4
Put : 5
Got : 5
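Because the program above runs until it is stopped, the strict alternation is hard to check automatically. The following bounded variant is a sketch for that purpose, not the original program: LoggedQ, its log list, and the run() helper are assumptions added here, the loops stop after a fixed number of items, values start at 1, and InterruptedException is propagated instead of printed. The wait() and notify() logic is otherwise the same.

```java
import java.util.ArrayList;
import java.util.List;

class LoggedQ {
    private int num;
    private boolean valueSet = false;
    private final List<String> log = new ArrayList<>(); // guarded by this object's monitor

    synchronized int get() throws InterruptedException {
        while (!valueSet) {
            wait();               // nothing to consume yet
        }
        log.add("Got : " + num);
        valueSet = false;
        notify();                 // let the producer continue
        return num;
    }

    synchronized void put(int num) throws InterruptedException {
        while (valueSet) {
            wait();               // previous value not consumed yet
        }
        this.num = num;
        valueSet = true;
        log.add("Put : " + num);
        notify();                 // let the consumer continue
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(log);
    }

    // Runs one producer and one consumer for count items and returns the event log.
    static List<String> run(int count) {
        LoggedQ q = new LoggedQ();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) q.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return q.snapshot();
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

With exactly one producer and one consumer, a single notify() is enough, because the only thread that can be waiting is the other party. With several producers or consumers sharing one queue, notifyAll() would be the safer choice.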
