Idiomatic Peeking with Java Stream API

The peek() method from the Java Stream API is often misunderstood. Let’s try to clarify how it works and when it should be used.

Introduction

Let’s start responsibly (RTFM) by inspecting peek()’s user manual.

Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.

Sounds pretty straightforward. We can use it for applying side-effects for every consumed Stream element:

Stream.of("one", "two", "three")
  .peek(e -> System.out.println(e + " was consumed by peek()."))
  .collect(Collectors.toList());

The result of such an operation is not surprising:

one was consumed by peek().
two was consumed by peek().
three was consumed by peek().

Stream/peek Lazy Evaluation

But what will happen if we replace the terminal collect() operation with a forEach()?

Stream.of("one", "two", "three")
  .peek(e -> System.out.println(e + " was consumed by peek()."))
  .forEach(System.out::println);

It might be tempting to think that we’ll see a series of peek() logs followed by a series of for-each logs but this is not the case:

one was consumed by peek().
one
two was consumed by peek().
two
three was consumed by peek().
three

It gets even more interesting if we get rid of the forEach call:

Stream.of("one", "two", "three")
  .peek(e -> System.out.println(e + " was consumed by peek()."));

The result is nothing at all; not a single line is printed.

As JavaDoc states:

Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.

So, because of lazy evaluation, Stream pipelines are traversed “vertically” (each element passes through the whole pipeline before the next one is pulled) and not “horizontally”, which makes it possible to avoid unnecessary calculations. Additionally, the traversal is triggered only when a terminal operation is present. Hence the observed behaviour.
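To make the “vertical” traversal tangible, here is a minimal sketch that records events in a list instead of printing them. The class and method names (PeekOrder, traverseWithForEach, buildWithoutTerminalOperation) are made up for this illustration, and it assumes a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class PeekOrder {

    // Records every peek() and forEach() call in the order it actually happens.
    static List<String> traverseWithForEach() {
        List<String> events = new ArrayList<>();
        Stream.of("one", "two", "three")
            .peek(e -> events.add("peek:" + e))
            .forEach(e -> events.add("forEach:" + e));
        return events;
    }

    // Builds the same pipeline but never attaches a terminal operation.
    static List<String> buildWithoutTerminalOperation() {
        List<String> events = new ArrayList<>();
        Stream<String> pipeline = Stream.of("one", "two", "three")
            .peek(e -> events.add("peek:" + e));
        return events; // still empty: the pipeline was never traversed
    }

    public static void main(String[] args) {
        System.out.println(traverseWithForEach());
        System.out.println(buildWithoutTerminalOperation());
    }
}
```

The first list comes back interleaved (peek:one, forEach:one, peek:two, ...), and the second one stays empty because no terminal operation ever starts the traversal.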

This is why it’s possible to represent and manipulate infinite sequences using Streams. (Because where do you store an infinite sequence? In the cloud?):

Stream.iterate(0, i -> i + 1)
  .peek(System.out::println)
  .findFirst();

The above operation completes almost immediately because of the lazy character of Stream traversal. Of course, if you try to collect the whole infinite sequence to some data structure, even laziness will not save you.
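A small sketch can show how few elements peek() actually sees on an infinite stream. The names below (LazyInfiniteStream, visitedByFindFirst, visitedByLimit) are hypothetical, the limit() variant is our own addition to the example above, and the expected results assume a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyInfiniteStream {

    // Elements that peek() saw before findFirst() short-circuited the infinite stream.
    static List<Integer> visitedByFindFirst() {
        List<Integer> visited = new ArrayList<>();
        Stream.iterate(0, i -> i + 1)
            .peek(visited::add)
            .findFirst();
        return visited;
    }

    // Elements that peek() saw when the pipeline is cut off after n elements.
    static List<Integer> visitedByLimit(int n) {
        List<Integer> visited = new ArrayList<>();
        Stream.iterate(0, i -> i + 1)
            .peek(visited::add)
            .limit(n)
            .collect(Collectors.toList());
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(visitedByFindFirst()); // [0]
        System.out.println(visitedByLimit(3));    // [0, 1, 2]
    }
}
```

In both cases the Consumer passed to peek() runs only for the elements the terminal operation actually pulls through the pipeline.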

So, we can see that peek() can’t be treated as an intermediate for-each replacement because it invokes the passed Consumer only on elements that are visited by the Stream.

Unfortunately, Streams do not always behave entirely lazily.

Proper Usage

Further inspection of the official docs reveals a note:

This method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline

The point of confusion is the word “mainly”. Let’s have a peek (pun intended) at the English dictionary: “mainly” means “for the most part”, not “exclusively”.

We can see that non-debugging usages are neither forbidden nor discouraged.

So, technically, we should be able to, e.g., modify the Stream elements on the fly which would not be possible in the immutable, functional world. Let’s use the infamous mutable java.util.Date:

Stream.of(Date.from(Instant.EPOCH))
  .peek(d -> d.setTime(Long.MAX_VALUE))
  .forEach(System.out::println);

// Sun Aug 17 08:12:55 CET 292278994

We can observe that the result is far away from the standard epoch, which means the mutating operation was indeed applied.

The problem is that this behaviour is highly deceptive because certain Stream implementations can optimize out peek() calls.

This was eventually clarified in an early draft of the JDK 9 docs:

In cases where the stream implementation is able to optimize away the production of some or all the elements (such as with short-circuiting operations like findFirst, or in the example described in count()), the action will not be invoked for those elements. (…) An implementation may choose to not execute the stream pipeline (either sequentially or in parallel) if it is capable of computing the count directly from the stream source.

So, now it’s clear that peek() might not be the best choice if we want to perform some side effects deterministically.
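The count() case from the quote can be tried out with a sketch like the one below. The class and method names (PeekAndCount, countWithPeek) are ours, and the number of peek() invocations is deliberately not hard-coded: on JDK 8 we would expect 3, while on JDK 9 and later the implementation is allowed to compute the count from the source and skip peek() entirely.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class PeekAndCount {

    // Returns {element count, number of times the peek() action was invoked}.
    static long[] countWithPeek() {
        AtomicInteger peekCalls = new AtomicInteger();
        long count = Stream.of("one", "two", "three")
            .peek(e -> peekCalls.incrementAndGet())
            .count();
        return new long[] {count, peekCalls.get()};
    }

    public static void main(String[] args) {
        long[] result = countWithPeek();
        // The count is always 3; the number of peek() calls depends on the JDK.
        System.out.println("count = " + result[0] + ", peek() calls = " + result[1]);
    }
}
```

Any logic that relies on the second number being 3 is relying on an implementation detail.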

According to this, peek() might not even be that reliable a debugging tool after all.
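If the effect from the Date example has to happen deterministically, one possible alternative is to stop mutating in peek() and produce new values in map(), collecting the result. This is only a sketch under our own assumptions: ShiftDates, shifted and offsetMillis are hypothetical names, and shifting by an offset stands in for whatever change is actually needed.

```java
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

class ShiftDates {

    // Creates new Date instances instead of mutating the originals inside peek().
    // The collected list is built from the mapped values, so the mapping cannot be skipped.
    static List<Date> shifted(List<Date> dates, long offsetMillis) {
        return dates.stream()
            .map(d -> new Date(d.getTime() + offsetMillis))
            .collect(Collectors.toList());
    }
}
```

The original Date objects stay untouched, and the change is visible in the returned list rather than hidden in a side effect.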

Key Takeaways

  • The peek() method works fine as a debugging tool when we want to see what is being consumed by a Stream
  • It seems to work fine when applying mutating operations, but it should not be used this way: the behaviour is non-deterministic because certain peek() calls may be omitted by internal optimizations
  • The discussion about whether mutating operations should be allowed would never take place if we were restricted to operating only on immutable values
  • We have around 292276977 years before we run out of java.util.Date range

You May Also Like

Sample for lift-ng: Micro-burn 1.0.0 released

Over the last few evenings I've spent my free time working on a mini-application called micro-burn. The idea came from working with Agile Jira in our commercial project. Jira is a great tool for agile project management. It has inline task editing, a drag & drop board, reports and much more, but it also has a few drawbacks that dampen our team's motivation.

Motivation

From time to time the scope of our sprints changes. It is not a big deal because we are trying to be agile :-) but in this situation Jira's burndown chart draws a peak, because the chart in fact shows scope changes rather than a real burndown. This means the chart cannot drop below the x-axis when we really do more than we planned; it always stops at zero at most.

Also, to monitor progress better, we've started splitting our user stories into technical tasks and estimating them. The original burndown chart doesn't show points from technical tasks. I can see the motivation for this: an almost-finished user story isn't finished at all until the user can use it. On the other hand, if we know which tasks are problematic, we can do some teamwork to move them along.

So I realized that it was a good opportunity to try some new approaches and tools.

Tools

I started with the Lift framework. In the world of Single Page Applications, this framework offers more than a simple interface for serving REST services. It comes with awesome Comet support. Comet is a replacement for WebSockets that runs on all browsers. It supports long polling and falls back transparently to short polling if the limit of client connections is exceeded. On the backend you can handle pushes in a CometActor. For further reading take a look at Roundtrip promises.

But Lift is also a kind of framework of frameworks. You can build your own abstraction over CometActors and push JavaScript to the client, which shortens the path from server to client. This prompted the author of lift-ng to create an Angular integration built on top of Lift. It provides AngularActors, from which you can emit/broadcast events to a controller's scope, and NgModelBinders, which synchronize your backend model with the client scope in a few lines! I've used them to send the project state (all sprints and their details) to the client and to notify it about scrum board changes. My actor doing all of this hard work looks pretty small:

lift-ng also provides factories for creating Angular services. Services can respond with futures that are transformed into Angular promises on the fly. This is all that was needed to serve the sprint history:

And on the client side, the use of the service:


In my opinion these two frameworks give a huge boost to web application development. You get the power of strong typing with Scala, you can design your domain on Actors, and all of this comes with the simplicity of node.js: no JSON-transforming boilerplate, and dynamic application reload.

DDD + Event Sourcing

I've also tried a few fresh approaches to DDD. I've organized the domain objects in actors. There are SprintActors, which encapsulate the sprint aggregate root. Task changes are stored as events, which are computed as the difference between two board states. When the history of a sprint is requested, subsequent board states are computed from the initial state and the sequence of events. I realized that the best way to keep this kind of event sourcing approach tested is to write random tests. This is a test that makes random changes to the board, calculates the events and checks whether the initial state plus the events equals the previously created state:



First look

Screenshot of first version:


If you want to take a closer look, check the source code or download the ready-to-run fat JAR on GitHub.