Java Parallel Stream


In this post, we will look at parallel streams in Java.

Java Parallel Stream introduction

Java 8 introduced the concept of parallel streams for parallel processing. Since machines now have many CPU cores thanks to cheap hardware, parallel processing can be used to perform operations faster.

Let’s understand this with the help of a simple example.
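The original listing is not reproduced here. The following is a minimal sketch of a program that would produce output of this shape, assuming the numbers 1 to 10 are streamed once sequentially and once in parallel. The class name ParallelStreamIntro and the helper printWithThreadNames are hypothetical names chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ParallelStreamIntro {

    // Prints each number with the name of the thread that handled it
    // and returns the names of all threads that took part.
    static Set<String> printWithThreadNames(IntStream numbers) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet(); // thread-safe set
        numbers.forEach(i -> {
            String name = Thread.currentThread().getName();
            threadNames.add(name);
            System.out.println(i + " " + name);
        });
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println("=================================");
        System.out.println("Using Sequential Stream");
        System.out.println("=================================");
        printWithThreadNames(IntStream.rangeClosed(1, 10));

        System.out.println("=================================");
        System.out.println("Using Parallel Stream");
        System.out.println("=================================");
        // The order of lines and the worker names vary from run to run.
        printWithThreadNames(IntStream.rangeClosed(1, 10).parallel());
    }
}
```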

When you run the above program, you will get the following output:

=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1

If you look at the output, the main thread does all the work in the case of the sequential stream. It waits for the current iteration to complete and then works on the next iteration.
In the case of the parallel stream, the work is shared among four threads: the main thread plus three ForkJoinPool.commonPool workers. Internally, parallel streams use the Fork/Join framework to create and manage threads, and they obtain the shared ForkJoinPool instance via the static ForkJoinPool.commonPool() method.

A parallel stream takes advantage of all available CPU cores and processes the tasks in parallel. If the number of tasks exceeds the number of cores, the remaining tasks wait for the currently running tasks to complete.

Parallel streams are cool, so should you always use them?

A big No!!
It is easy to convert a sequential stream to a parallel stream just by adding .parallel(), but that does not mean you should always do it.
There are many factors to consider before using parallel streams; otherwise you will suffer from their negative effects.

A parallel stream has much higher overhead than a sequential stream, and coordinating between threads takes a good amount of time.
You should consider a parallel stream only if:

  • You have a large dataset to process.
  • Your source is easy to split. Java uses ForkJoinPool to achieve parallelism: it forks the source stream into chunks and submits them for execution, so the source should be cheaply splittable.
    For example:
    An ArrayList is very easy to split, because the middle element can be found by its index. A LinkedList is much harder to split and does not perform well in most cases.
  • You are actually suffering from performance issues.
  • All resources shared between threads are properly synchronized; otherwise you might get unexpected results.
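The last point about shared resources can be illustrated with a small sketch. It is not from the original article; the class SharedStateDemo and its method names are hypothetical. It counts elements in three ways: with an unsynchronized shared counter (which may lose updates), with an AtomicInteger, and with the stream's own count() reduction, which needs no shared state at all.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class SharedStateDemo {

    // Unsafe: several threads increment the same array slot without synchronization,
    // so some increments can be lost and the result may be smaller than n.
    static int unsafeCount(int n) {
        int[] counter = {0};
        IntStream.range(0, n).parallel().forEach(i -> counter[0]++);
        return counter[0];
    }

    // Safe: AtomicInteger makes each increment atomic.
    static int atomicCount(int n) {
        AtomicInteger counter = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(i -> counter.incrementAndGet());
        return counter.get();
    }

    // Usually preferable: let the stream do the reduction, with no shared state.
    static long reducedCount(int n) {
        return IntStream.range(0, n).parallel().count();
    }

    public static void main(String[] args) {
        int n = 1_000_000;
        System.out.println("unsafe : " + unsafeCount(n) + " (may be less than " + n + ")");
        System.out.println("atomic : " + atomicCount(n));
        System.out.println("reduced: " + reducedCount(n));
    }
}
```

When a built-in reduction such as count(), sum() or collect() fits the problem, it is generally simpler than synchronizing a shared variable by hand.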

The simplest rule of thumb for judging whether parallelism will pay off is the “NQ” model, as presented by Brian Goetz in his presentation.

NQ Model:

N x Q > 10000

where,
N = number of items in dataset
Q = amount of work per item

It means that if you have a large number of items and little work per item (for example, a sum), parallelism might help your program run faster. The converse is also true: if you have fewer items but more work per item (some heavy computation), parallelism might still help you get results faster.
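As a worked illustration of the rule, here is a minimal sketch. The article does not define the units of Q, so the values passed for q below are rough, made-up cost estimates, and the class NqModel and method mightBenefit are hypothetical names, not part of any library.

```java
public class NqModel {

    // Threshold from the rule of thumb N x Q > 10000.
    static final long THRESHOLD = 10_000;

    // n = number of items, q = estimated amount of work per item (arbitrary units).
    static boolean mightBenefit(long n, long q) {
        return n * q > THRESHOLD;
    }

    public static void main(String[] args) {
        // Many items, trivial work per item (e.g. a sum): 1,000,000 x 1 > 10,000
        System.out.println(mightBenefit(1_000_000, 1));
        // Few items, trivial work per item: 100 x 1 is not > 10,000
        System.out.println(mightBenefit(100, 1));
        // Few items, heavy work per item: 100 x 1,000 > 10,000
        System.out.println(mightBenefit(100, 1_000));
    }
}
```

This is only a first filter. Measuring the actual workload is still the way to decide.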

Let’s see with the help of another example.

In this example, we are going to see how the CPU behaves when you perform long computations with a sequential stream and with a parallel stream. We do some arbitrary calculations to keep the CPU busy.
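A minimal sketch of such a CPU-bound program follows. It is not the article's original listing, so the line numbers and the output values quoted in the text do not apply to it. The class CpuBoundDemo, the busy-work function and the item count are assumptions chosen only to keep the cores occupied for a few seconds.

```java
import java.util.stream.LongStream;

public class CpuBoundDemo {

    // Arbitrary arithmetic whose only purpose is to burn CPU time.
    static long work(long n) {
        long acc = 0;
        for (long i = 1; i <= 10_000; i++) {
            acc += (n * i) % 7;
        }
        return acc;
    }

    // Runs the same pipeline sequentially or in parallel and returns the total.
    static long run(boolean parallel, long items) {
        LongStream stream = LongStream.rangeClosed(1, items);
        if (parallel) {
            stream = stream.parallel(); // the only difference between the two runs
        }
        return stream.map(CpuBoundDemo::work).sum();
    }

    public static void main(String[] args) {
        long items = 200_000;
        for (boolean parallel : new boolean[] {false, true}) {
            long start = System.nanoTime();
            long result = run(parallel, items);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println((parallel ? "parallel" : "sequential")
                    + " result=" + result + " time=" + millis + " ms");
        }
    }
}
```

Both runs return the same result; only the elapsed time and the CPU utilization differ.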

When you run the above program, you will get the following output.

117612733
Time taken to complete:6 minutes

We are not interested in the output here, but in how the CPU behaved while the operation ran.

As you can see, the CPU is not fully utilized in the case of the sequential stream.

Let’s change line 16 of the program to make the stream parallel, and run the program again.

You will get the following output when you run the stream in parallel.

117612733
Time taken to complete:3 minutes

Let’s check the CPU history from when we ran the program using the parallel stream.

[Figure: CPU history, parallel stream]

As you can see, the parallel stream used all 4 CPU cores to perform the computation.

Custom Thread pool in Parallel Stream

By default, a parallel stream uses ForkJoinPool.commonPool, which has one thread fewer than the number of processors. The parallel stream still uses all available processors, because the calling thread (here, the main thread) takes part in the work as well.

If you are running multiple parallel streams, they all share the same ForkJoinPool.commonPool. This means each stream may not get the full set of processors to itself.

To solve this issue, you can create your own thread pool for processing the stream.

Calling new ForkJoinPool(parallelism) creates a ForkJoinPool with the target parallelism level. If you don’t pass a parallelism value, it defaults to the number of available processors.

Now you can submit the parallel stream to this custom ForkJoinPool.

Let’s understand this with the help of an example.
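The sketch below shows the idea under some assumptions. It is not the article's original listing: the class CustomPoolDemo, the method sumInPool and the input list are hypothetical. Note also that a parallel stream running in the pool it was submitted from is a widely used idiom that relies on how the Fork/Join framework behaves, rather than on a guarantee spelled out in the Stream API documentation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class CustomPoolDemo {

    // Sums the numbers with a parallel stream that runs inside a dedicated pool.
    static int sumInPool(List<Integer> numbers, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The stream is started from a task submitted to the custom pool,
            // so its subtasks are forked into that pool instead of the common pool.
            return pool.submit(() ->
                    numbers.parallelStream()
                           .peek(n -> System.out.println(
                                   "Processing with " + Thread.currentThread() + " " + n))
                           .mapToInt(Integer::intValue)
                           .sum()
            ).join();
        } finally {
            pool.shutdown(); // custom pools are not shut down automatically
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println("Sum: " + sumInPool(numbers, 4));
    }
}
```

Shutting the pool down in a finally block matters here, because each call creates a new pool and its worker threads would otherwise linger.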

When you run the program, you will get the following output:

================
Parallel stream 1
================
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 5
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[main,5,main] 3
Sum: 23
================
Parallel stream 2
================
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 111
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[main,5,main] 66
Sum: 522
================
Parallel stream with custom thread pool 1
================
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 5
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 0
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Sum: 23
================
Parallel stream with custom thread pool 2
================
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 111
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 0
Processing with Thread[ForkJoinPool-2-worker-2,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Sum: 522

As you can see, the first two parallel streams use ForkJoinPool.commonPool, and the next two use custom thread pools, i.e. ForkJoinPool-1 and ForkJoinPool-2.

Things you should keep in mind while using Parallel Stream

Stateful lambda expressions

You should avoid using stateful lambda expressions in stream operations. A stateful lambda expression is one whose output depends on any state that might change during the execution of the stream operations.
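A minimal sketch of a stateful lambda, reconstructed under assumptions from the discussion of syncList in this section: the class StatefulLambdaDemo and the method recordArrivalOrder are hypothetical names, and the input values are taken from the output shown in this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StatefulLambdaDemo {

    // Prints the elements in encounter order and returns the order in which
    // the stateful map() lambda actually saw them.
    static List<Integer> recordArrivalOrder(List<Integer> source) {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream()
              // Stateful lambda: it writes to syncList as a side effect.
              .map(e -> { syncList.add(e); return e; })
              // forEachOrdered restores the encounter order for printing only.
              .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println();
        return syncList;
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(40, 34, 21, 37, 20);
        List<Integer> arrivalOrder = recordArrivalOrder(listOfIntegers);
        // The second line may differ from the first and from run to run.
        arrivalOrder.forEach(e -> System.out.print(e + " "));
        System.out.println();
    }
}
```

The synchronized list keeps the program thread-safe, but it cannot make the insertion order predictable.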

Output:

40 34 21 37 20
40 34 37 20 21

forEachOrdered processes the elements in the order imposed by the stream. .map(e -> {syncList.add(e); return e;}) is a stateful lambda, and the order in which it adds elements to syncList can vary between runs, so you should not use stateful lambda operations with a parallel stream.

Interference

A lambda expression in a stream operation should not modify the source of the stream.
The following code tries to add an element to the list of integers while streaming over it, and throws a ConcurrentModificationException.
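A minimal sketch of interference, not the article's original listing: the class InterferenceDemo and the method modifyingSourceFails are hypothetical. It uses a sequential stream on purpose so that the failure is reproducible; the same rule applies to parallel streams, where unsynchronized writes to the ArrayList would additionally race with each other. Fail-fast detection is best-effort, so this shows typical ArrayList behavior rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class InterferenceDemo {

    // Returns true if modifying the stream source from inside the pipeline
    // was detected and reported as a ConcurrentModificationException.
    static boolean modifyingSourceFails() {
        List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(40, 34, 21, 37, 20));
        try {
            listOfIntegers.stream()
                          // Interference: the lambda adds to the list being streamed.
                          .peek(e -> listOfIntegers.add(7))
                          .forEach(e -> System.out.print(e + " "));
            return false;
        } catch (ConcurrentModificationException ex) {
            System.out.println();
            System.out.println("Caught: " + ex);
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Interference detected: " + modifyingSourceFails());
    }
}
```

If new elements are needed, one option is to collect them into a separate list and add them to the source after the terminal operation has finished.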

Output:

34 21 40 20 37 Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.arpit.java2blog.ListOfIntegersStatefulLambda.main(ListOfIntegersStatefulLambda.java:19)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Please note that all intermediate operations are lazy; execution of the stream begins only when forEach is invoked. The argument of peek tries to modify the stream source during execution of the stream, which causes Java to throw a ConcurrentModificationException.

Conclusion

You have learnt what parallel streams are and when to use them, with examples. You should be careful while using parallel streams. They are very powerful if used in the correct context.

That’s all about parallel streams in Java.
