Java Parallel Stream

Java 8 Parallel Stream

In this post, we will look at parallel streams in Java.

Java Parallel Stream introduction

Java 8 introduced parallel streams to make parallel processing easier. Since most machines now have several CPU cores thanks to cheap hardware, parallel processing can make suitable operations run faster.

Let's understand this with the help of a simple example.
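The program this section refers to did not survive in this copy of the post. A minimal sketch that produces output of the same shape, assuming a stream over the numbers 1 to 10 (the class name ParallelStreamDemo and the helpers describe and banner are hypothetical), might look like this:

```java
import java.util.stream.IntStream;

class ParallelStreamDemo {

    // Pairs a value with the name of the thread that is processing it.
    static String describe(int value) {
        return value + " " + Thread.currentThread().getName();
    }

    // Prints a section header in the same style as the output shown in this post.
    static void banner(String title) {
        System.out.println("=================================");
        System.out.println(title);
        System.out.println("=================================");
    }

    public static void main(String[] args) {
        banner("Using Sequential Stream");
        // Sequential: every element is handled by the calling thread, in order.
        IntStream.rangeClosed(1, 10)
                .forEach(i -> System.out.println(describe(i)));

        banner("Using Parallel Stream");
        // Parallel: elements are spread over the calling thread and common-pool workers.
        IntStream.rangeClosed(1, 10)
                .parallel()
                .forEach(i -> System.out.println(describe(i)));
    }
}
```

The thread names and the order of the parallel section depend on the machine and on scheduling, so they will differ from run to run.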

When you run the above program, you will get output like the following. The order of the lines in the parallel section varies between runs.

=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1

If you look at the output, the main thread does all the work in the case of the sequential stream. It waits for the current iteration to complete and then works on the next one.
In the case of the parallel stream, four threads (main plus three workers) process the elements concurrently. Internally, the stream uses a fork/join pool to create and manage the worker threads: parallel streams obtain the shared ForkJoinPool instance through the static ForkJoinPool.commonPool() method.

A parallel stream takes advantage of all available CPU cores and processes tasks in parallel. If the number of tasks exceeds the number of cores, the remaining tasks wait for a running task to complete.

Parallel streams are cool, so should you always use them?

A big no!
Converting a sequential stream to a parallel stream is as easy as adding .parallel(), but that does not mean you should always do it.
There are many factors to consider when using parallel streams; ignore them and you will suffer from their negative effects.

A parallel stream has much higher overhead than a sequential stream, and coordinating between threads takes a good amount of time.
Consider a parallel stream only if all of the following hold:

  • You have a large dataset to process.
  • Your source stream is splittable. Java uses ForkJoinPool to achieve parallelism: it splits the source stream and submits the pieces for execution, so the source must be cheap to split.
    For example:
    ArrayList is very easy to split, because we can find the middle element by its index and split there, but LinkedList is very hard to split and does not perform well in most cases.
  • You are actually suffering from performance issues.
  • All resources shared between threads are synchronized properly; otherwise the stream might produce unexpected results.
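The last point can be illustrated with a small sketch. It is not from the original post: the class SharedStateDemo and its method names are hypothetical, and the example simply sums the numbers 1 to n in three different ways.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

class SharedStateDemo {

    // Preferred: a reduction combines per-task partial sums, so nothing is shared.
    static long sumWithReduction(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // Also correct, but every element contends on a single atomic counter.
    static long sumWithAtomic(long n) {
        AtomicLong total = new AtomicLong();
        LongStream.rangeClosed(1, n).parallel().forEach(total::addAndGet);
        return total.get();
    }

    // Broken on purpose: the unsynchronized counter can lose updates.
    static long sumWithPlainCounter(long n) {
        long[] total = {0};
        LongStream.rangeClosed(1, n).parallel().forEach(i -> total[0] += i);
        return total[0];
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        System.out.println("reduction: " + sumWithReduction(n));
        System.out.println("atomic:    " + sumWithAtomic(n));
        // May print a wrong value, and the value can change from run to run.
        System.out.println("unsafe:    " + sumWithPlainCounter(n));
    }
}
```

Where a built-in reduction such as sum() or collect() fits the problem, it is usually the simplest way to avoid shared state altogether.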

A simple rule of thumb for judging whether parallelism is worthwhile is the "NQ" model, as presented by Brian Goetz.

NQ Model:

N x Q > 10000

where,
N = number of items in dataset
Q = amount of work per item

This means that if you have a large number of items and little work per item (for example, a sum), parallelism might help the program run faster. The reverse also holds: if you have fewer items but more work per item (some heavy computation), parallelism might still help you get results faster.
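As a rough illustration, the rule can be written as a one-line check. This is only a sketch: the class NqModel and the method parallelMightHelp are hypothetical, and the unit used for Q (an estimated cost per item) is an assumption, since the model does not fix one.

```java
class NqModel {

    // Threshold from the NQ rule of thumb quoted above.
    static final long THRESHOLD = 10_000;

    // n: number of items in the dataset; q: estimated amount of work per item.
    static boolean parallelMightHelp(long n, long q) {
        return n * q > THRESHOLD;
    }

    public static void main(String[] args) {
        // Many items, trivial work per item (e.g. a sum).
        System.out.println(parallelMightHelp(1_000_000, 1));
        // Few items, trivial work per item.
        System.out.println(parallelMightHelp(100, 1));
        // Few items, heavy work per item.
        System.out.println(parallelMightHelp(100, 500));
    }
}
```

A check like this only tells you when a parallel stream is worth measuring; it does not replace measuring.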

Let’s see with the help of another example.

In this example, we will see how the CPU behaves when you perform long computations with a sequential stream and with a parallel stream. We do some arbitrary calculations to keep the CPU busy.
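The original listing is not available in this copy of the post. The sketch below is a much smaller stand-in, so it will not reproduce the result or the timings quoted in this section; the class CpuBoundDemo, the busyWork calculation, and the data sizes are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CpuBoundDemo {

    // Hypothetical stand-in for "arbitrary calculations": deterministic CPU-only work.
    static long busyWork(int seed, int rounds) {
        long acc = seed;
        for (int i = 0; i < rounds; i++) {
            acc = (acc * 31 + i) % 1_000_003;
        }
        return acc;
    }

    static long runSequential(List<Integer> data, int rounds) {
        return data.stream().mapToLong(n -> busyWork(n, rounds)).sum();
    }

    static long runParallel(List<Integer> data, int rounds) {
        return data.parallelStream().mapToLong(n -> busyWork(n, rounds)).sum();
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            data.add(i);
        }
        int rounds = 2_000_000;

        long start = System.currentTimeMillis();
        long sequential = runSequential(data, rounds);
        System.out.println(sequential + " sequential, ms: " + (System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        long parallel = runParallel(data, rounds);
        System.out.println(parallel + " parallel, ms: " + (System.currentTimeMillis() - start));
    }
}
```

Both runs compute the same sum; only the elapsed time and the CPU usage should differ. Watching a system monitor while each half runs shows how many cores are busy.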

When you run the above program, you will get the output below.

117612733
Time taken to complete:6 minutes

However, we are interested here not in the output but in how the CPU behaved while the operation ran.

As you can see, the CPU is not fully utilized in the case of the sequential stream.

Now let's change the stream in the program to a parallel stream and run the program again.

You will get the output below when you run the stream in parallel.

117612733
Time taken to complete:3 minutes

Let's check the CPU history from when we ran the program using the parallel stream.

[Figure: CPU history while running the parallel stream]

As you can see, the parallel stream used all 4 CPU cores to perform the computation.

Custom Thread pool in Parallel Stream

By default, a parallel stream uses ForkJoinPool.commonPool, which has one thread fewer than the number of processors. Because the calling thread (main, in these examples) also takes part in the work, the parallel stream can still use all available processors.

If you use multiple parallel streams, they all share the same ForkJoinPool.commonPool. This means an individual parallel stream may not get to use all the processors, because the streams compete for the same worker threads.

To solve this issue, you can create your own thread pool for processing the stream.
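The snippet that belonged here is missing from this copy of the post. A minimal sketch of creating such a pool, using the standard ForkJoinPool constructors (the class CustomPoolCreation, the helper newPool, and the parallelism value 4 are assumptions), could be:

```java
import java.util.concurrent.ForkJoinPool;

class CustomPoolCreation {

    // Creates a pool whose target parallelism is the given number of worker threads.
    static ForkJoinPool newPool(int parallelism) {
        return new ForkJoinPool(parallelism);
    }

    public static void main(String[] args) {
        ForkJoinPool fixed = newPool(4);
        // The no-argument constructor uses Runtime.getRuntime().availableProcessors().
        ForkJoinPool byDefault = new ForkJoinPool();

        System.out.println("fixed parallelism:   " + fixed.getParallelism());
        System.out.println("default parallelism: " + byDefault.getParallelism());

        // Pools you create yourself should be shut down when no longer needed.
        fixed.shutdown();
        byDefault.shutdown();
    }
}
```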

This creates a ForkJoinPool with the target parallelism level. If you don't pass a parallelism value, it defaults to the number of processors.

Now you can submit the parallel stream to this custom ForkJoinPool.

Let's understand this with the help of an example.
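The full listing is not preserved here, so the following is only a sketch and will not reproduce the values in the output shown in this section. It assumes a list of the numbers 1 to 10 and hypothetical names (CustomPoolStreamDemo, parallelSum, parallelSumIn). It also relies on the widely used but undocumented behavior that a parallel stream started from inside a ForkJoinPool task runs in that pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

class CustomPoolStreamDemo {

    // Sums the list in parallel, logging which thread handles each element.
    static int parallelSum(List<Integer> numbers) {
        return numbers.parallelStream()
                .peek(n -> System.out.println("Processing with " + Thread.currentThread() + " " + n))
                .mapToInt(Integer::intValue)
                .sum();
    }

    // Runs the same pipeline from inside a task submitted to the given pool.
    static int parallelSumIn(ForkJoinPool pool, List<Integer> numbers) {
        return pool.submit(() -> parallelSum(numbers)).join();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        System.out.println("Parallel stream (common pool)");
        System.out.println("Sum: " + parallelSum(numbers));

        System.out.println("Parallel stream with custom thread pool");
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("Sum: " + parallelSumIn(pool, numbers));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the first run the worker names start with ForkJoinPool.commonPool, while in the second they carry the custom pool's own name, which is the difference the output in this section highlights.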

When you run the program, you will get the output below:

================
Parallel stream 1
================
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 5
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[main,5,main] 3
Sum: 23
================
Parallel stream 2
================
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 111
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[main,5,main] 66
Sum: 522
================
Parallel stream with custom thread pool 1
================
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 5
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 0
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Sum: 23
================
Parallel stream with custom thread pool 2
================
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 111
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 0
Processing with Thread[ForkJoinPool-2-worker-2,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Sum: 522

As you can see, the first two parallel streams use ForkJoinPool.commonPool, and the next two use custom thread pools, i.e. ForkJoinPool-1 and ForkJoinPool-2.

Things you should keep in mind while using Parallel Stream

Stateful lambda expressions

You should avoid using stateful lambda expressions in stream operations. A stateful lambda expression is one whose result depends on any state that might change during the execution of the stream operations.
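The listing for this example is missing from this copy. A sketch consistent with the lambda discussed in this section, assuming the source list is 40, 34, 21, 37, 20 (inferred from the first line of the output) and using hypothetical names StatefulLambdaDemo and mapOrder, might be:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class StatefulLambdaDemo {

    // Prints the elements in encounter order and returns the order in which
    // the stateful map() lambda happened to see them.
    static List<Integer> mapOrder(List<Integer> source) {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());

        source.parallelStream()
                // Stateful lambda: it writes to state outside the pipeline. Don't do this.
                .map(e -> { syncList.add(e); return e; })
                // forEachOrdered respects the encounter order of the source.
                .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println();

        return new ArrayList<>(syncList);
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(40, 34, 21, 37, 20);

        List<Integer> seen = mapOrder(listOfIntegers);
        // The order recorded by the lambda can differ from run to run.
        seen.forEach(e -> System.out.print(e + " "));
        System.out.println();
    }
}
```

The synchronized list keeps the writes thread-safe, but it does nothing to make their order predictable.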

Output:

40 34 21 37 20
40 34 37 20 21

forEachOrdered processes the elements in the order imposed by the stream. The lambda .map(e -> {syncList.add(e); return e;}) is stateful, and the order in which it adds elements to syncList can vary, so you should not use stateful lambda expressions with a parallel stream.

Interference

A lambda expression in a stream operation should not modify the source of the stream.
The following code tries to add an element to the list of integers being streamed and throws ConcurrentModificationException.
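The code itself is not preserved in this copy. A sketch of the same kind of interference, assuming the same five integers as in the previous example, a hypothetical added value 7, and hypothetical names InterferenceDemo and failsWithCme, could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

class InterferenceDemo {

    // Returns true if the pipeline failed with ConcurrentModificationException.
    static boolean failsWithCme(List<Integer> source) {
        try {
            source.parallelStream()
                    // Interference: the lambda modifies the stream's own source. Don't do this.
                    .peek(e -> source.add(7))
                    .forEach(e -> System.out.print(e + " "));
            return false;
        } catch (ConcurrentModificationException ex) {
            System.out.println();
            System.out.println("Caught " + ex);
            return true;
        }
    }

    public static void main(String[] args) {
        // Spare capacity (an assumption of this sketch) means the unsynchronized add calls
        // never resize the backing array, so the modification check is the only failure.
        List<Integer> listOfIntegers = new ArrayList<>(32);
        listOfIntegers.addAll(Arrays.asList(40, 34, 21, 37, 20));

        System.out.println("Failed with CME: " + failsWithCme(listOfIntegers));
    }
}
```

The sketch catches the exception so that it can report it; without the try/catch the program would end with a stack trace like the one in the output.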

Output:

34 21 40 20 37 Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.arpit.java2blog.ListOfIntegersStatefulLambda.main(ListOfIntegersStatefulLambda.java:19)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Note that all intermediate operations are lazy; execution of the stream begins only when forEach is invoked. The argument of peek tries to modify the stream source during execution of the stream, which causes Java to throw ConcurrentModificationException.

Conclusion

You have learned about parallel streams and when to use them, with examples. Be careful when using parallel streams: they are very powerful when used in the correct context.

That's all about parallel streams in Java.

