Why do we need Executor framework?
The core interface of the Executor framework is Executor. It declares a single method, execute.
    public interface Executor {
        void execute(Runnable command);
    }
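To see how small this contract is, here is a minimal sketch of an Executor that simply runs each task in the calling thread. DirectExecutorDemo, DirectExecutor and runTasks are hypothetical names used only for this illustration; the only JDK type involved is java.util.concurrent.Executor. A thread pool implements the same interface but hands the task to a worker thread instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical demo class; only Executor itself comes from the JDK.
class DirectExecutorDemo {

    // Simplest possible Executor: runs each task synchronously in the caller's thread.
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Sends two tasks through the executor and returns what they recorded.
    // The plain ArrayList is safe here only because DirectExecutor runs tasks synchronously.
    static List<String> runTasks(Executor executor) {
        List<String> log = new ArrayList<>();
        executor.execute(() -> log.add("first"));
        executor.execute(() -> log.add("second"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(new DirectExecutor())); // prints [first, second]
    }
}
```

The caller only ever sees execute(Runnable), so swapping this direct executor for a pooled one does not change the submitting code.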
Example:
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
Let’s create a basic example of ThreadPoolExecutor, using newFixedThreadPool to create an instance. The factory method is declared to return ExecutorService, which is why the cast above is needed.
Let’s create a task. Here the task is to read different files and process them.
    package org.arpit.java2blog.bean;

    public class FetchDataFromFile implements Runnable {

        private final String fileName;

        public FetchDataFromFile(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public void run() {
            try {
                System.out.println("Fetching data from " + fileName + " by " + Thread.currentThread().getName());
                // Simulate the time taken to read and process the file
                Thread.sleep(5000);
                System.out.println("Read file successfully: " + fileName + " by " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the interruption is not silently lost
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        public String getFileName() {
            return fileName;
        }
    }
Let’s create a ThreadPoolExecutor which will consume the above task and process it.
    package org.arpit.java2blog;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    import org.arpit.java2blog.bean.FetchDataFromFile;

    public class ThreadPoolExecutorMain {

        public static void main(String[] args) {
            // Get a ThreadPoolExecutor from the Executors.newFixedThreadPool factory method
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10; i++) {
                FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
                System.out.println("A new file has been added to read : " + fdff.getFileName());
                // Submit the task to the executor
                threadPoolExecutor.execute(fdff);
            }
            // Stop accepting new tasks; tasks already submitted still run to completion
            threadPoolExecutor.shutdown();
        }
    }
When you run the above program, you will get output similar to the following (the exact interleaving of lines varies from run to run):
    A new file has been added to read : File 0
    A new file has been added to read : File 1
    A new file has been added to read : File 2
    Fetching data from File 0 by pool-1-thread-1
    Fetching data from File 1 by pool-1-thread-2
    A new file has been added to read : File 3
    Fetching data from File 2 by pool-1-thread-3
    A new file has been added to read : File 4
    Fetching data from File 3 by pool-1-thread-4
    A new file has been added to read : File 5
    Fetching data from File 4 by pool-1-thread-5
    A new file has been added to read : File 6
    A new file has been added to read : File 7
    A new file has been added to read : File 8
    A new file has been added to read : File 9
    Read file successfully: File 1 by pool-1-thread-2
    Read file successfully: File 3 by pool-1-thread-4
    Fetching data from File 5 by pool-1-thread-4
    Read file successfully: File 4 by pool-1-thread-5
    Read file successfully: File 2 by pool-1-thread-3
    Read file successfully: File 0 by pool-1-thread-1
    Fetching data from File 8 by pool-1-thread-3
    Fetching data from File 7 by pool-1-thread-5
    Fetching data from File 6 by pool-1-thread-2
    Fetching data from File 9 by pool-1-thread-1
    Read file successfully: File 5 by pool-1-thread-4
    Read file successfully: File 7 by pool-1-thread-5
    Read file successfully: File 6 by pool-1-thread-2
    Read file successfully: File 8 by pool-1-thread-3
    Read file successfully: File 9 by pool-1-thread-1
We used newFixedThreadPool(5), so when the 10 tasks are submitted, 5 threads are created and each starts executing one task. The other 5 tasks wait in the work queue. As soon as a thread finishes its task, it picks up the next task from the queue and executes it.
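If you want to confirm the "5 running, 5 waiting" split without reading it off interleaved console output, the following sketch may help. It is only an illustration: FixedPoolSnapshot, snapshot and awaitQuietly are hypothetical names, and the tasks block on a CountDownLatch (instead of sleeping) so that nothing can finish before the pool is inspected. It uses getPoolSize() and getQueue() from ThreadPoolExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the tutorial's project.
class FixedPoolSnapshot {

    // Submits taskCount blocking tasks to a fixed pool and returns
    // {number of threads, number of queued tasks} while every task is still pending.
    static int[] snapshot(int poolSize, int taskCount) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();     // then let the pool terminate
        }
    }

    // Blocks until the latch opens, preserving the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot(5, 10);
        System.out.println("threads=" + s[0] + ", queued=" + s[1]); // prints threads=5, queued=5
    }
}
```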
Using constructor of ThreadPoolExecutor:
If you want to customize creation of ThreadPoolExecutor, you can use its constructors too.
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);
corePoolSize: the number of threads to keep in the pool, even if they are idle.
maximumPoolSize: the maximum number of threads allowed in the pool.
keepAliveTime: when the pool has more threads than corePoolSize, the maximum time an excess idle thread waits for a new task before terminating.
unit: the time unit for keepAliveTime.
workQueue: the BlockingQueue that holds tasks before they are executed.
threadFactory: the factory used to create new threads.
handler: the RejectedExecutionHandler invoked when a task cannot be accepted, either because the executor has been shut down or because the queue is full and the pool has already reached maximumPoolSize.
Let’s create a RejectedExecutionHandler to handle rejected tasks.
    package org.arpit.java2blog.bean;

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    public class RejectTaskHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // This example only passes FetchDataFromFile tasks to execute(), so the cast is safe here
            FetchDataFromFile ffdf = (FetchDataFromFile) r;
            System.out.println("Sorry!! We won't be able to read :" + ffdf.getFileName());
        }
    }
Let’s change ThreadPoolExecutorMain.java to the code below to make use of the ThreadPoolExecutor constructor.
    package org.arpit.java2blog;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.arpit.java2blog.bean.FetchDataFromFile;
    import org.arpit.java2blog.bean.RejectTaskHandler;

    public class ThreadPoolExecutorMain {

        public static void main(String[] args) {
            // Bounded wait queue that holds at most 4 waiting tasks
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(4);
            // Thread factory used to create new threads
            ThreadFactory threadFactory = Executors.defaultThreadFactory();
            // Rejection handler called when a task cannot be accepted
            RejectTaskHandler rth = new RejectTaskHandler();
            // corePoolSize = 1, maximumPoolSize = 2, keepAliveTime = 10 ms
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10L, TimeUnit.MILLISECONDS,
                    queue, threadFactory, rth);
            for (int i = 1; i <= 10; i++) {
                FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
                System.out.println("A new file has been added to read : " + fdff.getFileName());
                // Submit the task to the executor
                threadPoolExecutor.execute(fdff);
            }
            // Stop accepting new tasks; tasks already accepted still run to completion
            threadPoolExecutor.shutdown();
        }
    }
When you run the above program, you will get output similar to the following:
    A new file has been added to read : File 1
    A new file has been added to read : File 2
    A new file has been added to read : File 3
    A new file has been added to read : File 4
    Fetching data from File 1 by pool-1-thread-1
    A new file has been added to read : File 5
    A new file has been added to read : File 6
    A new file has been added to read : File 7
    Sorry!! We won't be able to read :File 7
    A new file has been added to read : File 8
    Sorry!! We won't be able to read :File 8
    A new file has been added to read : File 9
    Sorry!! We won't be able to read :File 9
    A new file has been added to read : File 10
    Sorry!! We won't be able to read :File 10
    Fetching data from File 6 by pool-1-thread-2
    Read file successfully: File 1 by pool-1-thread-1
    Read file successfully: File 6 by pool-1-thread-2
    Fetching data from File 2 by pool-1-thread-1
    Fetching data from File 3 by pool-1-thread-2
    Read file successfully: File 3 by pool-1-thread-2
    Read file successfully: File 2 by pool-1-thread-1
    Fetching data from File 4 by pool-1-thread-2
    Fetching data from File 5 by pool-1-thread-1
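This output follows from the order in which ThreadPoolExecutor handles a new task: it starts a new thread while fewer than corePoolSize threads exist, otherwise it tries to queue the task, and only when the queue is full does it start extra threads up to maximumPoolSize; anything beyond that goes to the rejection handler. With 1 core thread, a queue of 4 and a maximum of 2 threads, File 1 gets the core thread, Files 2 to 5 are queued, File 6 gets the second thread (which is why it starts before Files 2 to 5), and Files 7 to 10 are rejected. The sketch below checks those numbers under the same settings. BoundedPoolSnapshot, snapshot and awaitQuietly are hypothetical names, and the tasks block on a latch rather than sleep so the counts are stable when they are read.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the tutorial's project.
class BoundedPoolSnapshot {

    // Submits 10 blocking tasks to a pool with core=1, max=2, queue capacity=4
    // and returns {number of threads, queued tasks, rejected tasks}.
    static int[] snapshot() {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                // Count rejections instead of printing them
                (task, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 1; i <= 10; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected.get() };
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
    }

    // Blocks until the latch opens, preserving the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot();
        // prints threads=2, queued=4, rejected=4
        System.out.println("threads=" + s[0] + ", queued=" + s[1] + ", rejected=" + s[2]);
    }
}
```

Note that the rejection handler runs in the thread that called execute(), which is why the "Sorry!!" lines appear immediately after the corresponding "A new file has been added" lines.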