Maintaining PriorityQueue Order with Java Streams

The tricky thing about working with PriorityQueues is that, ironically, they don’t always behave according to the PriorityQueue semantics.

PriorityQueue Traversal

If we have a look at PriorityQueue.iterator() documentation, we’ll see that, unintuitively, iterator() is not traversing the queue according to its priority order:

Returns an iterator over the elements in this queue. The iterator does not return the elements in any particular order.

The same behaviour can be noticed when trying to use Java Stream API for processing PriorityQueue’s elements using the instance obtained by the stream() method – the Stream instance depends on the Spliterator instance which doesn’t guarantee the desired traversal order.

PriorityQueue<String> queue = new PriorityQueue<>(comparing(String::length));
List<String> content = Arrays.asList("1", "333", "22", "55555", "4444");
queue.addAll(content);

assertThat(queue.stream())
  .containsExactlyElementsOf(content);

We can see that the insertion order gets preserved regardless of the fact that our queue was expected to be providing Strings according to their length.

Solution #1

We can use the poll() method to fetch elements from the queue according to the priority order.

So – let’s generate a Stream from consecutive elements, returned by the poll() method, using Stream.generate() method:

List<String> result = Stream.generate(queue::poll)
  .limit(queue.size())
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isEmpty();

The problem with this implementation is that it’s not concurrent-modification-friendly. Until Java 9 gets released, we can’t terminate the generated Stream dynamically so we need to rely on limiting the Stream size to the queue size – which is not perfect because this can change during an actual processing.

The crucial part of this implementation is that after consuming a Stream instance, we end up with a modified queue – the poll() method removes the polled element from the queue.

Eventually, this approach can be extracted to the separate utility method:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .limit(queue.size());
}

Java 9

Since Java 9, it’ll be possible to rewrite it in a concurrent-friendly manner:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .takeWhile(Objects::nonNull)
}

Solution #2

The second approach involves simply sorting the Stream instance using the same comparator that the queue uses. We need to remember that this will work as long as the queue was initialized using a custom comparator:

List<String> result = queue.stream()
  .sorted(queue.comparator())
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isNotEmpty();

This approach can be used with any Collection type (as long as we can get ahold of the right Comparator instance).

If we store Comparable objects in the queue and depend on their natural order, this becomes even simpler because we do not need to reach for the Comparator instance:

PriorityQueue<String> queue = new PriorityQueue<>();
queue.addAll(Arrays.asList("1", "333", "22", "55555", "4444"));

List<String> result = queue.stream()
  .sorted()
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isNotEmpty();

In this case, after consuming a Stream instance, our original queue remains intact.

Eventually, this approach can be extracted to the separate utility method:

static <T> Stream<T> asStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    Comparator<? super T> comparator = queue.comparator();
    return comparator != null
      ? queue.stream().sorted(comparator)
      : queue.stream().sorted();
}

Conclusion

The priority order of the PriorityQueue is not preserved when iterating/traversing so, essentially, we need to create our Stream instances ourselves.

The working code snippets can be found on GitHub.

You May Also Like

Sygnalizacyjne ABC

Poniższy artykuł oparty jest na wspaniałej pozycji książkowej “System Sygnalizacji nr 7 G. Danielewicz, W.Kabaciński”. Gorąco zachęcam do…