Maintaining PriorityQueue Order with Java Streams

The tricky thing about working with PriorityQueues is that, ironically, not every operation on them respects the priority order.

PriorityQueue Traversal

If we have a look at the PriorityQueue.iterator() documentation, we’ll see that, unintuitively, iterator() does not traverse the queue according to its priority order:

Returns an iterator over the elements in this queue. The iterator does not return the elements in any particular order.

The same behaviour shows up when we process a PriorityQueue’s elements with the Java Stream API via the stream() method: the Stream instance is backed by a Spliterator that doesn’t guarantee the desired traversal order.

PriorityQueue<String> queue = new PriorityQueue<>(comparing(String::length));
List<String> content = Arrays.asList("1", "333", "22", "55555", "4444");
queue.addAll(content);

assertThat(queue.stream())
  .containsExactlyElementsOf(content);

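Here the traversal order happens to match the insertion order, even though we expected the queue to provide Strings according to their length. That match is incidental: it reflects how these particular elements ended up in the queue’s internal heap, and the documentation guarantees no particular order at all.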
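
Inserting the elements in a different sequence makes the lack of ordering easier to see. The following is a minimal sketch (the class and method names are hypothetical, not taken from the original snippets) that compares the traversal order with the order produced by polling. The traversal order itself is unspecified, so the example deliberately avoids asserting a specific value for it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class, not part of the original article's code.
class TraversalOrderDemo {

    // Builds a queue ordered by string length from the given elements.
    static PriorityQueue<String> byLength(List<String> elements) {
        PriorityQueue<String> queue = new PriorityQueue<>(Comparator.comparing(String::length));
        queue.addAll(elements);
        return queue;
    }

    // Snapshot of the elements in the queue's own (unspecified) traversal order.
    static List<String> traversalOrder(PriorityQueue<String> queue) {
        return new ArrayList<>(queue);
    }

    // Priority order, obtained by polling a copy so the argument is left untouched.
    static List<String> priorityOrder(PriorityQueue<String> queue) {
        PriorityQueue<String> copy = new PriorityQueue<>(queue);
        List<String> result = new ArrayList<>();
        while (!copy.isEmpty()) {
            result.add(copy.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        PriorityQueue<String> queue = byLength(Arrays.asList("333", "22", "1"));
        System.out.println("traversal: " + traversalOrder(queue)); // order not guaranteed
        System.out.println("priority:  " + priorityOrder(queue));  // shortest string first
    }
}
```

Only the polled result is guaranteed to be sorted by length; the traversal snapshot may or may not be, depending on the heap layout.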

Solution #1

We can use the poll() method to fetch elements from the queue according to the priority order.

So, let’s generate a Stream from the consecutive elements returned by the poll() method, using the Stream.generate() method:

List<String> result = Stream.generate(queue::poll)
  .limit(queue.size())
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isEmpty();

The problem with this implementation is that it’s not concurrent-modification-friendly. Before Java 9, we can’t terminate the generated Stream dynamically, so we need to rely on limiting the Stream size to the queue size, which is not perfect because the size can change while the Stream is being processed.

The crucial part of this implementation is that after consuming a Stream instance, we end up with a modified queue – the poll() method removes the polled element from the queue.

Finally, this approach can be extracted into a separate utility method:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .limit(queue.size());
}
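
If the original queue must survive, one option is to drain a copy instead. This is a sketch under the assumption that copying the queue is acceptable for its size; orderedStreamOfCopy and the demo class are hypothetical names. It relies on the PriorityQueue copy constructor, which keeps the ordering of the source queue:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original article's code.
class DrainCopyDemo {

    // The draining utility from the article, repeated so the example is self-contained.
    static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
        Objects.requireNonNull(queue);
        return Stream.generate(queue::poll)
          .limit(queue.size());
    }

    // Hypothetical helper: drains a copy, so the caller's queue keeps its elements.
    static <T> Stream<T> orderedStreamOfCopy(PriorityQueue<T> queue) {
        Objects.requireNonNull(queue);
        return drainToStream(new PriorityQueue<>(queue));
    }

    public static void main(String[] args) {
        PriorityQueue<String> queue = new PriorityQueue<>(Comparator.comparing(String::length));
        queue.addAll(Arrays.asList("1", "333", "22", "55555", "4444"));

        List<String> result = orderedStreamOfCopy(queue).collect(Collectors.toList());
        System.out.println(result);       // elements in priority order
        System.out.println(queue.size()); // the original queue still holds all elements
    }
}
```

The trade-off is an extra copy of the queue per Stream, in exchange for leaving the source untouched.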

Java 9

Since Java 9, it’s possible to rewrite it in a concurrent-friendly manner, terminating the Stream as soon as poll() returns null, which it does once the queue is empty:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .takeWhile(Objects::nonNull);
}
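
To see what this buys us in practice, here is a hedged sketch (Java 9 or later; the demo class and the drainWhileAdding method are hypothetical) in which an element is added to the queue while the Stream is being consumed. Because the Stream ends only when poll() returns null, the late element is still picked up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original article's code.
class TakeWhileDrainDemo {

    // The Java 9 draining utility, repeated so the example is self-contained.
    static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
        Objects.requireNonNull(queue);
        return Stream.generate(queue::poll)
          .takeWhile(Objects::nonNull);
    }

    // Drains a two-element queue and offers a third element while "1" is being processed.
    static List<String> drainWhileAdding() {
        PriorityQueue<String> queue = new PriorityQueue<>(Comparator.comparing(String::length));
        queue.addAll(Arrays.asList("22", "1"));

        List<String> seen = new ArrayList<>();
        drainToStream(queue).forEach(element -> {
            seen.add(element);
            if (element.equals("1")) {
                queue.add("333"); // arrives after the Stream was created
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(drainWhileAdding());
    }
}
```

With the limit(queue.size()) variant, the Stream would have stopped after the two elements that were present when it was created. Note that PriorityQueue itself is not thread-safe, so this sketch only modifies the queue from the thread that consumes the Stream.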

Solution #2

The second approach involves simply sorting the Stream instance using the same comparator that the queue uses. We need to remember that this works only if the queue was created with a custom comparator, because comparator() returns null for a queue that relies on natural ordering:

List<String> result = queue.stream()
  .sorted(queue.comparator())
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isNotEmpty();

This approach can be used with any Collection type (as long as we can get ahold of the right Comparator instance).
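
As an illustration of the any-Collection claim, the sketch below applies the same idea to a HashSet. The sortedStream helper and the class name are assumptions made up for this example:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original article's code.
class SortedStreamDemo {

    // Hypothetical helper: streams any collection in the order defined by the comparator.
    static <T> Stream<T> sortedStream(Collection<T> elements, Comparator<? super T> comparator) {
        Objects.requireNonNull(elements);
        Objects.requireNonNull(comparator);
        return elements.stream().sorted(comparator);
    }

    public static void main(String[] args) {
        // A HashSet has no meaningful iteration order of its own.
        Set<String> set = new HashSet<>(Arrays.asList("1", "333", "22", "55555", "4444"));
        Comparator<String> byLength = Comparator.comparing(String::length);

        List<String> result = sortedStream(set, byLength).collect(Collectors.toList());
        System.out.println(result); // shortest string first
    }
}
```

Unlike draining with poll(), sorting has to see every element before it can emit the first one, so the whole collection is processed even if only the head of the result is needed.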

If we store Comparable objects in the queue and depend on their natural order, this becomes even simpler because we do not need to reach for the Comparator instance:

PriorityQueue<String> queue = new PriorityQueue<>();
queue.addAll(Arrays.asList("1", "333", "22", "55555", "4444"));

List<String> result = queue.stream()
  .sorted()
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isNotEmpty();

In this case, after consuming a Stream instance, our original queue remains intact.

Finally, this approach can be extracted into a separate utility method:

static <T> Stream<T> asStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    Comparator<? super T> comparator = queue.comparator();
    return comparator != null
      ? queue.stream().sorted(comparator)
      : queue.stream().sorted();
}

Conclusion

The priority order of a PriorityQueue is not preserved when iterating over it or streaming it directly, so, essentially, we need to create our Stream instances ourselves.

The working code snippets can be found on GitHub.
