Java Parallel Stream

Java 8 Parallel Stream

In this post, we will look at parallel streams in Java.

Java Parallel Stream introduction

Java 8 introduced parallel streams as a way to process data in parallel. Since multi-core CPUs are now common thanks to cheap hardware, parallel processing can make some operations run faster.

Let’s understand this with the help of a simple example.
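The original listing is not reproduced here, so the following is a minimal sketch of a program that prints each element next to the name of the thread that processed it. The class name ParallelStreamDemo and the helper methods describe and banner are assumptions made for illustration.

```java
import java.util.stream.IntStream;

public class ParallelStreamDemo {

    // Formats one output line: the element followed by the current thread's name.
    static String describe(int i) {
        return i + " " + Thread.currentThread().getName();
    }

    // Prints a section title between two separator lines.
    static void banner(String title) {
        System.out.println("=================================");
        System.out.println(title);
        System.out.println("=================================");
    }

    public static void main(String[] args) {
        banner("Using Sequential Stream");
        // Sequential: every element is handled by the calling thread, in order.
        IntStream.rangeClosed(1, 10)
                 .forEach(i -> System.out.println(describe(i)));

        banner("Using Parallel Stream");
        // Parallel: elements are handled by the calling thread and common-pool workers,
        // so the order and the thread names vary from run to run.
        IntStream.rangeClosed(1, 10)
                 .parallel()
                 .forEach(i -> System.out.println(describe(i)));
    }
}
```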

When you run the above program, you will get output similar to the following (the order and thread names in the parallel part vary from run to run):

=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1

If you look at the output, the main thread does all the work in the case of the sequential stream. It waits for the current iteration to complete and then works on the next one.
In the case of the parallel stream, the work is spread across 4 threads running at the same time: the main thread and three worker threads. Internally, parallel streams use the Fork/Join framework to create and manage threads, and they obtain the shared ForkJoinPool instance via the static ForkJoinPool.commonPool() method.

A parallel stream takes advantage of all available CPU cores and processes tasks in parallel. If the number of tasks exceeds the number of cores, the remaining tasks wait for a currently running task to complete.

Parallel streams are cool, so should you always use them?

A big No!!
Just because it is easy to convert a sequential stream to a parallel stream by adding .parallel() does not mean you should always do it.
There are many factors to consider when using parallel streams; otherwise you will suffer from their negative effects.

A parallel stream has much higher overhead than a sequential stream, and coordinating between threads takes a good amount of time.
You should consider a parallel stream if and only if:

  • You have a large dataset to process.
  • Your source is splittable. Java uses ForkJoinPool to achieve parallelism: it forks the source stream into parts and submits them for execution, so the source must be easy to split.
    For example:
    An ArrayList is very easy to split, because the middle element can be found by its index. A LinkedList is very hard to split and does not perform well in most cases.
  • You are actually suffering from performance issues.
  • All resources shared between threads are properly synchronized; otherwise you might get unexpected results.
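As a rough illustration of the last point, the sketch below shows two ways to avoid unsynchronized shared state in a parallel stream: letting a collector build the result, and using a thread-safe LongAdder for a shared counter. The class and method names (SharedStateDemo, collectSquares, countEven) are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SharedStateDemo {

    // No shared mutable list: the collector merges per-thread partial results.
    static List<Integer> collectSquares(int n) {
        return IntStream.rangeClosed(1, n)
                        .parallel()
                        .map(i -> i * i)
                        .boxed()
                        .collect(Collectors.toList());
    }

    // A shared counter is acceptable only if it is thread-safe, as LongAdder is.
    static long countEven(int n) {
        LongAdder evens = new LongAdder();
        IntStream.rangeClosed(1, n)
                 .parallel()
                 .forEach(i -> {
                     if (i % 2 == 0) {
                         evens.increment();
                     }
                 });
        return evens.sum();
    }

    public static void main(String[] args) {
        System.out.println(collectSquares(1000).size());
        System.out.println(countEven(1000));
    }
}
```

Adding to a plain ArrayList from inside forEach on a parallel stream, by contrast, can lose elements or throw, because ArrayList is not thread-safe.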

The simplest rule of thumb for judging whether parallelism is likely to pay off is the "NQ" model, as presented by Brian Goetz.

NQ Model:

N x Q > 10000

where,
N = number of items in dataset
Q = amount of work per item

This means that if you have a large number of items and little work per item (for example, a sum), parallelism might help your program run faster. The reverse is also true: if you have fewer items but more work per item (some heavy computation), parallelism might still help you get results faster.
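To get a feel for the "large N, small Q" case, here is a minimal sketch that sums a range of numbers both sequentially and in parallel and prints a crude timing for each. The class name NqSketch and the chosen N are assumptions. Timings taken with System.nanoTime() are only indicative and vary by machine and JVM warm-up; a benchmarking harness such as JMH gives more trustworthy numbers.

```java
import java.util.stream.LongStream;

public class NqSketch {

    // Large N, small Q: one addition per item, on the calling thread only.
    static long sumSequential(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same computation, split across the common pool.
    static long sumParallel(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 100_000_000L; // assumed N, chosen only for illustration

        long start = System.nanoTime();
        long sequential = sumSequential(n);
        long sequentialMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long parallel = sumParallel(n);
        long parallelMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("Sequential: " + sequential + " in " + sequentialMs + " ms");
        System.out.println("Parallel:   " + parallel + " in " + parallelMs + " ms");
    }
}
```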

Let’s see with the help of another example.

In this example, we are going to see how the CPU behaves when you perform long computations with a parallel stream and with a sequential stream. We do some arbitrary calculations to keep the CPU busy.
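The original program is not reproduced here. A minimal sketch of the same idea, assuming a hypothetical CPU-heavy function (countPrimesBelow, using trial division) and a much smaller workload than the one behind the timings quoted in this post, could look like this. It does not produce the numbers shown below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class CpuBoundDemo {

    // Hypothetical CPU-heavy work: counts primes below 'limit' by trial division.
    static int countPrimesBelow(int limit) {
        int count = 0;
        for (int n = 2; n < limit; n++) {
            boolean prime = true;
            for (int d = 2; (long) d * d <= n; d++) {
                if (n % d == 0) {
                    prime = false;
                    break;
                }
            }
            if (prime) {
                count++;
            }
        }
        return count;
    }

    // Runs the same pipeline sequentially or in parallel, depending on the flag.
    static long totalWork(List<Integer> data, boolean parallel) {
        Stream<Integer> stream = parallel ? data.parallelStream() : data.stream();
        return stream.mapToLong(CpuBoundDemo::countPrimesBelow).sum();
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            data.add(200_000); // assumed workload size per item
        }

        long start = System.currentTimeMillis();
        System.out.println(totalWork(data, false));
        System.out.println("Sequential: " + (System.currentTimeMillis() - start) + " ms");

        start = System.currentTimeMillis();
        System.out.println(totalWork(data, true));
        System.out.println("Parallel: " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

Watching a CPU monitor while each half runs should show one busy core for the sequential pipeline and several busy cores for the parallel one, though the exact picture depends on the machine.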

When you run the above program, you will get the output below.

117612733
Time taken to complete:6 minutes

Here we are not interested in the output itself, but in how the CPU behaved while the operation was running.

As you can see, the CPU is not fully utilized in the case of the sequential stream.

Let’s change line 16 to make the stream parallel and run the program again.

You will get the output below when you run the stream in parallel.

117612733
Time taken to complete:3 minutes

Let’s check the CPU history from when we ran the program using the parallel stream.

Parallel Stream

As you can see, the parallel stream used all 4 CPU cores to perform the computation.

Custom Thread pool in Parallel Stream

By default, a parallel stream uses ForkJoinPool.commonPool(), which has one thread fewer than the number of processors. The parallel stream can still use all available processors because the calling thread (here, the main thread) takes part in the work as well.

If you use multiple parallel streams, they all share the same ForkJoinPool.commonPool(). This means that each parallel stream may not get all the processors to itself.

To solve this issue, you can create your own thread pool for processing the stream.

This creates a ForkJoinPool with the target parallelism level. If you don’t pass a parallelism value, it defaults to the number of available processors.
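The construction described above can be sketched as follows. The class name CustomPoolCreation and the parallelism value 4 are assumptions for illustration.

```java
import java.util.concurrent.ForkJoinPool;

public class CustomPoolCreation {

    // Creates a pool with an explicit target parallelism and reports it back.
    static int explicitParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Creates a pool with the no-argument constructor and reports its parallelism.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Explicit: " + explicitParallelism(4));
        System.out.println("Default:  " + defaultParallelism());
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
    }
}
```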

Now you can submit the parallel stream to this custom ForkJoinPool.

Let’s understand this with the help of an example.
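The original listing is not reproduced here, so this is a minimal sketch of submitting a parallel stream to a custom pool. The class name CustomPoolDemo, the helper sumIn, and the sample data are assumptions, and the program does not reproduce the exact output shown below. Note that running a parallel stream inside the pool that submitted it is widely relied on, but as far as I know it is an implementation behavior rather than a documented guarantee of the Stream API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class CustomPoolDemo {

    // Sums the list with a parallel stream that is started from inside 'pool'.
    static int sumIn(ForkJoinPool pool, List<Integer> data) {
        Callable<Integer> task = () -> data.parallelStream()
                .peek(i -> System.out.println(
                        "Processing with " + Thread.currentThread() + " " + i))
                .mapToInt(Integer::intValue)
                .sum();
        try {
            // submit() hands the task to the pool; get() waits for the result.
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        System.out.println("Parallel stream on the common pool");
        int commonSum = data.parallelStream().mapToInt(Integer::intValue).sum();
        System.out.println("Sum: " + commonSum);

        System.out.println("Parallel stream with custom thread pool");
        ForkJoinPool customPool = new ForkJoinPool(4);
        try {
            System.out.println("Sum: " + sumIn(customPool, data));
        } finally {
            customPool.shutdown();
        }
    }
}
```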

When you run the program, you will get below output:

================
Parallel stream 1
================
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 5
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-2,5,main] 1
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 3
Processing with Thread[main,5,main] 3
Sum: 23
================
Parallel stream 2
================
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 111
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 66
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 0
Processing with Thread[ForkJoinPool.commonPool-worker-1,5,main] 27
Processing with Thread[ForkJoinPool.commonPool-worker-3,5,main] 66
Processing with Thread[main,5,main] 66
Sum: 522
================
Parallel stream with custom thread pool 1
================
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 5
Processing with Thread[ForkJoinPool-1-worker-4,5,main] 0
Processing with Thread[ForkJoinPool-1-worker-1,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Processing with Thread[ForkJoinPool-1-worker-3,5,main] 3
Processing with Thread[ForkJoinPool-1-worker-2,5,main] 1
Sum: 23
================
Parallel stream with custom thread pool 2
================
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 111
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-3,5,main] 0
Processing with Thread[ForkJoinPool-2-worker-2,5,main] 27
Processing with Thread[ForkJoinPool-2-worker-0,5,main] 66
Processing with Thread[ForkJoinPool-2-worker-1,5,main] 27
Sum: 522

As you can see, the first two parallel streams use ForkJoinPool.commonPool, and the next two use custom thread pools, i.e. ForkJoinPool-1 and ForkJoinPool-2.

Things you should keep in mind while using Parallel Stream

Stateful lambda expressions

You should avoid using stateful lambda expressions in stream operations. A stateful lambda expression is one whose result depends on any state that might change during the execution of the stream pipeline.
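The original listing is not reproduced here. A minimal sketch built around the map operation discussed in this section, with an assumed class name (StatefulLambdaDemo) and an assumed helper (fillThroughMap), could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StatefulLambdaDemo {

    // Runs a parallel pipeline whose map() has a side effect on a shared list,
    // and returns that list so its (unpredictable) order can be inspected.
    static List<Integer> fillThroughMap(List<Integer> source) {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream()
              .map(e -> { syncList.add(e); return e; }) // stateful lambda: avoid this
              .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println();
        return syncList;
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(40, 34, 21, 37, 20);
        List<Integer> syncList = fillThroughMap(listOfIntegers);
        // The first line follows the source order; this one may differ between runs.
        syncList.forEach(e -> System.out.print(e + " "));
        System.out.println();
    }
}
```

The synchronized list keeps the program from corrupting its data, but it does nothing to make the insertion order predictable.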

Output:

40 34 21 37 20
40 34 37 20 21

forEachOrdered processes the elements in the order imposed by the stream. However, .map(e -> {syncList.add(e); return e;}) is a stateful lambda, and the order in which it adds elements to syncList can vary from run to run, so you should not use stateful lambda expressions with parallel streams.

Interference

A lambda expression in a stream operation should not modify the source of the stream.
The following code tries to add elements to a list of integers while streaming over it, and throws a ConcurrentModificationException.
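The original listing is not reproduced here. A minimal sketch of such interference, with an assumed class name (InterferenceDemo) and an assumed helper (runInterferingPipeline), could look like this. Because the unsynchronized ArrayList is modified from several threads, the failure is typically a ConcurrentModificationException, but the sketch catches any RuntimeException to stay on the safe side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InterferenceDemo {

    // Runs a pipeline that modifies its own source and returns the exception
    // it fails with, or null if (unexpectedly) nothing was thrown.
    static RuntimeException runInterferingPipeline() {
        List<Integer> listOfIntegers =
                new ArrayList<>(Arrays.asList(40, 34, 21, 37, 20));
        try {
            listOfIntegers.parallelStream()
                          .peek(e -> listOfIntegers.add(7)) // interference: modifies the source
                          .forEach(e -> System.out.print(e + " "));
            return null;
        } catch (RuntimeException ex) {
            return ex;
        }
    }

    public static void main(String[] args) {
        RuntimeException failure = runInterferingPipeline();
        System.out.println();
        System.out.println("Pipeline failed with: " + failure);
    }
}
```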

Output:

34 21 40 20 37 Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at org.arpit.java2blog.ListOfIntegersStatefulLambda.main(ListOfIntegersStatefulLambda.java:19)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Please note that all intermediate operations are lazy; execution of the stream begins only when forEach is invoked. Because the argument of peek tries to modify the stream source during execution of the stream, Java throws a ConcurrentModificationException.

Conclusion

You have learned what parallel streams are and when to use them, with examples. You should be careful when using parallel streams; they are very powerful when used in the right context.

That’s all about parallel streams in Java.

