In this post, we will see how to create your own custom BlockingQueue.
Implementing your own BlockingQueue is one of the most frequently asked Java interview questions. It helps the interviewer gauge your understanding of multithreading concepts.
Here is a simple implementation of BlockingQueue.
- We will use an array to store the elements of the BlockingQueue internally. The size of this array defines the maximum number of elements that can reside in the BlockingQueue at a time.
- We will use Lock and Condition objects to create the custom BlockingQueue.
- While putting an element in the queue, if the queue is full, the producer will wait until space becomes available.
- While consuming an element from the queue, if the queue is empty, the consumer will wait for the queue to get filled.
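Before adding locks, it may help to look at the array bookkeeping on its own. The following is a minimal single-threaded sketch, not part of the implementation in this post: the class name RingBufferSketch and the methods offer, poll and size are hypothetical names chosen for illustration, and the methods return false or null where the real queue would wait.

```java
// Hypothetical single-threaded sketch: only the circular-array bookkeeping,
// with no locking and no waiting.
class RingBufferSketch {
    private final Object[] array;
    private int putIndex;   // next slot to write
    private int takeIndex;  // next slot to read
    private int count;      // number of elements currently stored

    RingBufferSketch(int capacity) {
        array = new Object[capacity];
    }

    // Returns false (instead of waiting) when the buffer is full.
    boolean offer(Object x) {
        if (count == array.length) {
            return false;
        }
        array[putIndex] = x;
        putIndex = (putIndex + 1) % array.length; // wrap around to slot 0
        count++;
        return true;
    }

    // Returns null (instead of waiting) when the buffer is empty.
    Object poll() {
        if (count == 0) {
            return null;
        }
        Object x = array[takeIndex];
        array[takeIndex] = null;                    // drop the stale reference
        takeIndex = (takeIndex + 1) % array.length; // wrap around to slot 0
        count--;
        return x;
    }

    int size() {
        return count;
    }
}
```

With a capacity of 2, two calls to offer succeed and a third returns false. After one poll, the freed slot is reused, so elements always come out in the order they went in.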
Create a class named CustomBlockingQueue in the file CustomBlockingQueue.java.
```java
package org.arpit.java2blog;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CustomBlockingQueue {

    private final Lock lock = new ReentrantLock();

    // Conditions: producers wait on produceCond, consumers wait on consumeCond
    private final Condition produceCond = lock.newCondition();
    private final Condition consumeCond = lock.newCondition();

    // Array to store elements for CustomBlockingQueue
    private final Object[] array = new Object[6];
    private int putIndex, takeIndex;
    private int count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition must be rechecked after waking up
            while (count == array.length) {
                // Queue is full, producers need to wait
                produceCond.await();
            }

            array[putIndex] = x;
            System.out.println("Producing - " + x);
            putIndex++;
            if (putIndex == array.length) {
                putIndex = 0; // wrap around to the start of the array
            }
            // Increment the count for the array
            ++count;
            // Wake up one waiting consumer
            consumeCond.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                // Queue is empty, consumers need to wait
                consumeCond.await();
            }
            Object x = array[takeIndex];
            array[takeIndex] = null; // clear the slot so the element can be garbage collected
            System.out.println("Consuming - " + x);
            takeIndex++;
            if (takeIndex == array.length) {
                takeIndex = 0; // wrap around to the start of the array
            }
            // Reduce the count for the array
            --count;
            // Wake up one waiting producer
            produceCond.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```
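The class above fixes the capacity at 6 and stores plain Object values. As a variation, here is a hedged sketch of a generic version that takes its capacity in the constructor. The names BoundedQueue, notFull, notEmpty and demo are assumptions made for this example and do not come from the class above. The locking pattern is the same: one lock, two conditions, and a while loop around each await().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical generic variant; class, field and method names are illustrative.
class BoundedQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Object[] items;
    private int putIndex, takeIndex, count;

    BoundedQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        items = new Object[capacity];
    }

    void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putIndex] = x;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T x = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Fills a capacity-2 queue, lets a second thread put a third element,
    // then drains everything and returns the elements in the order taken.
    static List<Integer> demo() {
        BoundedQueue<Integer> q = new BoundedQueue<>(2);
        List<Integer> taken = new ArrayList<>();
        try {
            q.put(1);
            q.put(2);
            Thread producer = new Thread(() -> {
                try {
                    q.put(3); // waits if the queue is still full when this runs
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            taken.add(q.take()); // frees one slot for the producer
            producer.join();
            taken.add(q.take());
            taken.add(q.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken;
    }
}
```

Calling BoundedQueue.demo() returns the elements in insertion order whichever thread gets the lock first, because the third put() can only complete once a slot has been freed.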
Create another main class that uses the above CustomBlockingQueue.
```java
package org.arpit.java2blog;

public class CustomBlockingQueueMain {

    public static void main(String[] args) {
        CustomBlockingQueue customBlockingQueue = new CustomBlockingQueue();

        // Creating producer and consumer threads
        Thread producer = new Thread(new Producer(customBlockingQueue));
        Thread consumer = new Thread(new Consumer(customBlockingQueue));

        producer.start();
        consumer.start();
    }
}

class Producer implements Runnable {

    private final CustomBlockingQueue customBlockingQueue;

    public Producer(CustomBlockingQueue customBlockingQueue) {
        this.customBlockingQueue = customBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            try {
                customBlockingQueue.put(i);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class Consumer implements Runnable {

    private final CustomBlockingQueue customBlockingQueue;

    public Consumer(CustomBlockingQueue customBlockingQueue) {
        this.customBlockingQueue = customBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            try {
                customBlockingQueue.take();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```
We have created two Runnable classes, one for the producer and another for the consumer, and created two threads using these runnables.
When you run the program, you will get output similar to the following:
Producing - 1
Producing - 2
Producing - 3
Producing - 4
Producing - 5
Producing - 6
Consuming - 1
Consuming - 2
Consuming - 3
Consuming - 4
Consuming - 5
Consuming - 6
Producing - 7
Producing - 8
Producing - 9
Producing - 10
Consuming - 7
Consuming - 8
Consuming - 9
Consuming - 10
The exact interleaving may vary from run to run, but there can be at most 6 elements in the CustomBlockingQueue at a time, because the backing array has length 6.
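For comparison, the JDK already ships an array-backed bounded queue, java.util.concurrent.ArrayBlockingQueue, which offers the same blocking put() and take() operations. The sketch below runs the same producer and consumer scenario against it. The class name JdkQueueDemo and the method run are hypothetical, and the capacity of 6 is chosen only to match the example in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class comparing against the JDK's own bounded queue.
class JdkQueueDemo {

    // Runs one producer and one consumer over a capacity-6 queue and
    // returns the values in the order the consumer received them.
    static List<Integer> run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(6);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i); // blocks while the queue holds 6 elements
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // join() ensures the consumer's writes are visible before we return
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

JdkQueueDemo.run() returns the numbers 1 to 10 in order, since a single producer and a single consumer share a FIFO queue. In an interview you are usually expected to write the queue by hand as shown earlier, but in production code the JDK class is generally the safer choice.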
That’s all about custom BlockingQueue implementation in Java.