Maintaining PriorityQueue Order with Java Streams

The tricky thing about working with PriorityQueues is that, ironically, they don’t always behave according to the PriorityQueue semantics.

PriorityQueue Traversal

If we have a look at PriorityQueue.iterator() documentation, we’ll see that, unintuitively, iterator() is not traversing the queue according to its priority order:

Returns an iterator over the elements in this queue. The iterator does not return the elements in any particular order.

The same behaviour can be noticed when trying to use Java Stream API for processing PriorityQueue’s elements using the instance obtained by the stream() method – the Stream instance depends on the Spliterator instance which doesn’t guarantee the desired traversal order.

PriorityQueue<String> queue = new PriorityQueue<>(comparing(String::length));
List<String> content = Arrays.asList("1", "333", "22", "55555", "4444");
queue.addAll(content);

assertThat(queue.stream())
  .containsExactlyElementsOf(content);

We can see that the insertion order gets preserved regardless of the fact that our queue was expected to be providing Strings according to their length.

Solution #1

We can use the poll() method to fetch elements from the queue according to the priority order.

So – let’s generate a Stream from consecutive elements, returned by the poll() method, using Stream.generate() method:

List<String> result = Stream.generate(queue::poll)
  .limit(queue.size())
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isEmpty();

The problem with this implementation is that it’s not concurrent-modification-friendly. Until Java 9 gets released, we can’t terminate the generated Stream dynamically so we need to rely on limiting the Stream size to the queue size – which is not perfect because this can change during an actual processing.

The crucial part of this implementation is that after consuming a Stream instance, we end up with a modified queue – the poll() method removes the polled element from the queue.

Eventually, this approach can be extracted to the separate utility method:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .limit(queue.size());
}

Java 9

Since Java 9, it’ll be possible to rewrite it in a concurrent-friendly manner:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .takeWhile(Objects::nonNull)
}

Solution #2

The second approach involves simply sorting the Stream instance using the same comparator that the queue uses. We need to remember that this will work as long as the queue was initialized using a custom comparator:

List<String> result = queue.stream()
  .sorted(queue.comparator())
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isNotEmpty();

This approach can be used with any Collection type (as long as we can get ahold of the right Comparator instance).

If we store Comparable objects in the queue and depend on their natural order, this becomes even simpler because we do not need to reach for the Comparator instance:

PriorityQueue<String> queue = new PriorityQueue<>();
queue.addAll(Arrays.asList("1", "333", "22", "55555", "4444"));

List<String> result = queue.stream()
  .sorted()
  .collect(Collectors.toList());

assertThat(result)
  .containsExactly("1", "22", "333", "4444", "55555");
assertThat(queue)
  .isNotEmpty();

In this case, after consuming a Stream instance, our original queue remains intact.

Eventually, this approach can be extracted to the separate utility method:

static <T> Stream<T> asStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    Comparator<? super T> comparator = queue.comparator();
    return comparator != null
      ? queue.stream().sorted(comparator)
      : queue.stream().sorted();
}

Conclusion

The priority order of the PriorityQueue is not preserved when iterating/traversing so, essentially, we need to create our Stream instances ourselves.

The working code snippets can be found on GitHub.

You May Also Like

Sygnalizacyjne ABC

Poniższy artykuł oparty jest na wspaniałej pozycji książkowej “System Sygnalizacji nr 7 G. Danielewicz, W.Kabaciński”. Gorąco zachęcam do…

Inconsistent Dependency Injection to domains with Grails

I've encountered strange behavior with a domain class in my project: services that should be injected were null. I've became suspicious as why is that? Services are injected properly in other domain classes so why this one is different?

Constructors experiment

I've created an experiment. I've created empty LibraryService that should be injected and Book domain class like this:

class Book {
def libraryService

String author
String title
int pageCount

Book() {
println("Finished constructor Book()")
}

Book(String author) {
this()
this.@author = author
println("Finished constructor Book(String author)")
}

Book(String author, String title) {
super()
this.@author = author
this.@title = title
println("Finished constructor Book(String author, String title)")
}

Book(String author, String title, int pageCount) {
this.@author = author
this.@title = title
this.@pageCount = pageCount
println("Finished constructor Book(String author, String title, int pageCount)")
}

void logInjectedService() {
println(" Service libraryService is injected? -> $libraryService")
}
}
class LibraryService {
def serviceMethod() {
}
}

Book has 4 explicit constructors. I want to check which constructor is injecting dependecies. This is my method that constructs Book objects and I called it in controller:

class BookController {
def index() {
constructAndExamineBooks()
}

static constructAndExamineBooks() {
println("Started constructAndExamineBooks")
Book book1 = new Book().logInjectedService()
Book book2 = new Book("foo").logInjectedService()
Book book3 = new Book("foo", 'bar').logInjectedService()
Book book4 = new Book("foo", 'bar', 100).logInjectedService()
Book book5 = new Book(author: "foo", title: 'bar')
println("Finished constructor Book(Map params)")
book5.logInjectedService()
}
}

Analysis

Output looks like this:

Started constructAndExamineBooks
Finished constructor Book()
Service libraryService is injected? -> eu.spoonman.refaktor.LibraryService@2affcce2
Finished constructor Book()
Finished constructor Book(String author)
Service libraryService is injected? -> eu.spoonman.refaktor.LibraryService@2affcce2
Finished constructor Book(String author, String title)
Service libraryService is injected? -> null
Finished constructor Book(String author, String title, int pageCount)
Service libraryService is injected? -> null
Finished constructor Book()
Finished constructor Book(Map params)
Service libraryService is injected? -> eu.spoonman.refaktor.LibraryService@2affcce2

What do we see?

  1. Empty constructor injects dependencies.
  2. Constructor that invokes empty constructor explicitly injects dependencies.
  3. Constructor that invokes parent's constructor explicitly does not inject dependencies.
  4. Constructor without any explicit call declared does not call empty constructor thus it does not inject dependencies.
  5. Constructor provied by Grails with a map as a parameter invokes empty constructor and injects dependencies.

Conclusion

Always explicitily invoke empty constructor in your Grail domain classes to ensure Dependency Injection! I didn't know until today either!

Me on Hadoop on Parleys

Finally I've managed to import my WarJUG presentation to parleys.com. See for yourself :) If you've got problems with opening the parleys' version try the ones uploaded to youtube. Here is part 1: And here is part 2: Finally I've managed to import my WarJUG presentation to parleys.com. See for yourself :) If you've got problems with opening the parleys' version try the ones uploaded to youtube. Here is part 1: And here is part 2: