Skip to content

mtumilowicz/java11-stream-completablefuture-dedicated-executor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Build Status

java11-stream-completablefuture-dedicated-executor

Reference: https://www.amazon.com/Modern-Java-Action-functional-programming/dp/1617293563

project description

  1. We could ask shop for a price of a product (by its id)
    class Shop {
        Price getPrice(int id) {
            Delay.delay();
            return Price.of(id);
        }
    }
    
  2. Shop answers with some delay (200 ms)
    class Delay {
        static void delay() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                // not used
            }
        }
    }
    
  3. we want to ask shop for many ids (for example we have a stream of ids)

solution

  • naive approach - scales badly
    • one product
      var priceFutures = IntStream.range(1, 2)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id)))
              .collect(toList());
      
      var prices = priceFutures.stream()
              .parallel()
              .map(CompletableFuture::join)
              .collect(toList());
      
      time: 203 ms
    • four products
      var priceFutures = IntStream.range(1, 4)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id)))
              .collect(toList());
      
      var prices = priceFutures.stream()
              .parallel()
              .map(CompletableFuture::join)
              .collect(toList());
      
      time: 203 ms
    • many products
      var priceFutures = IntStream.range(1, 30)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id)))
              .collect(toList());
      
      var prices = priceFutures.stream()
              .parallel()
              .map(CompletableFuture::join)
              .collect(toList());
      
      time: 2 s
  • dedicated executor - scales perfectly
    var size = 300;
        
    var executor =
            Executors.newFixedThreadPool(Math.min(size, 100),
                    r -> {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        return t;
                    });
    
    var priceFutures = IntStream.range(1, size)
            .parallel()
            .mapToObj(id -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(id), executor))
            .collect(toList());
    
    var prices = priceFutures.parallelStream()
            .map(CompletableFuture::join)
            .collect(toList());
    
    time: 668 ms

explanation

CompletableFuture and parallel streams internally use the same common pool that by default has a fixed number of threads equal to the one returned by Runtime.getRuntime().availableProcessors().

So we decide to prepare dedicated executor for CompletableFuture tasks. How we estimated the number of threads in a fixed pool? From the given formula:

  • Nthreads = NCPU * UCPU * (1 + W/C)
    • NCPU is the number of cores, available through Runtime.getRuntime().availableProcessors()
    • UCPU is the target CPU utilization (between 0 and 1), and
    • W/C is the ratio of wait time to compute time

About

Implementation of dedicated executor for parallel processing of completable future tasks.

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages