Java streams 25. Collect 1. Custom collector

A terminal operation either returns a single value (of the same type as the stream elements or of a different type) or returns nothing at all and just produces side effects. No other operation can be applied after it: it ends the pipeline, and the stream cannot be reused.

In this post, we start discussing the last of the terminal operations, collect():

<R, A> R collect(Collector<? super T, A, R> collector)

It is a specialization of the reduce() operation, often called a mutable reduction. It lets you implement a wide variety of algorithms using the ready-to-use collectors produced by the java.util.stream.Collectors class. We will discuss each of them in subsequent posts.

You can also implement your own collector using the overloaded version of the collect() method:

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)

That is what we are going to discuss today.
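Before moving to the Box example, here is a minimal sketch of how the three arguments fit together, using a plain ArrayList as the result container. The class name ThreeArgCollectSketch and the helper toList() are made up for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ThreeArgCollectSketch {

    // Hypothetical helper: gathers the stream elements into a list
    // using the three-argument collect().
    static List<String> toList(Stream<String> stream) {
        List<String> result = stream.collect(
                ArrayList::new,      // supplier: creates an empty result container
                ArrayList::add,      // accumulator: folds one element into a container
                ArrayList::addAll);  // combiner: merges two containers (parallel streams)
        return result;
    }

    public static void main(String[] args) {
        System.out.println(toList(Stream.of("a", "b", "c"))); // prints: [a, b, c]
    }
}
```

The supplier creates the container, the accumulator adds one element to it, and the combiner merges two containers into one.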



Custom collector vs class Collectors usage

To demonstrate, we will use the following Box class:

  
    class Box {
      int weight;
      String color;
      public Box(){}
      public Box(int weight, String color) {
        this.weight = weight;
        this.color = color;
      }
      public int getWeight() { return weight; }
      public void setWeight(int weight) { this.weight = weight;}
      public String getColor() { return color; }
      public void setColor(String color) { this.color = color; }

      @Override
      public String toString() {
        return "Box{weight=" + weight +
                ", color='" + color + "'}";
      }
    }
    

Let us implement a collector that finds the heaviest of the boxes:

   
  BiConsumer<Box, Box> accumulator = (b1, b2) -> {
      if(b1.getWeight() < b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  BiConsumer<Box, Box> combiner = (b1, b2) -> {
      System.out.print("Combiner is called!");
      if(b1.getWeight() < b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  Box theHeaviest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
        .collect(Box::new, accumulator, combiner);
  System.out.print(theHeaviest);  
                      //prints: Box{weight=8, color='green'}
   

The result is correct, but the combiner was not called (the message “Combiner is called!” was not printed). That is because the combiner is used only in parallel stream processing, to merge the partial results of several sub-tasks executed in parallel.

To demonstrate it, let us convert our stream into a parallel one:

   
  Box theHeaviest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
      .parallel()
      .collect(Box::new, accumulator, combiner); 
            //prints: Combiner is called!Combiner is called!
  System.out.print(theHeaviest); 
                      //prints: Box{weight=8, color='green'}

As you can see, the combiner is now called twice: the three elements ended up in three partial results, so two merges were needed to combine them into one.
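If you prefer counting to reading console output, the same observation can be sketched with a counter. The class CombinerCallsSketch, its trimmed-down Box and the helper countCombinerCalls() are assumptions made for this illustration; they are not part of the original example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

class CombinerCallsSketch {

    // Trimmed-down stand-in for the Box class used in this post.
    static class Box {
        int weight;
        String color;
        Box() {}
        Box(int weight, String color) { this.weight = weight; this.color = color; }
    }

    // Hypothetical helper: runs the "heaviest box" collector and
    // returns how many times the combiner was invoked.
    static int countCombinerCalls(boolean parallel) {
        AtomicInteger calls = new AtomicInteger();
        BiConsumer<Box, Box> keepHeavier = (b1, b2) -> {
            if (b1.weight < b2.weight) {
                b1.weight = b2.weight;
                b1.color = b2.color;
            }
        };
        BiConsumer<Box, Box> combiner = (b1, b2) -> {
            calls.incrementAndGet();      // count instead of printing
            keepHeavier.accept(b1, b2);
        };
        Stream<Box> boxes = Stream.of(new Box(5, "red"),
                                      new Box(8, "green"),
                                      new Box(3, "blue"));
        if (parallel) {
            boxes = boxes.parallel();
        }
        boxes.collect(Box::new, keepHeavier, combiner);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countCombinerCalls(false)); // prints: 0
        System.out.println(countCombinerCalls(true));  // more than 0 for a parallel stream
    }
}
```

An AtomicInteger is used because, in a parallel stream, the combiner may run on different threads.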

If this feels like a bit too much, we have good news for you. Chances are you will never need to create a custom collector, because most of them can be produced by the factory methods of the Collectors class or by other utilities we will discuss later. We demonstrate a custom collector for two reasons:

— to let you know that it is possible, and how to do it in the (very improbable) case that you do not find a ready-to-use one in the Collectors class or other utilities;

— to help you understand the terminology and how collectors work, which will help you choose a ready-to-use one.

For example, to select the heaviest box, we could use the Collectors.maxBy() collector. (For the sake of full disclosure, we could also use the max() operation described in Java streams 22. FindAny, findFirst, max, min, but we need to make a point by using a collector generated by one of the Collectors class factory methods).

The following is an example of the Collectors.maxBy() collector usage:

   
  Box theHeaviest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
      .collect(Collectors.maxBy(Comparator
                         .comparing(Box::getWeight)))
      .orElse(null);
  System.out.print(theHeaviest); 
                //prints: Box{weight=8, color='green'}

There is no guarantee that the max element will be found (if the stream is empty, for example), so the operation returns the result wrapped in an Optional object.
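To see the Optional in action, the following minimal sketch applies the same collector to an empty stream and to a non-empty one. The class MaxByOptionalSketch, its trimmed-down Box and the helper heaviest() are illustrative assumptions.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MaxByOptionalSketch {

    // Trimmed-down stand-in for the Box class used in this post.
    static class Box {
        private final int weight;
        private final String color;
        Box(int weight, String color) { this.weight = weight; this.color = color; }
        int getWeight() { return weight; }
        String getColor() { return color; }
    }

    // Hypothetical helper: the heaviest box, or an empty Optional
    // when the stream has no elements.
    static Optional<Box> heaviest(Stream<Box> boxes) {
        return boxes.collect(
                Collectors.maxBy(Comparator.comparingInt(Box::getWeight)));
    }

    public static void main(String[] args) {
        Optional<Box> none = heaviest(Stream.empty());
        System.out.println(none.isPresent());                    // prints: false

        Optional<Box> some = heaviest(Stream.of(new Box(5, "red"),
                                                new Box(8, "green")));
        System.out.println(some.map(Box::getWeight).orElse(-1)); // prints: 8
    }
}
```

Calling orElse(null), as in the example above, turns the empty case into a null reference, so the caller has to check for it.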

Please note that the Collectors.maxBy() collector also works with a parallel stream, because every Collector supplies its own combiner. The same result can be obtained with the max() operation described in Java streams 22. FindAny, findFirst, max, min, or with a custom collector. It may well be that some JDK utility or a third-party library has a ready-to-use collector too.
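For comparison, here is a minimal sketch of the max() operation mentioned above, applied to both a sequential and a parallel stream. The class MaxOperationSketch, its trimmed-down Box and the helper heaviest() are assumptions introduced for this illustration.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

class MaxOperationSketch {

    // Trimmed-down stand-in for the Box class used in this post.
    static class Box {
        private final int weight;
        private final String color;
        Box(int weight, String color) { this.weight = weight; this.color = color; }
        int getWeight() { return weight; }
        String getColor() { return color; }
    }

    // Hypothetical helper: finds the heaviest box with the max() operation.
    static Optional<Box> heaviest(Stream<Box> boxes) {
        return boxes.max(Comparator.comparingInt(Box::getWeight));
    }

    // Builds a fresh stream each time, because a stream can be consumed only once.
    static Stream<Box> sampleBoxes() {
        return Stream.of(new Box(5, "red"),
                         new Box(8, "green"),
                         new Box(3, "blue"));
    }

    public static void main(String[] args) {
        System.out.println(heaviest(sampleBoxes())
                .map(Box::getColor).orElse("none"));   // prints: green
        System.out.println(heaviest(sampleBoxes().parallel())
                .map(Box::getColor).orElse("none"));   // prints: green
    }
}
```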

The point is that, before implementing your own collector, you should try to find an existing one. It will probably be an easier and more reliable implementation.



A potential pothole while implementing the custom collector of the min value

At first glance, it seems that a custom collector for finding the lightest box could be built by just flipping the comparison in the accumulator and the combiner:

  
  BiConsumer<Box, Box> accumulatorMin = (b1, b2) -> {
     if(b1.getWeight() > b2.getWeight()){
         b1.setWeight(b2.getWeight());
         b1.setColor(b2.getColor());
     }
  };
  BiConsumer<Box, Box> combinerMin = (b1, b2) -> {
     System.out.print("Combiner is called!");
     if(b1.getWeight() > b2.getWeight()){
           b1.setWeight(b2.getWeight());
           b1.setColor(b2.getColor());
     }
  };
  Box theLightest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
     .collect(Box::new, accumulatorMin, combinerMin);
  System.out.print(theLightest); 
                  //prints: Box{weight=0, color='null'}
    

It did not work! Why? Because the very first comparison is made between the new Box() object and one of the Box objects in the stream. By default, the new Box() object has weight 0 and color null, so none of the stream objects has a smaller weight, hence the result.

To prove it, let us change the Supplier (the first parameter) implementation to the following:

  
  Box theLightest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
     .collect(() -> new Box(10, "whatever"), 
                          accumulatorMin, combinerMin);
  System.out.print(theLightest);  
                 //prints: Box{weight=3, color='blue'}
     

Much better, isn’t it? The only problem is that it requires advance knowledge of the biggest weight. We could set the initial weight to Integer.MAX_VALUE instead, and the solution would work.
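A minimal sketch of that variant could look as follows. The class MaxValueSeedSketch, its compact Box and the helper lightest() are assumptions made for this illustration.

```java
import java.util.function.BiConsumer;
import java.util.stream.Stream;

class MaxValueSeedSketch {

    // Compact stand-in for the Box class used in this post.
    static class Box {
        private int weight;
        private String color;
        Box(int weight, String color) { this.weight = weight; this.color = color; }
        int getWeight() { return weight; }
        void setWeight(int weight) { this.weight = weight; }
        String getColor() { return color; }
        void setColor(String color) { this.color = color; }
    }

    // Hypothetical helper: the container starts at Integer.MAX_VALUE,
    // so any lighter box replaces the initial state.
    static Box lightest(Stream<Box> boxes) {
        BiConsumer<Box, Box> keepLighter = (b1, b2) -> {
            if (b1.getWeight() > b2.getWeight()) {
                b1.setWeight(b2.getWeight());
                b1.setColor(b2.getColor());
            }
        };
        // The same logic serves as accumulator and combiner.
        // For an empty stream the untouched seed box is returned.
        return boxes.collect(() -> new Box(Integer.MAX_VALUE, null),
                             keepLighter, keepLighter);
    }

    public static void main(String[] args) {
        Box lightest = lightest(Stream.of(new Box(5, "red"),
                                          new Box(8, "green"),
                                          new Box(3, "blue")));
        System.out.println(lightest.getWeight() + " " + lightest.getColor()); // prints: 3 blue
    }
}
```

One thing to keep in mind with this sketch: for an empty stream, the caller gets back the seed box with weight Integer.MAX_VALUE and color null, which has to be recognized as "nothing found".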

But the following is a cleaner implementation:

  
  BiConsumer<Box, Box> accumulatorMin2 = (b1, b2) -> {
      if(b1.getWeight() == 0 || 
                  b1.getWeight() > b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  BiConsumer<Box, Box> combinerMin2 = (b1, b2) -> {
      if(b1.getWeight() == 0 || 
                b1.getWeight() > b2.getWeight()){
          b1.setWeight(b2.getWeight());
          b1.setColor(b2.getColor());
      }
  };
  Box theLightest = Stream.of(new Box(5, "red"),
                              new Box(8, "green"),
                              new Box(3, "blue"))
     .collect(Box::new, accumulatorMin2, combinerMin2); 
  System.out.print(theLightest);                  
                 //prints: Box{weight=3, color='blue'}

Notice how we have added the check b1.getWeight() == 0 to the accumulator and the combiner. This allowed us to go back to new Box() as the value produced by the Supplier.
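The weight == 0 check treats 0 as "no box seen yet", so it assumes that every real box has a positive weight and that every partial result in a parallel run has seen at least one box. One possible way around this, sketched below under our own assumptions, is to collect into a one-element array that stays null until the first box arrives. The class MinHolderSketch, its trimmed-down Box and the helper lightest() are hypothetical names for this illustration.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

class MinHolderSketch {

    // Trimmed-down stand-in for the Box class used in this post.
    static class Box {
        private final int weight;
        private final String color;
        Box(int weight, String color) { this.weight = weight; this.color = color; }
        int getWeight() { return weight; }
        String getColor() { return color; }
    }

    // Hypothetical helper: returns the lightest box, or null for an empty stream.
    static Box lightest(Stream<Box> boxes) {
        // The holder is empty (null) until the first box is accumulated.
        Supplier<Box[]> supplier = () -> new Box[1];
        BiConsumer<Box[], Box> accumulator = (holder, box) -> {
            if (holder[0] == null || holder[0].getWeight() > box.getWeight()) {
                holder[0] = box;
            }
        };
        BiConsumer<Box[], Box[]> combiner = (h1, h2) -> {
            // Ignore an empty partial result; otherwise keep the lighter box.
            if (h2[0] != null
                    && (h1[0] == null || h1[0].getWeight() > h2[0].getWeight())) {
                h1[0] = h2[0];
            }
        };
        return boxes.collect(supplier, accumulator, combiner)[0];
    }

    public static void main(String[] args) {
        Box lightest = lightest(Stream.of(new Box(5, "red"),
                                          new Box(0, "green"),
                                          new Box(3, "blue")));
        System.out.println(lightest.getWeight() + " " + lightest.getColor()); // prints: 0 green
    }
}
```

Unlike the examples above, this sketch returns one of the original stream elements instead of copying its fields into a new Box.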

So, that is an example of a pitfall one can fall into by assuming that a symmetrical solution can be achieved by just flipping the comparison or making a similarly simple change.

To protect yourself from such traps, use unit testing. We will talk about testing, its benefits and drawbacks later.

And, finally, see how much easier and less error-prone the implementation of the same functionality is when one uses a ready-to-use collector:

  
  Box theLightest = Stream.of(new Box(5, "red"),
                            new Box(8, "green"),
                            new Box(3, "blue"))
     .collect(Collectors.
          minBy(Comparator.comparing(Box::getWeight))) 
     .orElse(null);
  System.out.print(theLightest);                     
                 //prints: Box{weight=3, color='blue'}
    

Like Collectors.maxBy(), the Collectors.minBy() collector also works with a parallel stream. In practice, though, most developers rarely encounter a situation that requires processing a parallel stream.

In the next post, we will continue discussing the collect() operation and demonstrate the usage of the ready-to-use Collectors.minBy() and Collectors.maxBy() collectors in more detail.

The source code of all the code examples is here in GitHub
