JavaOne – Streams in JDK 8

“Streams in Java 8 – The good, the bad & the ugly”

Speaker: Simon Ritter & Stuart Marks
[Simon has the Twtter handle @speakjava; very cool]

For more blog posts from JavaOne, see the table of contents


Need to think differently. We are used to imperative programming with loops and variables.

Dealing wih exceptions
ugly code – three lines of code and hard to tell what it does

Problems:

  • looks like Perl
  • returns null (vs Optional or empty string)
  • split is called twice so wasted work
  • skipped URLDecoder.decode() because didn’t want to deal with a checked exception – but lost functionality. Problem caused by a missing API in Java so have to use decode.

Better approach:

  • use a method with a try/catch block; call that method from the stream
  • use Map.entry to simulate a tuple
  • Use single char (vs regex) in split. If only pass one character to split, far faster
  • split() is overloaded to take a numeric limit to how many are returned

Imperative streams
inside the for each is a print, and if statement and a LongAdder variable (good for frequent writes and infrequent reads)

then refactored to use mapToInt, a println and an if statement and a local variable. more complicated and still not functional

then switched to peek and no variabe but still an if statement (well a ternary)

finally switched to use a filter and count instead of sum

still not 100% functional because println is a side effect. ok for debugging

[good showing evolution to get functional]

Problems

  • Easy to misuse forEach() because feels familiar. But easily leads to side effects
  • Imperative thinking “for each of these I want to..”
  • Pause to consider if should use for each

Mixing internal and external iteration
for loop running 12 times and then getting data for each month with filter checking Month.of(x) – doesn’t work because x isn’t effectively final

“solve” effectively final by setting to different interim variable

IntStream.range(0,12).forEach – uses internal iteration but forEach. Marginaly better as don’t need interim variable

Instead return a nested map of Month to Map with nested grouping by so only need one iteration – the data stream

Problems

  • Going through data.stream() 12 times
  • forEach cheat
  • array not right data structure; it’s really a map of month to value

Hands on lab question
reduce (“”, (a,b) -> a+b) – works but inefficient because String concatenation

reduce(a,b) -> sb.append(b) – fails because ignores the first letter.

next attempt uses an if statement in reduce

then tried a custom collector. works but more complicated than necessary
Collector.of(StringBuilder::new, StringBuilder::append, StringBuilder::append, StringBuilder::toString

or just use Collectors.joining()

Problems

  • If not using a parameter, it is probably wrong
  • Side effects
  • if stateent version not associative so would fail when run in parallel

Misc

  • can’t use same stream multiple times
  • method references are slightly more efficient than lambdas because lambda gets added into a method in bytecode. Saves a level of indirection by using method reference. But only slightly
  • Calling .sorted() multiple times vs chaining comparing.thenComparing – the later is better [also works because preserves sort :)]
  • parallel streams do more work. might or might not complete faster. uses fork-join pool. number of threads defaults to number of CPUs. In Java9, this is # CPUs for container. On Jaa 8, it was for physical machine
  • Nested parallel streams is bad idea because using same threads so performance is worse. Can create ForkJoinPool if must. Buyer beware; this is an implementation specific behavior and tied to the profile of the machine you write it for.

My take: Fun start to he morning. I like that they covered common things in an entertaining way and not common things. Something to learn for everyone!

refactoring legacy code for java 8

I asked a teammate at CodeRanch if I could use a method he wrote to show how it could be written more clearly/succinctly in Java 8. He said yes. This is the original method. It is 95 lines and has a cyclomatic complexity of 27.

public static Map<Category, List<Forum>> getBetaTopForums(BestTopicsDAO bestTopicDAO, int userId, List<Topic> bestThisYear, List<Topic> secondaryBestList){
        Map<Integer, Integer> ratings = bestTopicDAO.selectForumRatingsByUserId(userId);
        
        List<Integer> forumIds = new ArrayList<>();
        List<Integer> ratedForumIds = CollectionCommon.getKeysSortedByValues(ratings, true);
        final int defaultUserId = SystemGlobals.getIntValue(ConfigKeys.BEST_TOPICS_DEFAULT_USER_ID);
        
        // Let's first add the forums that are rated above 5/10
        for(Integer forumId: ratedForumIds){
            Integer r = ratings.get(forumId);
            if(r != null && r > 5){
                forumIds.add(forumId);
            }
        }
        
        // Now let's add the forums found in best topics.
        if(bestThisYear != null){
            for(Topic topic: bestThisYear){
                int forumId = topic.getForumId();
                if(!forumIds.contains(forumId)){
                    forumIds.add(forumId);
                }
            }
        }
        if(secondaryBestList != null){
            for(Topic topic: secondaryBestList){
                int forumId = topic.getForumId();
                if(!forumIds.contains(forumId)){
                    forumIds.add(forumId);
                }
            }
        }
        
        // If this user is not the default-rating-user, let's add the top forums
        // rated by the default-rating-user.
        if(userId != defaultUserId) {
            Collection<Integer> fids = getTopRatedForumIds(bestTopicDAO);
            for(Integer fid: fids) {
                if(fid != null && !forumIds.contains(fid)) {
                    forumIds.add(fid);
                }
            }
        }
        
        
        // Let's also add some forums found in hot topics.
        List<Topic> tmpTopics = TopicRepository.getHottestTopics();
        for(Topic topic: tmpTopics){
            int forumId = topic.getForumId();
            if(!forumIds.contains(forumId)){
                forumIds.add(forumId);
            }
        }
        
        // Finally, let's add the forums that were rated below 6/10
        for(Integer forumId: ratedForumIds){
            Integer r = ratings.get(forumId);
            if(r != null && r < 6 && !forumIds.contains(forumId)){
                forumIds.add(forumId);
            }
        }
        
        Map<Category, List<Forum>> forums = new LinkedHashMap<>();
        
        PermissionControl pc = SecurityRepository.get(userId);
        
        final int maxTopForums = SystemGlobals.getIntValue(ConfigKeys.BETA_VIEW_MAX_TOP_FORUMS_IN_HOME);
        int forumsToShow = 0;
        
        // We need to remove the forums that are not visible, or rated 0/10
        for(Integer forumId: forumIds){
            if(pc.canAccess(SecurityConstants.PERM_FORUM, String.valueOf(forumId))){
                Forum f = ForumRepository.getForum(forumId);
                Category c = f != null ? ForumRepository.getCategory(f.getCategoryId()) : null;
                if(f != null && c != null && f.isShowInForumList()){
                    
                    if(++forumsToShow > maxTopForums){
                        break;
                    }
                    
                    List<Forum> fs;
                    if(forums.containsKey(c)){
                        fs = forums.get(c);
                    }else{
                        fs = new ArrayList<>();
                        forums.put(c, fs);
                    }
                    fs.add(f);
                }
            }
        }
        return forums;
        
    }

First step – write unit tests

First, I wrote unit tests for the existing code to ensure I didn’t break anything. While doing that, I did some minor refactorings. (I didn’t do extract method to make the method shorter since I knew I’d be updating that later). What I did change:

  1. Removed BestTopicsDAO as a parameter. It comes from a factory and we have a mock framework setup for that factory already. So there was no reason it had to be a parameter
  2. Renamed bestThisYear to primaryBestList. (It doesn’t always represent the best for the year)

I couldn’t get 100% test coverage because a few of the null checks were unreachable due to early logic (in the helper methods.) I left them in to see the effect they’d have on conversion.

Refactoring to Java 8 – Getting started

There’s a lot of for loops here. And I suspect they are very similar. Let’s start with the first one:

for(Integer forumId: ratedForumIds){
  Integer r = ratings.get(forumId);
  if(r != null && r > 5){
    forumIds.add(forumId);
  }
}

I converted this to a helper method and a stream:

forumIds.addAll(ratedForumIds.stream().filter(id -> {
  Integer r = ratings.get(id);
  return r != null && r > 5;
}).collect(Collectors.toList()));

I’ll grant you that this code isn’t shorter than the original code. It is more similar in form to the code I expect to write next though.  Also, it’s not easier to read given I didn’t extract the two line lambda. I’m going to do that after I get rid of the similar for loops.

The next refactoring

The next one is interesting. It’s just a for loop and if statement which is easy to rewrite.

for(Topic topic: primaryBestList){
  int forumId = topic.getForumId();
  if(!forumIds.contains(forumId)){
    forumIds.add(forumId);
  }
}

which is equivalent to

forumIds.addAll(primaryBestList.stream()
  .map(Topic::getForumId)
  .filter(f -> ! forumIds.contains(f))
  .collect(Collectors.toList()));

However, it is interesting because it check if the forum is already in forumIds. This shouldn’t be necessary as it is something a data structure could do. We need a data structure that is both a list (preserves order) and a set (checks for uniqueness.) Enter LinkedHashSet. It’s a set and preserves the insertion order. The JavaDoc even specifies that if you add the same element again, the order doesn’t change. Perfect. Switching forumIds to a LinkedHashSet allows for getting rid of the filter intermediate operation. And the unit tests still work.  I used the same techniques for the next five for loops.

Time to use methods

I converted two methods that had to do with ratings and three that were straight conversions.  I could have one common method they all use with identities for the filter/map that don’t apply. Two methods would be clearer. Giving me:

private static void addForumIds(Collection<Integer> forumIds, List<Topic> candidates) {
  if (candidates != null) {
    List<Integer> toAdd = candidates.stream().map(Topic::getForumId).collect(Collectors.toList());
    forumIds.addAll(toAdd);
  }
}

private static void addForumIds(Collection<Integer> forumIds, Collection<Integer> candidates, Predicate<Integer> predicate) {
  if (candidates != null) {
    List<Integer> toAdd = candidates.stream().filter(predicate).collect(Collectors.toList());
    forumIds.addAll(toAdd);
  }
}

The last one

The last chunk of code needed more refactoring to take care of all the forum filtering logic first. It also needed a mutable object for the counter rather than a primitive so it could be updated inside a lambda.

Conclusion

I could refactor this more. There’s duplication in the two lambdas. And I could extract more code into separate methods. Interestingly, the total code base is about the same as before. (It’s actually three lines longer, but I have more than three lines of comments.) But the method with the main logic is shorter and far less complicated:

public static Map<Category, List<Forum>> getBetaTopForums(int userId, List<Topic> primaryBestList, List<Topic> secondaryBestList){
        BestTopicsDAO bestTopicDAO = DataAccessDriver.newBestTopicDAO();
        Map<Integer, Integer> ratings = bestTopicDAO.selectForumRatingsByUserId(userId);
        
        Collection<Integer> forumIds = new LinkedHashSet<>();
        List<Integer> ratedForumIds = CollectionCommon.getKeysSortedByValues(ratings, true);
        final int defaultUserId = SystemGlobals.getIntValue(ConfigKeys.BEST_TOPICS_DEFAULT_USER_ID);
        
        // Let's first add the forums that are rated above 5/10
         addForumIds(forumIds, ratedForumIds, id -> {
            Integer r = ratings.get(id);
            return r != null && r > 5;
        });
        
        // Now let's add the forums found in best topics.
        addForumIds(forumIds, primaryBestList);
        addForumIds(forumIds, secondaryBestList);
          
        // If this user is not the default-rating-user, let's add the top forums
        // rated by the default-rating-user.
        if(userId != defaultUserId) {
            Collection<Integer> fids = getTopRatedForumIds(bestTopicDAO);
            addForumIds(forumIds, fids, id -> id != null);
        }
        
        // Let's also add some forums found in hot topics.
        List<Topic> tmpTopics = TopicRepository.getHottestTopics();
        addForumIds(forumIds, tmpTopics);

        // Finally, let's add the forums that were rated below 6/10
       addForumIds(forumIds, ratedForumIds, id -> {
            Integer r = ratings.get(id);
            return r != null && r < 6;
        });
        
        Map<Category, List<Forum>> forums = new LinkedHashMap<>();
        
        PermissionControl pc = SecurityRepository.get(userId);
        
        final int maxTopForums = SystemGlobals.getIntValue(ConfigKeys.BETA_VIEW_MAX_TOP_FORUMS_IN_HOME);
        AtomicInteger countForumsToShow = new AtomicInteger(0);
        
        // We need to remove the forums that are not visible, or rated 0/10
        forumIds.stream()
             // skip if can't access forum
            .filter(id -> pc.canAccess(SecurityConstants.PERM_FORUM, String.valueOf(id)))
            // get forum
            .map(ForumRepository::getForum)
            // skip if forum not found or can't access
            .filter(f -> f != null && f.isShowInForumList())
            // get category
            .forEach(f -> addForumToCategory(forums, f, maxTopForums, countForumsToShow));
        
        return forums;
        
    }
    
    private static void addForumToCategory(Map<Category, List<Forum>> forums, Forum f, int maxTopForums,
            AtomicInteger countForumsToShow) {
        Category c = ForumRepository.getCategory(f.getCategoryId());
        if (c != null && countForumsToShow.incrementAndGet() <= maxTopForums) {
            List<Forum> fs;
            if (forums.containsKey(c)) {
                fs = forums.get(c);
            } else {
                fs = new ArrayList<>();
                forums.put(c, fs);
            }
            fs.add(f);
        }
    }

    private static void addForumIds(Collection<Integer> forumIds, List<Topic> candidates) {
        if (candidates != null) {
            List<Integer> toAdd = candidates.stream().map(Topic::getForumId).collect(Collectors.toList());
            forumIds.addAll(toAdd);
        }
    }
    
    private static void addForumIds(Collection<Integer> forumIds, Collection<Integer> candidates, Predicate<Integer> predicate) {
        if (candidates != null) {
            List<Integer> toAdd = candidates.stream().filter(predicate).collect(Collectors.toList());
            forumIds.addAll(toAdd);
        }    
    }

java 8 stream performance – maurice naftalin – qcon

This is part of my live blogging from QCon 2015. See my QCon table of contents for other posts.

See http://www.lambdafaq.org

Background
He started with background on streams. (This is old news by now, but still taking some notes). The goals were to bring a functional style to Java and “explicit but unobtrusive” hardware parallelism. The former is more important than performance.

The intention is to replace loops with aggregate operations. [I like that he picked an example that required three operations and not an oversimplified example]. More concise/readable. Easy to change to parllelize.

Reduction == terminal operation == sink

Performance Notes
Free lunch is over. Chips don’t magically get faster over time. Intead, add core. The goal of parallel streamsisfor the intermediate operations in parallel and then bringing them together in reduction.

What to measure?

  • We want to know how code changes affect system performance in prod. Not feasible though because would need to do a controlled eperiment in prod conditions. Instead, we do a controlled experiment in lab conditions and hope not answering a simplified question.
  • Hard to microbenchmark because of inaccuracy, garbage collection, optimization over time, etc. There are benchmarking libraries – Caliper or JMH. [or better if don’t need to microbenchmark]
  • Don’t optimize code if don’t have a problem. What’s your performance requirement? [and is it the bottleneck]. Similarly don’t optimize the OS or the problem lies somewhere else.

Case study
This was a live demo. First we saw that not using BufferedReader makes a file slow to read. [not about streams]. Then we watched my JMeter didn’t work on the first try. [the danger of a live demo]. Then he showed how messing with the GC size and making it too small is bad for performance as well [still not on streams]. He is trying to shw the process of perofrmance tuning overall. Which is valid info. Just not what I expected this session to be about.

Then [after I didn’t see the stream logic being a problem in th first plae], he showe how to solve subproblems and merge them.[oddly not calling it map reduce]

8 minutes before the end of the talk, we finally see the non-parallel code for the case study. It’s interesting code becauase it uses two terminal operations and two streams. At least reading in the file is done normally. Finally, we see that the combiner is O(n) which prevents speeding it up.

Some rules

  • The workload of the intermedidate operations must be great enough to outweith the overheads. Often quoted as size of data set * processing cost per element
  • sorted() is worse
  • Collectors cost extra. toMap*( merging maps is slow. toList, toSet() is dominated by the accumulator.
  • In the real world, the fork/join pool doesn’t operate in isolation

My impressions: A large amount of this presentation wasn’t stream performance. Then the case study shows that reading without a BufferedReader is slow. [no kidding]. I feel like the example was contrived and we “learned” that poorly written code behaves poorly. I was hopingthe talk would actually be about parallelization. When parallelStream() saves time and when it doesn’t for example. What I learned was for this particular scenario, parallelization wasn’t helpful. And then right at the end, the generic rules. Which felt rushed and thown at us.