In this post, we will look at parallel streams in Java.
Java Parallel Stream introduction
Java 8 introduced parallel streams for parallel processing. Machines now commonly have several CPU cores because hardware is cheap, so parallel processing can be used to perform operations faster.
Let’s understand this with the help of a simple example.
```java
package org.arpit.java2blog.java8;

import java.util.Arrays;
import java.util.stream.IntStream;

public class Java8ParallelStreamMain {

    public static void main(String[] args) {
        System.out.println("=================================");
        System.out.println("Using Sequential Stream");
        System.out.println("=================================");
        int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        IntStream intArrStream = Arrays.stream(array);
        // Every element is processed on the calling (main) thread
        intArrStream.forEach(s -> {
            System.out.println(s + " " + Thread.currentThread().getName());
        });

        System.out.println("=================================");
        System.out.println("Using Parallel Stream");
        System.out.println("=================================");
        // parallel() lets several threads process the elements
        IntStream intParallelStream = Arrays.stream(array).parallel();
        intParallelStream.forEach(s -> {
            System.out.println(s + " " + Thread.currentThread().getName());
        });
    }
}
```
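When you run the above program, you will get output similar to the following (in the parallel section, the order and the thread names vary from run to run):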
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1
If you look at the output, the main thread does all the work in the case of the sequential stream: it waits for the current iteration to complete before working on the next one.
In the case of the parallel stream, four threads share the work: the main thread and three worker threads. Internally, parallel streams use the Fork/Join framework to manage these threads; they run their tasks in the shared pool returned by the static ForkJoinPool.commonPool() method.
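To see which threads do the work on your own machine, a small sketch like the following records the thread names seen by a sequential and a parallel stream. The class and method names are our own, for illustration only. With the sequential stream you should see only the calling thread; with the parallel stream you typically see the calling thread plus some ForkJoinPool.commonPool workers, depending on your core count.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ThreadUsageDemo {

    // Returns the names of the threads that processed the elements 1..n
    static Set<String> threadsUsed(int n, boolean parallel) {
        // A concurrent set, because several threads may add to it at the same time
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream stream = IntStream.rangeClosed(1, n);
        if (parallel) {
            stream = stream.parallel();
        }
        stream.forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + threadsUsed(10_000, false));
        System.out.println("Parallel:   " + threadsUsed(10_000, true));
    }
}
```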
A parallel stream takes advantage of all available CPU cores and processes the tasks in parallel. If the number of tasks exceeds the number of threads, the remaining tasks wait until a thread becomes free.
Parallel streams are cool, so should you always use them?
A big no!
Just because it is easy to convert a sequential stream to a parallel stream by adding .parallel() does not mean you should always do it.
There are many factors to consider when using parallel streams; otherwise you will suffer from their negative impacts.
A parallel stream has much higher overhead than a sequential stream: splitting the work, coordinating the threads, and combining the partial results all take time.
Consider a parallel stream only if:
- You have a large dataset to process.
- Your source is easy to split. Java uses ForkJoinPool to achieve parallelism: the source stream is split into parts that are submitted for execution, so the source must be splittable. For example, an ArrayList is very easy to split, because the middle element can be found by its index, while a LinkedList is hard to split and does not perform well in most cases.
- You are actually suffering from performance issues.
- You make sure that all resources shared between threads are synchronized properly; otherwise you might get unexpected results.
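The difference in splittability can be observed directly with Spliterator.trySplit(), which is what the stream framework uses to divide a source. The sketch below (class and method names are ours) splits each list once and reports the sizes of both halves. An ArrayList splits exactly in half by index. In OpenJDK’s implementation, the LinkedList spliterator has to walk the nodes and copy them into an array in batches that start at 1,024 elements, so a 100-element LinkedList is not divided at all on the first split; treat the exact LinkedList numbers as an implementation detail.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

public class SplitDemo {

    // Splits the list's spliterator once and returns {size of split-off part, size left over}
    static long[] splitOnce(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if the source could not be split
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] {prefixSize, rest.estimateSize()};
    }

    public static void main(String[] args) {
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }
        long[] a = splitOnce(arrayList);
        long[] l = splitOnce(linkedList);
        System.out.println("ArrayList:  " + a[0] + " / " + a[1]); // 50 / 50
        System.out.println("LinkedList: " + l[0] + " / " + l[1]);
    }
}
```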
A simple model for deciding whether parallelism is likely to help is the "NQ" model described by Brian Goetz in his presentation.
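NQ Model:
$$N \times Q$$
where,
N = number of items in the dataset
Q = amount of work per item
The larger the product N × Q, the more likely it is that a parallel stream will be faster than a sequential one.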
In other words, if you have a large number of items and little work per item (for example, a sum), parallelism might help your program run faster. The converse also holds: if you have few items but a lot of work per item (some heavy computation), parallelism might still help you get results faster.
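As a rough illustration of the "large N, small Q" case, the sketch below sums a long range of numbers sequentially and in parallel and prints the elapsed time of each. The class name, method names, and the choice of N are our own. The timings are only indicative: there is no JIT warm-up and only a single run, so for real measurements a benchmarking harness such as JMH is more reliable. Both versions must produce the same sum.

```java
import java.util.stream.LongStream;

public class SumDemo {

    // Q is tiny here (one addition per item), so only a large N can make parallelism pay off
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 50_000_000L;

        long start = System.nanoTime();
        long s1 = sequentialSum(n);
        long seqMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long s2 = parallelSum(n);
        long parMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("sequential: " + s1 + " in " + seqMillis + " ms");
        System.out.println("parallel:   " + s2 + " in " + parMillis + " ms");
    }
}
```

Try shrinking n to a few thousand: the parallel version will usually stop being faster, because the fixed coordination cost dominates when N × Q is small.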
Let’s see with the help of another example.
In this example, we will see how the CPU behaves when you perform long computations with a sequential stream and with a parallel stream. We do some arbitrary calculations just to keep the CPU busy.
```java
package org.arpit.java2blog.java8;

import java.util.ArrayList;
import java.util.List;

public class PerformanceComparisonMain {

    public static void main(String[] args) {
        long currentTime = System.currentTimeMillis();
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 100000; i++) {
            data.add(i);
        }

        long sum = data.stream()
                .map(i -> (int) Math.sqrt(i))
                .map(number -> performComputation(number))
                .reduce(0, Integer::sum);

        System.out.println(sum);
        long endTime = System.currentTimeMillis();
        System.out.println("Time taken to complete:" + (endTime - currentTime) / (1000 * 60) + " minutes");
    }

    // Arbitrary CPU-bound work: about a million integer divisions per element
    public static int performComputation(int number) {
        int sum = 0;
        for (int i = 1; i < 1000000; i++) {
            int div = (number / i);
            sum += div;
        }
        return sum;
    }
}
```
When we ran the above program, we got the following output (the time depends on your machine):
Time taken to complete:6 minutes
However, we are not interested in the output itself here, but in how the CPU behaved while the above operation was running.
As the CPU history showed, the CPU is not fully utilized in the case of the sequential stream.
Let’s make the stream parallel by adding .parallel() right after data.stream(), and run the program again.
```java
long sum = data.stream()
        .parallel()
        .map(i -> (int) Math.sqrt(i))
        .map(number -> performComputation(number))
        .reduce(0, Integer::sum);
```
Running the stream in parallel gave the following output:
Time taken to complete:3 minutes
Let’s check the CPU history while the program ran with the parallel stream.
This time, the parallel stream used all 4 CPU cores to perform the computation.
Custom Thread pool in Parallel Stream
By default, a parallel stream uses ForkJoinPool.commonPool, whose parallelism is one less than the number of processors. Because the calling (main) thread also takes part in the work, a parallel stream can still use all available processors.
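You can check these numbers on your own machine with the static ForkJoinPool.getCommonPoolParallelism() method. The sketch below (class and method names are ours) prints it next to the processor count. Note that on a single-processor machine the common pool still has a parallelism of 1, and that the value can be overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Target parallelism of the shared pool that parallel streams use by default
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonParallelism());
    }
}
```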
If you run multiple parallel streams at the same time, they all share the same ForkJoinPool.commonPool. This means each parallel stream may not get all the processors you expect it to.
To solve this issue, you can create your own thread pool for processing the stream.
```java
ForkJoinPool fjp = new ForkJoinPool(parallelism);
```
This creates a ForkJoinPool with the given target parallelism level. If you use the no-argument constructor, the parallelism defaults to the number of available processors.
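A quick way to confirm both constructors’ behavior is to ask each pool for its getParallelism(). The sketch below (class and method names are ours) creates one pool with an explicit parallelism of 5 and one with the no-argument constructor, reads their parallelism, and shuts both pools down afterwards, since pools you create yourself should be released once you no longer need them.

```java
import java.util.concurrent.ForkJoinPool;

public class CustomPoolParallelism {

    // Returns {parallelism of new ForkJoinPool(5), parallelism of new ForkJoinPool()}
    static int[] parallelismLevels() {
        ForkJoinPool fixed = new ForkJoinPool(5);    // explicit target parallelism
        ForkJoinPool byDefault = new ForkJoinPool(); // defaults to the number of available processors
        try {
            return new int[] {fixed.getParallelism(), byDefault.getParallelism()};
        } finally {
            fixed.shutdown();
            byDefault.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] levels = parallelismLevels();
        System.out.println("new ForkJoinPool(5): " + levels[0]); // 5
        System.out.println("new ForkJoinPool():  " + levels[1]
                + " (available processors: " + Runtime.getRuntime().availableProcessors() + ")");
    }
}
```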
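Now you can submit the parallel stream to this custom ForkJoinPool. Note that this relies on how the stream implementation schedules its work rather than on documented Stream API behavior: the stream’s subtasks run in the pool of the thread that executes the terminal operation.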
```java
ForkJoinPool fjp1 = new ForkJoinPool(5);
long sumFJ1 = 0;
Callable<Integer> callable1 = () -> data.parallelStream()
        .map(i -> (int) Math.sqrt(i))
        .map(number -> performComputation(number))
        .peek(i -> System.out.println("Processing with " + Thread.currentThread() + " " + i))
        .reduce(0, Integer::sum);
try {
    // The stream runs inside a task of fjp1, so fjp1's workers process it
    sumFJ1 = fjp1.submit(callable1).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```
Let’s understand this with the help of an example.
```java
package org.arpit.java2blog;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class PerformanceComparisonMain {

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            data.add(i);
        }

        // Parallel streams 1 and 2 run in ForkJoinPool.commonPool (plus the main thread)
        System.out.println("================");
        System.out.println("Parallel stream 1");
        System.out.println("================");
        long sum1 = data.parallelStream()
                .map(i -> (int) Math.sqrt(i))
                .map(number -> performComputation(number))
                .peek(i -> System.out.println("Processing with " + Thread.currentThread() + " " + i))
                .reduce(0, Integer::sum);
        System.out.println("Sum: " + sum1);

        System.out.println("================");
        System.out.println("Parallel stream 2");
        System.out.println("================");
        long sum2 = data.parallelStream()
                .map(i -> ((int) Math.sqrt(i) * 10))
                .map(number -> performComputation(number))
                .peek(i -> System.out.println("Processing with " + Thread.currentThread() + " " + i))
                .reduce(0, Integer::sum);
        System.out.println("Sum: " + sum2);

        // The next two streams run inside tasks submitted to custom pools
        System.out.println("================");
        System.out.println("Parallel stream with custom thread pool 1");
        System.out.println("================");
        ForkJoinPool fjp1 = new ForkJoinPool(5);
        long sumFJ1 = 0;
        Callable<Integer> callable1 = () -> data.parallelStream()
                .map(i -> (int) Math.sqrt(i))
                .map(number -> performComputation(number))
                .peek(i -> System.out.println("Processing with " + Thread.currentThread() + " " + i))
                .reduce(0, Integer::sum);
        try {
            sumFJ1 = fjp1.submit(callable1).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            fjp1.shutdown();
        }
        System.out.println("Sum: " + sumFJ1);

        System.out.println("================");
        System.out.println("Parallel stream with custom thread pool 2");
        System.out.println("================");
        Callable<Integer> callable2 = () -> data.parallelStream()
                .map(i -> (int) Math.sqrt(i) * 10)
                .map(number -> performComputation(number))
                .peek(i -> System.out.println("Processing with " + Thread.currentThread() + " " + i))
                .reduce(0, Integer::sum);
        long sumFJ2 = 0;
        ForkJoinPool fjp2 = new ForkJoinPool(4);
        try {
            sumFJ2 = fjp2.submit(callable2).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            fjp2.shutdown();
        }
        System.out.println("Sum: " + sumFJ2);
    }

    // Arbitrary CPU-bound work
    public static int performComputation(int number) {
        int sum = 0;
        for (int i = 1; i < 100000; i++) {
            int div = (number / i);
            sum += div;
        }
        return sum;
    }
}
```
When you run the program, you will get output similar to the following (the thread names and the order of the lines vary between runs):
Parallel stream 1
================
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 5
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[main,5,main] 3
Sum: 23
================
Parallel stream 2
================
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 111
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[main,5,main] 66
Sum: 522
================
Parallel stream with custom thread pool 1
================
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 5
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 0
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Sum: 23
================
Parallel stream with custom thread pool 2
================
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 111
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 0
Processing with Thread[ForkJoinPool-2-worker-2,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Sum: 522
As you can see, the first two parallel streams use ForkJoinPool.commonPool, while the next two use the custom thread pools ForkJoinPool-1 and ForkJoinPool-2.
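If you want to check programmatically that a stream really ran in your custom pool, you can collect the names of the threads that took part, as in the sketch below (class and method names are ours). With the default thread factory, custom pool threads are named ForkJoinPool-N-worker-M, so none of the names should contain "commonPool". As noted above, this depends on how the stream implementation schedules its subtasks, not on a documented guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class PoolCheck {

    // Runs a parallel stream inside the given pool and returns the names of the participating threads
    static Set<String> threadsUsedIn(ForkJoinPool pool) throws InterruptedException, ExecutionException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() -> IntStream.range(0, 1_000)
                        .parallel()
                        .forEach(i -> names.add(Thread.currentThread().getName())))
                .get(); // wait for the whole stream to finish
        return names;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            System.out.println(threadsUsedIn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```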
Things you should keep in mind while using Parallel Stream
Stateful lambda expressions
You should avoid using stateful lambda expressions in stream operations. A stateful lambda expression is one whose result depends on any state that might change during the execution of the stream operations.
```java
package org.arpit.java2blog;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ListOfIntegersStatefulLambda {

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(new Integer[] {40, 34, 21, 37, 20});
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());

        listOfIntegers.parallelStream()
                // Don't do this! It uses a stateful lambda expression.
                .map(e -> {
                    syncList.add(e);
                    return e;
                })
                .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println("");

        syncList.stream().forEachOrdered(e -> System.out.print(e + " "));
        System.out.println("");
    }
}
```
Output (the second line can differ from run to run):
40 34 21 37 20
40 34 37 20 21
forEachOrdered processes the elements in the encounter order of the stream, so it always prints 40 34 21 37 20. However, .map(e -> { syncList.add(e); return e; }) is a stateful lambda: the order in which it adds elements to syncList depends on which thread reaches each element first, so it can vary from run to run. That is why you should not use stateful lambda expressions with parallel streams.
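A stateless alternative is to let the stream build the list itself instead of adding to a shared list from inside map. In the sketch below (class and method names are ours), Collectors.toList() assembles the result in the stream’s encounter order even though the stream is parallel, so the result is the same on every run.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StatelessCollect {

    // No shared mutable state: the collector combines the partial results in encounter order
    static List<Integer> collectInOrder(List<Integer> source) {
        return source.parallelStream()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(40, 34, 21, 37, 20);
        System.out.println(collectInOrder(listOfIntegers)); // prints [40, 34, 21, 37, 20]
    }
}
```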
Interference
Lambda expressions in stream operations should not modify the source of the stream.
The following code tries to add an element to the list of integers while streaming over it, and throws a ConcurrentModificationException.
```java
package org.arpit.java2blog;

import java.util.ArrayList;
import java.util.List;

public class ListOfIntegersStatefulLambda {

    public static void main(String[] args) {
        List<Integer> listOfIntegers = new ArrayList<>();
        Integer[] intArray = new Integer[] {40, 34, 21, 37, 20};
        for (Integer in : intArray) {
            listOfIntegers.add(in);
        }

        listOfIntegers.parallelStream()
                // Interference: this lambda modifies the stream's own source
                .peek(i -> listOfIntegers.add(7))
                .forEach(e -> System.out.print(e + " "));
        System.out.println("");
    }
}
```
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.arpit.java2blog.ListOfIntegersStatefulLambda.main(ListOfIntegersStatefulLambda.java:19)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Note that all intermediate operations are lazy: execution of the stream begins only when the terminal operation forEach is invoked. Because the lambda passed to peek modifies the stream source while the stream is executing, Java throws a ConcurrentModificationException.
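A non-interfering version collects the new elements first and modifies the source only after the terminal operation has completed. The sketch below (class and method names are ours) keeps the original example’s idea of adding a 7 per element, but does it safely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NonInterfering {

    static List<Integer> appendSevens(List<Integer> listOfIntegers) {
        // Build the new elements without touching the source while the stream runs...
        List<Integer> sevens = listOfIntegers.parallelStream()
                .map(i -> 7)
                .collect(Collectors.toList());
        // ...and modify the source only after the terminal operation has finished
        listOfIntegers.addAll(sevens);
        return listOfIntegers;
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(40, 34, 21, 37, 20));
        System.out.println(appendSevens(listOfIntegers));
        // prints [40, 34, 21, 37, 20, 7, 7, 7, 7, 7]
    }
}
```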
Conclusion
You have learned about parallel streams and when to use them, with examples. Be careful when using parallel streams: they are very powerful when used in the right context.
That’s all about parallel streams in Java.