Friday, 1 April 2016

ConcurrentLinkedQueue in Java

ConcurrentLinkedQueue implements the Queue interface. It was added in Java 5 along with other concurrent utilities such as CyclicBarrier, CountDownLatch, Semaphore and ConcurrentHashMap.

ConcurrentLinkedQueue is an unbounded thread-safe queue which stores its elements as linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
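
The FIFO behaviour described above can be seen in a few lines. This is only a minimal sketch; the class name FifoOrderSketch and the method drainInOrder() are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name, used only for this illustration
class FifoOrderSketch {

    // Inserts three elements at the tail, then drains the queue from the head
    static List<String> drainInOrder() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("first");   // each offer() inserts at the tail
        queue.offer("second");
        queue.offer("third");

        List<String> drained = new ArrayList<>();
        String head;
        // poll() removes and returns the head, or null once the queue is empty
        while ((head = queue.poll()) != null) {
            drained.add(head);
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder()); // prints [first, second, third]
    }
}
```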

Usage of ConcurrentLinkedQueue

A ConcurrentLinkedQueue is an appropriate choice when many threads share access to a common collection. Note that its operations do not block, unlike the implementations of the BlockingQueue interface such as ArrayBlockingQueue and LinkedBlockingQueue. So there are no put() or take() methods that wait if required; poll() simply returns null when the queue is empty.
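
To make the non-blocking behaviour concrete, here is a small sketch of a consumer step that never waits. The class NonBlockingPollSketch and the helper tryConsume() are hypothetical names chosen for this example, and the returned strings are just for display.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name, used only for this illustration
class NonBlockingPollSketch {

    // Tries to take one element and returns immediately, whether or not one is available
    static String tryConsume(Queue<Integer> queue) {
        Integer value = queue.poll(); // never waits; null means "empty right now"
        return (value == null) ? "nothing available" : "consumed " + value;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        System.out.println(tryConsume(queue)); // prints "nothing available"
        queue.offer(42);
        System.out.println(tryConsume(queue)); // prints "consumed 42"
        System.out.println(tryConsume(queue)); // prints "nothing available"
    }
}
```

If a consumer really has to wait for an element, a BlockingQueue implementation with take() is usually the better fit than polling a ConcurrentLinkedQueue in a loop.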

No nulls

Like most other concurrent collection implementations, this class does not permit the use of null elements.
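
A short sketch of what happens when a null is added. One reason for the restriction is that poll() and peek() use null to signal an empty queue, so a stored null could not be told apart from "nothing there". The class and method names below are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name, used only for this illustration
class NoNullsSketch {

    // Returns true if the queue rejects a null element with a NullPointerException
    static boolean rejectsNull() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        try {
            queue.add(null); // null elements are not permitted
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejects null - " + rejectsNull()); // prints "Rejects null - true"
    }
}
```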

Iterator

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
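
The following sketch modifies the queue while iterating over it and, for contrast, does the same with an ArrayList, whose fail-fast iterator throws ConcurrentModificationException. The class and method names are hypothetical. Whether the element added during iteration is also visited is not guaranteed, so the example makes no claim about it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name, used only for this illustration
class WeaklyConsistentIteratorSketch {

    // Adds an element while iterating; the weakly consistent iterator keeps going
    static List<Integer> iterateWhileAdding() {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));
        List<Integer> seen = new ArrayList<>();
        for (Integer value : queue) {
            if (value == 1) {
                queue.add(4); // modification during iteration, no exception
            }
            seen.add(value);
        }
        return seen; // starts with 1, 2, 3; the added 4 may or may not be visited
    }

    // Same pattern on an ArrayList; its fail-fast iterator is expected to throw
    static boolean arrayListThrowsCme() {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            for (Integer value : list) {
                if (value == 1) {
                    list.add(4);
                }
            }
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Queue iteration saw - " + iterateWhileAdding());
        System.out.println("ArrayList threw CME - " + arrayListThrowsCme());
    }
}
```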

Example Code

Let's create a producer-consumer example using ConcurrentLinkedQueue. In this code there is one producer thread putting elements into the queue and three consumer threads retrieving elements from the queue. Note that the producer thread puts only 3 elements.

You can run the program multiple times and observe that no two consumer threads retrieve the same element from the queue.

Later, ConcurrentLinkedQueue is replaced with PriorityQueue, which is not synchronized. Running that code results in the consumer threads getting into a race condition and stepping on each other. Note that you may sometimes get correct output, but over multiple runs you are likely to see incorrect results.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLQDemo {

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Calling one producer
        executor.execute(new ProdTask(buffer));
        // Calling three consumers
        executor.execute(new ConTask(buffer));
        executor.execute(new ConTask(buffer));
        executor.execute(new ConTask(buffer));
        executor.shutdown();

    }

}

/**
 * Producer task that puts three elements into the shared buffer.
 */
class ProdTask implements Runnable{
    Buffer buffer;
    ProdTask(Buffer buffer){
        this.buffer = buffer;
    }
    @Override
    public void run() {
        // putting just three elements
        for(int i = 0; i < 3; i++){
            buffer.put(i);
        }
    }
}

/**
 * Consumer task that retrieves a single element from the shared buffer.
 */
class ConTask implements Runnable{
    Buffer buffer;
    ConTask(Buffer buffer){
        this.buffer = buffer;
    }
    @Override
    public void run() {
        try {
            // delay to make sure producer starts first
            Thread.sleep(20);
        } catch (InterruptedException e) {
            // restore the interrupt status before reporting the exception
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        buffer.get();
    }    
}

//Shared class used by threads
class Buffer{
    int i;
    Queue<Integer> clQueue = new ConcurrentLinkedQueue<Integer>();
   
    //Retrieving from the queue 
    public void get(){
        System.out.println("Consumer recd - " + clQueue.poll());
    }
    // putting in the queue
    public void put(int i){
        this.i = i;
        clQueue.add(i);
        System.out.println("Putting - " + i);
    }
   
}

Output

Putting - 0
Putting - 1
Putting - 2
Consumer recd - 0
Consumer recd - 2
Consumer recd - 1
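
Instead of checking the printed output by eye, the "no two consumers get the same element" property can also be checked programmatically. The following is only a rough sketch under assumptions of my own: the class ExactlyOnceCheckSketch, the method countDuplicates() and the numbers used (1000 elements, 4 consumers) are not part of the example above.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration
class ExactlyOnceCheckSketch {

    // Returns how many times an element was handed to more than one consumer
    static int countDuplicates(int elements, int consumers) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < elements; i++) {
            queue.add(i);
        }
        // Thread-safe set recording every value some consumer has received
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger duplicates = new AtomicInteger();

        ExecutorService executor = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            executor.execute(() -> {
                Integer value;
                // each consumer keeps polling until the queue is empty
                while ((value = queue.poll()) != null) {
                    if (!seen.add(value)) {
                        duplicates.incrementAndGet(); // value was already received
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumers", e);
        }
        return duplicates.get();
    }

    public static void main(String[] args) {
        System.out.println("Duplicates - " + countDuplicates(1000, 4));
    }
}
```

With ConcurrentLinkedQueue the count should stay at 0, since poll() hands each element to exactly one caller.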

Now let's replace ConcurrentLinkedQueue with PriorityQueue, which is not synchronized.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLQDemo {

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Calling one producer
        executor.execute(new ProdTask(buffer));
        // Calling three consumers
        executor.execute(new ConTask(buffer));
        executor.execute(new ConTask(buffer));
        executor.execute(new ConTask(buffer));
        executor.shutdown();

    }

}

/**
 * Producer task that puts three elements into the shared buffer.
 */
class ProdTask implements Runnable{
    Buffer buffer;
    ProdTask(Buffer buffer){
        this.buffer = buffer;
    }
    @Override
    public void run() {
        // putting just three elements
        for(int i = 0; i < 3; i++){
            buffer.put(i);
        }
    }
}

/**
 * Consumer task that retrieves a single element from the shared buffer.
 */
class ConTask implements Runnable{
    Buffer buffer;
    ConTask(Buffer buffer){
        this.buffer = buffer;
    }
    @Override
    public void run() {
        try {
            // delay to make sure producer starts first
            Thread.sleep(20);
        } catch (InterruptedException e) {
            // restore the interrupt status before reporting the exception
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        buffer.get();
    }    
}

//Shared class used by threads
class Buffer{
    int i;
    //Queue<Integer> clQueue = new ConcurrentLinkedQueue<Integer>();
    Queue<Integer> clQueue = new PriorityQueue<Integer>();
   
    //Retrieving from the queue 
    public void get(){
        System.out.println("Consumer recd - " + clQueue.poll());
    }
    // putting in the queue
    public void put(int i){
        this.i = i;
        clQueue.add(i);
        System.out.println("Putting - " + i);
    }  
}

Output

Putting - 0
Putting - 1
Putting - 2
Consumer recd - 0
Consumer recd - 1
Consumer recd - 0

Here it can be seen that consumer threads are getting the same element out of the queue.

That's all for the topic ConcurrentLinkedQueue in Java. If you have any doubts or suggestions, please drop a comment. Thanks!


Related Topics

  1. SynchronousQueue in Java
  2. BlockingDeque in Java Concurrency
  3. ConcurrentHashMap in Java
  4. ReentrantReadWriteLock in Java
  5. Semaphore in Java concurrency
  6. Java Concurrency interview questions
