Idiomatic Peeking with Java Stream API

The peek() method from the Java Stream API is often misunderstood. Let’s try to clarify how it works and when it should be used.


Introduction

Let’s start responsibly by inspecting the documentation of peek() (in other words, RTFM):

Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.

Sounds pretty straightforward. We can use it to apply side effects to every consumed Stream element:

Stream.of("one", "two", "three")
  .peek(e -> System.out.println(e + " was consumed by peek()."))
  .collect(Collectors.toList());

The result of such an operation is not surprising:

one was consumed by peek().
two was consumed by peek().
three was consumed by peek().

Stream/peek Lazy Evaluation

But what happens if we replace the terminal collect() operation with forEach()?

Stream.of("one", "two", "three")
  .peek(e -> System.out.println(e + " was consumed by peek()."))
  .forEach(System.out::println);

It might be tempting to think that we’ll see a series of peek() logs followed by a series of forEach() logs, but this is not the case:

one was consumed by peek().
one
two was consumed by peek().
two
three was consumed by peek().
three

It gets even more interesting if we get rid of the forEach call:

Stream.of("one", "two", "three")
  .peek(e -> System.out.println(e + " was consumed by peek()."));

The result is nothing at all:


As JavaDoc states:

Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.

So, because of lazy evaluation, Stream pipelines are traversed “vertically” (each element passes through the whole pipeline before the next one is pulled) rather than “horizontally” (each stage processes all elements before the next stage starts), which avoids unnecessary calculations. Additionally, traversal is triggered only when a terminal operation is present. Hence the observed behaviour.
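
The “vertical” order can be made visible by recording every step into a list instead of printing it. This is only a minimal sketch: the class and method names (VerticalTraversalDemo, trace, traceWithoutTerminalOperation) are made up for illustration, and the pipeline is sequential.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative only.
final class VerticalTraversalDemo {

    // Runs a sequential pipeline and records each step in the order it happens.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        Stream.of("one", "two", "three")
            .peek(e -> events.add("peek:" + e))
            .map(String::toUpperCase)
            .forEach(e -> events.add("forEach:" + e));
        return events;
    }

    // Builds the same kind of pipeline but never calls a terminal operation.
    static List<String> traceWithoutTerminalOperation() {
        List<String> events = new ArrayList<>();
        Stream<String> pipeline = Stream.of("one", "two", "three")
            .peek(e -> events.add("peek:" + e));
        // The pipeline is never traversed, so no event is recorded.
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());
        System.out.println(traceWithoutTerminalOperation());
    }
}
```

In the first list, each peek entry is immediately followed by the forEach entry for the same element; the second list stays empty because nothing ever pulls elements through the pipeline.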

This is why it’s possible to represent and manipulate infinite sequences using Streams. (Because where do you store an infinite sequence? In the cloud?):

Stream.iterate(0, i -> i + 1)
  .peek(System.out::println)
  .findFirst();

The above operation completes almost immediately because of the lazy character of Stream traversal. Of course, if you try to collect the whole infinite sequence to some data structure, even laziness will not save you.
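
To collect from an infinite source safely, the pipeline has to be bounded before the terminal operation. The sketch below assumes a sequential stream and uses limit() for that; the class name InfiniteStreamDemo and the method firstThree are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative only.
final class InfiniteStreamDemo {

    // Collects the first three elements of an infinite sequence.
    // Every element that peek() observes is appended to seenByPeek.
    static List<Integer> firstThree(List<Integer> seenByPeek) {
        return Stream.iterate(0, i -> i + 1)
            .peek(seenByPeek::add)
            .limit(3) // bounds the infinite source so that collect() can finish
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        System.out.println("collected: " + firstThree(seen));
        System.out.println("seen by peek(): " + seen);
    }
}
```

Without the limit(3) call, collect() would keep pulling elements until the JVM ran out of memory.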

So, we can see that peek() can’t be treated as an intermediate for-each replacement because it invokes the passed Consumer only on elements that are visited by the Stream.

Unfortunately, Streams do not always behave entirely lazily.

Proper Usage

Further inspection of the official docs reveals a note:

This method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline

The point of confusion is the word “mainly”. Let’s have a peek (pun intended) at an English dictionary: “mainly” means “for the most part”, not “exclusively”.

So non-debugging usages are neither forbidden nor discouraged.

So, technically, we should be able to, e.g., modify the Stream elements on the fly, which would not be possible in an immutable, functional world. Let’s use the infamous mutable java.util.Date:

Stream.of(Date.from(Instant.EPOCH))
  .peek(d -> d.setTime(Long.MAX_VALUE))
  .forEach(System.out::println);

// Sun Aug 17 08:12:55 CET 292278994

We can observe that the result is far away from the standard epoch, which means the mutating operation was indeed applied.

The problem is that this behaviour is highly deceptive, because certain Stream implementations can optimize peek() calls away.

This eventually gets clarified in an early draft of the JDK 9 docs:

In cases where the stream implementation is able to optimize away the production of some or all the elements (such as with short-circuiting operations like findFirst, or in the example described in count()), the action will not be invoked for those elements. (…) An implementation may choose to not execute the stream pipeline (either sequentially or in parallel) if it is capable of computing the count directly from the stream source.

So, now it’s clear that peek() might not be the best choice if we want to perform side effects deterministically.

According to this, peek() might not even be that reliable a debugging tool after all.
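
The count() case from the quoted note can be tried out with a sketch like the one below. The class name CountPeekDemo and the method countWithPeek are hypothetical, and whether peek() runs depends on the JDK: on Java 8 the list would typically contain all three elements, while on JDK 9 and later it may stay empty because the size can be computed directly from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative only.
final class CountPeekDemo {

    // Counts the elements; every element that peek() observes is added to seenByPeek.
    // The implementation is allowed to skip the peek() action entirely here.
    static long countWithPeek(List<String> seenByPeek) {
        return Stream.of("one", "two", "three")
            .peek(seenByPeek::add)
            .count();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        long count = countWithPeek(seen);
        System.out.println("count = " + count);
        System.out.println("elements seen by peek(): " + seen);
    }
}
```

The count is correct either way; only the side effect is at the mercy of the implementation.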

Key Takeaways

  • The peek() method works fine as a debugging tool when we want to see what is being consumed by a Stream
  • It seems to work fine for applying mutating operations but should not be used this way, because the behaviour is non-deterministic: internal optimizations may omit some peek() calls
  • The discussion, whether mutation operations should be allowed or not, would never take place if we were restricted to operate only on immutable values
  • We have around 292276977 years before we run out of java.util.Date range
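
One possible way to act on the second takeaway is to move the change out of peek() and into map(), so that each element is replaced by a new value instead of being mutated in place. This is a sketch of that idea rather than a prescribed fix; the class name MapInsteadOfPeekDemo and the method shifted are made up for illustration.

```java
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative only.
final class MapInsteadOfPeekDemo {

    // Returns new Date objects shifted by the given number of milliseconds.
    // The input objects are left untouched.
    static List<Date> shifted(List<Date> dates, long millis) {
        return dates.stream()
            .map(d -> new Date(d.getTime() + millis))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Date epoch = new Date(0L);
        List<Date> result = shifted(Collections.singletonList(epoch), 1_000L);
        System.out.println("original millis: " + epoch.getTime());
        System.out.println("shifted millis:  " + result.get(0).getTime());
    }
}
```

Because the result of map() is what flows to the terminal operation, the transformation is part of the pipeline’s value rather than a side effect that may or may not run.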

You May Also Like

Clojure web development – state of the art

I’ve been getting familiar with Clojure for more than a year now, and the more I dive into it, the more it becomes the language. Once you defeat the “parentheses fear”, everything else just makes the difference: tooling, community, good engineering practices. So it’s now time for me to convince others. In this post I’ll try to walk through a simple web application from scratch to show the key tools and libraries used to develop with Clojure in late 2015.

Note for Clojurians: This material is rather elementary and may be useful for you if you already know Clojure a bit but have never built anything bigger than a hello-world application.

Note for Java developers: This material shows how to replace Spring, Angular, grunt, live-reload with a bunch of Clojure tools and libraries and a bit of code.

The repo with final code and individual steps is here.

Bootstrap

I think everyone agrees that component is the industry standard for managing the lifecycle of Clojure applications. If you are a Java developer you may think of it as a Spring (DI) replacement - you declare dependencies between “components” which are resolved on “system” startup. So you just say “my component needs a repository/database pool” and the component library “injects” it for you.

To keep things simple I like to start with the duct web app template. It’s a nice starter component application following the 12-factor philosophy. So let’s start with it:

lein new duct clojure-web-app +example

The +example parameter tells duct to create an example endpoint with HTTP routes, which will be helpful. To finish bootstrapping, run lein setup inside the clojure-web-app directory.

Ok, let’s dive into the code. Component and injection related code should be in system.clj file:

(defn new-system [config]
  (let [config (meta-merge base-config config)]
    (-> (component/system-map
         :app  (handler-component (:app config))
         :http (jetty-server (:http config))
         :example (endpoint-component example-endpoint))
        (component/system-using
         {:http [:app]
          :app  [:example]
          :example []}))))

In the first section you instantiate components without dependencies, which are resolved in the second section. So in this example, the “http” component (server) requires “app” (application abstraction), which in turn is injected with “example” (actual routes). If your component needs others, you can just get them by name (precisely: by Clojure keywords).

To start the system you must fire up a REPL - an interactive environment running within the context of your application:

lein repl

Once you see the prompt, type (go). The application should start, and you can visit http://localhost:3000 to see an example page.

A huge benefit of using the component approach is that you get a fully reloadable application. When you change literally anything - configuration, endpoints, implementation - you can just type (reset) in the REPL and your application is up-to-date with the code. It’s a feature of the language; no JRebel or Spring-reloaded is needed.

Adding REST endpoint

Ok, in the next step let’s add some basic REST endpoint returning JSON. We need to add 2 dependencies in project.clj file:

:dependencies
 ...
  [ring/ring-json "0.3.1"]
  [cheshire "5.1.1"]

Ring-json adds JSON support for your routes (in ring it’s called middleware) and cheshire is a Clojure JSON parser (like Jackson in Java). Modifying project dependencies is one of the few tasks that require restarting the REPL, so hit CTRL-C and type lein repl again.

To configure JSON middleware we have to add wrap-json-body and wrap-json-response just before wrap-defaults in system.clj:

(:require 
 ...
 [ring.middleware.json :refer [wrap-json-body wrap-json-response]])

(def base-config
   {:app {:middleware [[wrap-not-found :not-found]
                      [wrap-json-body {:keywords? true}]
                      [wrap-json-response]
                      [wrap-defaults :defaults]]

And finally, in endpoint/example.clj we must add some route with JSON response:

(:require 
 ...
 [ring.util.response :refer [response]]))

(defn example-endpoint [config]
  (routes
    (GET "/hello" [] (response {:hello "world"}))
    ...

Reload app with (reset) in REPL and test new route with curl:

curl -v http://localhost:3000/hello

< HTTP/1.1 200 OK
< Date: Tue, 15 Sep 2015 21:17:37 GMT
< Content-Type: application/json; charset=utf-8
< Set-Cookie: ring-session=37c337fb-6bbc-4e65-a060-1997718d03e0;Path=/;HttpOnly
< X-XSS-Protection: 1; mode=block
< X-Frame-Options: SAMEORIGIN
< X-Content-Type-Options: nosniff
< Content-Length: 151
* Server Jetty(9.2.10.v20150310) is not blacklisted
< Server: Jetty(9.2.10.v20150310)
<
* Connection #0 to host localhost left intact
{"hello": "world"}

It works! In case of any problems you can find a working version in this commit.

Adding frontend with figwheel

Coding the backend in Clojure is great, but what about the frontend? As you may already know, Clojure can be compiled not only to JVM bytecode, but also to JavaScript. This may sound familiar if you have used e.g. CoffeeScript. But the ClojureScript philosophy is not only to provide some syntactic sugar, but to improve your development cycle with great tooling and fully interactive development. Let’s see how to achieve it.

The best way to introduce ClojureScript to a project is figwheel. First let’s add the figwheel plugin and configuration to project.clj:

:plugins
   ...
   [lein-figwheel "0.3.9"]

And cljsbuild configuration:

:cljsbuild
    {:builds [{:id "dev"
               :source-paths ["src-cljs"]
               :figwheel true
               :compiler {:main       "clojure-web-app.core"
                          :asset-path "js/out"
                          :output-to  "resources/public/js/clojure-web-app.js"
                          :output-dir "resources/public/js/out"}}]}

In short, this tells the ClojureScript compiler to take sources from src-cljs with figwheel support and put the resulting JavaScript into the resources/public/js/clojure-web-app.js file. So we need to include this file in a simple HTML page:

<!DOCTYPE html>
<html>
<head>
</head>
<body>
  <div id="main">
  </div>
  <script src="js/clojure-web-app.js" type="text/javascript"></script>
</body>
</html>

To serve this static file we need to change some defaults and add a corresponding route. In system.clj, change api-defaults to site-defaults in both the require section and base-config. In example.clj add the following route:

(GET "/" [] (io/resource "public/index.html"))

Again (reset) in REPL window should reload everything.

But where is our ClojureScript source file? Let’s create file core.cljs in src-cljs/clojure-web-app directory:

(ns ^:figwheel-always clojure-web-app.core)

(enable-console-print!)

(println "hello from clojurescript")

Open another terminal and run lein figwheel. It should compile ClojureScript and print ‘Prompt will show when figwheel connects to your application’. Open http://localhost:3000. The figwheel window should show a prompt:

To quit, type: :cljs/quit
cljs.user=>

Type (js/alert "hello"). Boom! If everything worked, you should see an alert in your browser. Open the developer console in your browser. You should see hello from clojurescript printed on the console. Change it in core.cljs to (println "figwheel rocks") and save the file. Without reloading the page you should see the updated message. Figwheel rocks! Again, in case of any problems, refer to this commit.

In the next post I’ll show how to fetch data from MongoDB, serve it with REST to the browser and write ReactJs/Om components to render it. Stay tuned!

OVal – validate your models quickly and effortlessly!

Some time ago one of the projects at work required me to validate some Java POJOs. These were my model classes and I've been creating them from incoming WebService requests. One would say that XSD would be sufficient for the task, for parts of this va...