ArrayBlockingQueue in java

ArrayBlockingQueue in java

In this article, we will understand the Java concurrent queue, BlockingQueue. We will then go deep into it’s one of the implementation, ArrayBlockingQueue.

What is BlockingQueue

BlockingQueue interface was introduced in Java 5 under concurrent APIs and it represents a thread-safe queue in which elements can be added to and removed from. We can have multiple threads inserting and removing elements from BlockingQueue concurrently.

The reason it is called blocking because it has the ability to block a thread from inserting an element when this queue is full and also it blocks a thread from removing an element when this queue is empty. Of course, there are different methods which decide whether to block the thread or return an exception.

ArrayBlockingQueue

The ArrayBlockingQueue implements the BlockingQueue java interface. It is a concurrent and bounded blocking queue implementation and it uses arrays to store the elements. The class provides blocking and nonblocking functions for insertion and removal of elements from the queue. By blocking it implies the functions like take() and put() will block the consumer or producer thread indefinitely unless an element is removed or inserted.

arrayblockingqueue

Features

  • It is a thread-safe implementation of the blocking queue.
  • It is a bounded queue whose size is given at the creation of an object and can’t be changed once instantiated. The queue is implemented using an array internally.
  • The elements on the ArrayBlockingQueue can be consumed in the insertion order or FIFO.
  • A null object is not allowed and it will throw an exception in case a being put on the blocking queue is null.
  • It has both blocking and nonblocking functions to place or remove elements from the queue.
  • It allows fairness policy for the Producer or Consumer threads. The fairness policy is explained in detail below

Further reading

Custom BlockingQueue in java

Custom BlockingQueue implementation in java

In this post, we will see how to create your own custom BlockingQueue. This is one of the most asked ...
BlockingQueue in java

BlockingQueue in java

BlockingQueue is introduced in java with concurrent package with ConcurrentHashMap. It is thread safe queue to put and take elements ...
Java ThreadPoolExecutor

Java ThreadPoolExecutor

Java 5 has introduced new concurrent API called Executor frameworks to make programmer life easy. It simplifies design and development ...

Constructors

The ArrayBlockingQueue has three constructors. The fairness flag set as true implies that if multiple Producer or Consumer threads are waiting on the queue either for adding or removing an element from the queue then it will ensure a FIFO order for the waiting threads else if it’s false then it doesn’t guarantee the order of the thread.

The ArrayBlockingQueue in java internally uses a Reentrant lock object instance. It also uses the lock to create two conditions notEmpty and notFull.

The notEmpty condition will allow consumers to remove elements from the queue in a threadsafe manner and will block the consumer threads in case the queue gets full.

The notFull condition will in a similar way allow producer threads to add elements in the queue in a thread-safe manner until it’s full after which it will block all producer threads.

In the third Constructor, if we pass a collection, the initial ArrayBlockingQueue will have the elements of that collection in the traversal order.

Methods

ArrayBlockingQueue has many supported methods, we are going to list all of them and in addition, we have added the source code snippets of the actual implementation of certain functions for better understanding. The source snippets of the functions added to help in understanding how the lock and condition objects initialized in the constructor are utilized.

  1. offer(E): This function as shown below, will be adding an element into the queue at the tail, and if successful return true else if the queue capacity is reached it will return false. It’s a thread-safe method and is nonblocking i.e. if the queue is full it won’t block the producer thread but return false.

  2. offer(E e, long timeout, TimeUnit unit): The behavior of this function is similar to the offer(E) except in case of blocking queue being full it doesn’t quickly return false but waits till the timeout value to see if the space becomes available in the blocking queue to insert the element before it returns false.

  3. put(E e): This function will insert an element into the blocking queue at the tail end and will wait indefinitely unless interrupted for insertion if the blocking queue is full.

    pre>
  4. add(E): This function internally uses offer(E) and behaves in the same manner except when the blocking queue is full it throws an IllegalStateException.

  5. poll(): This function removes and returns the element at the top of the blocking queue and null if the queue is empty. This is a non-blocking function.

  6. poll(long timeout, TimeUnit unit): This function behaves just like the poll function except it will wait for the timeout parameter value if the blocking queue is empty before it attempts to fetch the element at the top of the blocking queue.

  7. take(): This function will return the element at the top of the blocking queue if the queue is not empty. If the blocking queue is empty then the thread calling this function will be waiting till an element is inserted into the blocking queue.

  8. peek(): This function will return the element at the top of the blocking queue without removing the element and null if the blocking queue is empty.

  9. size(): This function returns the capacity of the blocking queue

  10. remainingCapacity(): This function returns a difference between the max elements the blocking queue can hold minus the elements that are currently in the blocking queue.

  11. remove(Object o): This function removes a single instance of an element from the blocking queue if its equal to the object passed to this function. If it finds a matching element then after removal returns true else false.

  12. contains(Object o): This function returns true if an object exists on the blocking queue matching the object passed as the input parameter else false.

  13. toArray(): This function returns an Object[], which is an inorder copy of the internal array which backs the blocking queue. System.arraycopy() is used to copy the array to ensure the external modifications on the returned array don’t impact the blocking queue.

  14. clear(): This function will remove all the elements in the blocking queue in an atomic manner. This will also signal any producer threads waiting in the blocking queue after emptying the queue.

  15. drainTo(): This function will drain all the elements in the blocking queue in an atomic manner. In case your collection parameter instance and the instance on which this function called is the same then an IllegalArgumentException will be thrown. Any waiting producer threads will be signaled that the queue is empty and ready to accept new elements.

Usage Scenarios

An ArrayBlockingQueue instance can be used for resolving producer and consumer type problems when a full-fledged messaging infrastructure is not required. It can be used as a resource pool to throttle the consumers if the resources are limited.

Implementation Code

In the below unit test we create an ArrayBlockingQueue instance and start the producer and if the producer thread waits for a space created by some consumer thread. Similarly, we are also testing if the consumer thread waits for an element to be added to the queue once the queue is empty.

MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
Queue is full and the MyProducer thread state : WAITING
MyProducer interrupted
MyConsumer retrieving from queue
MyConsumer retrieved -65648598 from queue
MyConsumer retrieving from queue
MyConsumer retrieved -1141421021 from queue
MyConsumer retrieving from queue
MyConsumer retrieved 1476346866 from queue
MyConsumer retrieving from queue
MyConsumer retrieved 1937023750 from queue
MyConsumer retrieving from queue
MyConsumer retrieved -1723127356 from queue
MyConsumer retrieving from queue
Queue is empty and the MyConsumer thread state : WAITING
MyConsumer interrupted

Summary

  1. We have understood what a concurrent BlockingQueue is and why it is important in a multi-threaded environment.
  2. We have also seen the implementation of BlockingQueue, i.e. ArrayBlockingQueue.
  3. We have gone through the constructors and methods of the ArrayBlockingQueue.
  4. We have implemented our own ArrayBlockingQueue and tested it with producer and consumer.

Was this post helpful?

Leave a Reply

Your email address will not be published. Required fields are marked *