In this article, we will look at Java's concurrent queue interface, BlockingQueue. We will then take a closer look at one of its implementations, ArrayBlockingQueue.
What is BlockingQueue
The BlockingQueue interface was introduced in Java 5 as part of the concurrency APIs (java.util.concurrent). It represents a thread-safe queue to which elements can be added and from which elements can be removed. Multiple threads can insert and remove elements from a BlockingQueue concurrently.
It is called blocking because it can block a thread that tries to insert an element while the queue is full, and it can block a thread that tries to remove an element while the queue is empty. Whether the thread actually blocks depends on the method used: some methods block, some throw an exception, and others return a special value such as false or null.
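To make the difference between these method families concrete, here is a minimal sketch that fills a queue of capacity 1 and then tries to insert a second element in three different ways. The class name InsertModesDemo and its method are made up for this illustration; the queue methods themselves are the standard BlockingQueue ones.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of the JDK.
class InsertModesDemo {

    /** Describes what each insertion method does when the queue is already full. */
    static String describeFullQueueBehaviour() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // the queue is now full (capacity 1)

        StringBuilder result = new StringBuilder();

        // add(): throws IllegalStateException when there is no space
        try {
            queue.add("second");
            result.append("add=ok");
        } catch (IllegalStateException e) {
            result.append("add=exception");
        }

        // offer(): returns false immediately
        result.append(", offer=").append(queue.offer("second"));

        // offer(e, timeout, unit): waits up to the timeout, then returns false
        try {
            result.append(", timedOffer=")
                  .append(queue.offer("second", 100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.append(", timedOffer=interrupted");
        }

        // put() is not called here: it would block until a consumer removes an element.
        return result.toString();
    }

    public static void main(String[] args) {
        // prints "add=exception, offer=false, timedOffer=false"
        System.out.println(describeFullQueueBehaviour());
    }
}
```

The removal side mirrors this: remove() throws, poll() returns null, poll(timeout, unit) waits for a limited time, and take() blocks.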
ArrayBlockingQueue
The ArrayBlockingQueue class implements the BlockingQueue interface. It is a concurrent, bounded blocking queue that stores its elements in an array. The class provides both blocking and non-blocking methods for inserting and removing elements. Blocking means that methods such as take() and put() make the calling thread wait indefinitely: take() blocks the consumer until an element is available, and put() blocks the producer until space becomes free.
Features
- It is a thread-safe implementation of the blocking queue.
- It is a bounded queue whose capacity is given when the object is created and can’t be changed afterwards. The queue is implemented using an array internally.
- The elements of the ArrayBlockingQueue are consumed in insertion order (FIFO).
- Null elements are not allowed: a NullPointerException is thrown if the element being put on the blocking queue is null.
- It has both blocking and non-blocking methods to place or remove elements from the queue.
- It supports an optional fairness policy for waiting Producer or Consumer threads. The fairness policy is explained in detail below.
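Two of the features above, null rejection and FIFO ordering, can be checked with a few lines of code. The following is only a small sketch; the class and method names are invented for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative class name; not part of the JDK.
class QueueFeaturesDemo {

    /** Returns true if offering null is rejected with a NullPointerException. */
    static boolean rejectsNull() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Adds three values and removes them again, returning the removal order. */
    static String removalOrder() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("a");
        queue.add("b");
        queue.add("c");
        // poll() always removes the head, i.e. the oldest element
        return queue.poll() + queue.poll() + queue.poll();
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejectsNull());   // null rejected: true
        System.out.println("removal order: " + removalOrder());  // removal order: abc
    }
}
```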
Constructors
The ArrayBlockingQueue has three constructors. If the fairness flag is set to true, Producer or Consumer threads that are waiting to add or remove an element are served in FIFO order. If it is false, the order in which waiting threads are served is not guaranteed.
Internally, the ArrayBlockingQueue uses a ReentrantLock instance. It also uses this lock to create two conditions, notEmpty and notFull.
Consumer threads wait on the notEmpty condition while the queue is empty and are signaled when a producer inserts an element. Producer threads wait on the notFull condition while the queue is full and are signaled when a consumer removes an element. Because both conditions belong to the same lock, insertions and removals remain thread-safe.
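The interplay of one lock and two conditions can be sketched with a heavily simplified bounded buffer. This is not the JDK source: TinyBoundedBuffer and its members are hypothetical names, and features such as iterators, timeouts and null checks are left out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified bounded buffer; NOT the JDK implementation.
class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;

    private final ReentrantLock lock;
    private final Condition notEmpty; // consumers wait here while count == 0
    private final Condition notFull;  // producers wait here while the array is full

    TinyBoundedBuffer(int capacity, boolean fair) {
        items = new Object[capacity];
        lock = new ReentrantLock(fair);   // fair = true gives FIFO ordering of waiting threads
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();          // releases the lock while waiting
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();            // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;      // drop the reference so it can be garbage collected
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();             // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** Single-threaded round trip: two puts followed by two takes. */
    static String roundTrip() {
        TinyBoundedBuffer<String> buffer = new TinyBoundedBuffer<>(2, true);
        try {
            buffer.put("a");
            buffer.put("b");
            return buffer.take() + buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "ab"
    }
}
```

Note that each await() call sits inside a while loop: a woken thread re-checks the condition before it proceeds, which protects against spurious wakeups.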
If we pass a collection to the third constructor, the queue initially contains the elements of that collection, in the traversal order of the collection's iterator.
```java
public ArrayBlockingQueue(int capacity, boolean fair)

public ArrayBlockingQueue(int capacity)

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
```
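As a quick illustration, the three constructors could be used as follows. The class name ConstructorDemo and the sample values are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative class name; not part of the JDK.
class ConstructorDemo {

    /** Builds a queue from a collection and returns its contents in queue order. */
    static List<Integer> fromCollection() {
        List<Integer> seed = Arrays.asList(10, 20, 30);
        // capacity 5, fair ordering of waiting threads, pre-filled with 10, 20, 30
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5, true, seed);
        return new ArrayList<>(queue);
    }

    /** Returns true if a collection larger than the capacity is rejected. */
    static boolean rejectsOversizedCollection() {
        try {
            new ArrayBlockingQueue<Integer>(2, false, Arrays.asList(1, 2, 3));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> plain = new ArrayBlockingQueue<>(10);      // fairness defaults to false
        ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(10, true); // FIFO among waiting threads
        System.out.println(plain.remainingCapacity() + " " + fair.remainingCapacity()); // 10 10
        System.out.println(fromCollection());                                 // [10, 20, 30]
        System.out.println(rejectsOversizedCollection());                     // true
    }
}
```

Fairness usually costs some throughput, so the flag is typically left at false unless thread starvation is a real concern.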
Methods
ArrayBlockingQueue supports many methods. They are listed below, together with source snippets of the actual implementation of some of them. The snippets help to understand how the lock and condition objects initialized in the constructor are used.
- offer(E): Inserts the element at the tail of the queue and returns true if successful. If the queue has reached its capacity, it returns false. The method is thread-safe and non-blocking, i.e. if the queue is full it doesn’t block the producer thread but returns false immediately.

  ```java
  public boolean offer(E e) {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          if (count == items.length)
              return false;
          else {
              enqueue(e);
              return true;
          }
      } finally {
          lock.unlock();
      }
  }
  ```

- offer(E e, long timeout, TimeUnit unit): Behaves like offer(E), except that when the queue is full it doesn’t return false immediately. Instead, it waits up to the given timeout for space to become available and returns false only if the timeout expires.

  ```java
  public boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException {

      checkNotNull(e);
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == items.length) {
              if (nanos <= 0)
                  return false;
              nanos = notFull.awaitNanos(nanos);
          }
          enqueue(e);
          return true;
      } finally {
          lock.unlock();
      }
  }
  ```

- put(E e): Inserts the element at the tail of the queue. If the queue is full, it waits indefinitely for space to become available, unless the thread is interrupted.

  ```java
  public void put(E e) throws InterruptedException {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == items.length)
              notFull.await();
          enqueue(e);
      } finally {
          lock.unlock();
      }
  }
  ```

- add(E): Internally uses offer(E) and behaves in the same manner, except that it throws an IllegalStateException when the queue is full.

- poll(): Removes and returns the element at the head of the queue, or returns null if the queue is empty. This is a non-blocking method.

  ```java
  public E poll() {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          return (count == 0) ? null : dequeue();
      } finally {
          lock.unlock();
      }
  }
  ```

- poll(long timeout, TimeUnit unit): Behaves like poll(), except that when the queue is empty it waits up to the given timeout for an element to become available. It returns null only if the timeout expires.

  ```java
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == 0) {
              if (nanos <= 0)
                  return null;
              nanos = notEmpty.awaitNanos(nanos);
          }
          return dequeue();
      } finally {
          lock.unlock();
      }
  }
  ```

- take(): Removes and returns the element at the head of the queue. If the queue is empty, the calling thread waits until an element is inserted.

  ```java
  public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == 0)
              notEmpty.await();
          return dequeue();
      } finally {
          lock.unlock();
      }
  }
  ```

- peek(): Returns the element at the head of the queue without removing it, or null if the queue is empty.

- size(): Returns the number of elements currently in the queue (not its capacity).

- remainingCapacity(): Returns the difference between the maximum number of elements the queue can hold and the number of elements currently in it.

- remove(Object o): Removes a single instance of an element from the queue if it is equal to the object passed to this method. Returns true if a matching element was found and removed, otherwise false.

- contains(Object o): Returns true if the queue contains an element equal to the object passed as the input parameter, otherwise false.

- toArray(): Returns an Object[] containing the elements of the queue in order, from head to tail. The elements are copied from the internal backing array with System.arraycopy(), so external modifications to the returned array don’t affect the queue.

- clear(): Removes all elements from the queue atomically. After emptying the queue, it signals any producer threads that are waiting for space.

- drainTo(): Removes all available elements from the queue and adds them to the collection passed as a parameter, while holding the queue's lock. If the collection passed in is the queue itself, an IllegalArgumentException is thrown. Any waiting producer threads are signaled that the queue is ready to accept new elements.
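The inspection and bulk methods listed above have no source snippets, so here is a short usage sketch instead. InspectionMethodsDemo is a made-up name; the calls are standard ArrayBlockingQueue methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative class name; not part of the JDK.
class InspectionMethodsDemo {

    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");

        StringBuilder out = new StringBuilder();
        out.append("peek=").append(queue.peek());                    // head, not removed
        out.append(" size=").append(queue.size());                   // elements currently held
        out.append(" remaining=").append(queue.remainingCapacity()); // capacity minus size
        out.append(" contains(b)=").append(queue.contains("b"));
        out.append(" remove(b)=").append(queue.remove("b"));

        List<String> drained = new ArrayList<>();
        int moved = queue.drainTo(drained);                          // empties the queue into the list
        out.append(" drained=").append(drained).append(" moved=").append(moved);
        out.append(" sizeAfter=").append(queue.size());
        return out.toString();
    }

    public static void main(String[] args) {
        // prints: peek=a size=3 remaining=1 contains(b)=true remove(b)=true drained=[a, c] moved=2 sizeAfter=0
        System.out.println(run());
    }
}
```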
Usage Scenarios
An ArrayBlockingQueue instance can be used for resolving producer and consumer type problems when a full-fledged messaging infrastructure is not required. It can be used as a resource pool to throttle the consumers if the resources are limited.
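The resource pool idea could look roughly like the sketch below. SimplePool, borrow() and giveBack() are hypothetical names chosen for this example, and the "connections" are plain strings standing in for real resources.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical fixed-size pool; class and method names are illustrative only.
class SimplePool<T> {
    private final BlockingQueue<T> available;

    SimplePool(Collection<T> resources) {
        // fair = true so that waiting borrowers are served in arrival order
        available = new ArrayBlockingQueue<>(resources.size(), true, resources);
    }

    /** Waits up to the timeout for a free resource; returns null if none became free. */
    T borrow(long timeout, TimeUnit unit) throws InterruptedException {
        return available.poll(timeout, unit);
    }

    /** Returns a resource to the pool; false means the pool was already full. */
    boolean giveBack(T resource) {
        return available.offer(resource);
    }

    static String demo() {
        SimplePool<String> pool = new SimplePool<>(Arrays.asList("conn-1", "conn-2"));
        try {
            String first = pool.borrow(50, TimeUnit.MILLISECONDS);
            String second = pool.borrow(50, TimeUnit.MILLISECONDS);
            String third = pool.borrow(50, TimeUnit.MILLISECONDS);  // pool exhausted: null after the timeout
            pool.giveBack(first);
            String fourth = pool.borrow(50, TimeUnit.MILLISECONDS); // the returned resource is available again
            return first + "," + second + "," + third + "," + fourth;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "conn-1,conn-2,null,conn-1"
    }
}
```

Because the queue is bounded by the number of resources, borrowers are throttled automatically: once everything is handed out, further callers wait or time out.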
Implementation Code
In the test program below, we create an ArrayBlockingQueue instance, start the producer, and check whether the producer thread waits for space to be freed by a consumer thread once the queue is full. Similarly, we check whether the consumer thread waits for an element to be added once the queue is empty.
```java
package main.com.kv.mt.concurrent.blockingqueue;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * The code below tests whether the ArrayBlockingQueue makes the producer thread wait when the queue is full
 * and, similarly, whether it makes the consumer thread wait when the queue is empty.
 * The assert statements are only evaluated when the JVM is started with -ea.
 */
public class Java2BlogArrayBlockingQueue {

    public static void main(String[] args) {
        final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
        final Random random = new Random();

        // producer thread keeps running until it is interrupted
        Runnable producer = () -> {
            boolean isInterrupted = false;
            while (!isInterrupted) {
                try {
                    System.out.println(Thread.currentThread().getName() + " adding to queue");
                    blockingQueue.put(random.nextInt());
                    System.out.println(Thread.currentThread().getName() + " finished adding to queue");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " interrupted");
                    isInterrupted = true;
                }
            }
        };

        // consumer thread keeps running until it is interrupted
        Runnable consumer = () -> {
            boolean isInterrupted = false;
            while (!isInterrupted) {
                try {
                    System.out.println(Thread.currentThread().getName() + " retrieving from queue");
                    System.out.println(Thread.currentThread().getName() + " retrieved " + blockingQueue.take() + " from queue");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " interrupted");
                    isInterrupted = true;
                }
            }
        };

        Thread producerThread = new Thread(producer);
        producerThread.setName("MyProducer");

        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("MyConsumer");

        producerThread.start();

        // make the main thread wait until the producer has completely filled the blocking queue
        while (blockingQueue.remainingCapacity() != 0) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                System.out.println(" interrupted main thread");
            }
        }

        // the MyProducer thread should now be in the WAITING state because the blocking queue is full
        System.out.println("Queue is full and the MyProducer thread state : " + producerThread.getState());
        assert (Thread.State.WAITING == producerThread.getState());
        assert (producerThread.isAlive());

        // stop the producer so that the blocking queue becomes empty once all integers are consumed
        producerThread.interrupt();

        // now start the consumer thread
        consumerThread.start();

        // wait for the consumer to drain the blocking queue
        while (blockingQueue.remainingCapacity() != 5) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                System.out.println(" interrupted main thread");
            }
        }

        // once the blocking queue is empty, the consumer thread should be in the WAITING state
        System.out.println("Queue is empty and the MyConsumer thread state : " + consumerThread.getState());
        assert (Thread.State.WAITING == consumerThread.getState());
        assert (consumerThread.isAlive());

        // stop the consumer
        consumerThread.interrupt();
    }
}
```
Output:
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
Queue is full and the MyProducer thread state : WAITING
MyProducer interrupted
MyConsumer retrieving from queue
MyConsumer retrieved -65648598 from queue
MyConsumer retrieving from queue
MyConsumer retrieved -1141421021 from queue
MyConsumer retrieving from queue
MyConsumer retrieved 1476346866 from queue
MyConsumer retrieving from queue
MyConsumer retrieved 1937023750 from queue
MyConsumer retrieving from queue
MyConsumer retrieved -1723127356 from queue
MyConsumer retrieving from queue
Queue is empty and the MyConsumer thread state : WAITING
MyConsumer interrupted
Summary
- We have understood what a concurrent BlockingQueue is and why it is important in a multi-threaded environment.
- We have also seen one implementation of BlockingQueue, namely ArrayBlockingQueue.
- We have gone through the constructors and methods of ArrayBlockingQueue.
- We have used an ArrayBlockingQueue in a test program and verified its blocking behavior with a producer and a consumer.