프로그래밍/Java

CompletableFuture - 안정적 비동기 프로그래밍에 대해 - (1)

Jay22 2020. 2. 4. 21:42
반응형

CompletableFuture - 안정적 비동기 프로그래밍

모든 코드는 깃허브에 있습니다.

https://github.com/JunHoPark93/completablefuture-practice

 

JunHoPark93/completablefuture-practice

Contribute to JunHoPark93/completablefuture-practice development by creating an account on GitHub.

github.com

이 글은 모던 자바인 액션이라는 책을 보고 정리한 내용입니다.

Future 의 단순 활용 - 기존의 한계

자바 5부터 미래 시점에 결과를 얻을 수 있는 Future 인터페이스를 제공하고 있습니다. 시간이 걸릴 수 있는 작업을 Future 내부로 작성하고 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 할 수 있습니다.

public class FutureMain {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<Double> future = executorService.submit(new Callable<Double>() {
            public Double call() {
                return someLongComputation();
            }
        });
        doSomethingElse();
        try {
            Double result = future.get(1, TimeUnit.SECONDS); // <--- 블록 방지
        } catch (InterruptedException e) {
            // handle e
        } catch (ExecutionException e) {
            // handle e
        } catch (TimeoutException e) {
            // handle e
        }
    }

    private static Double someLongComputation() {
        // do something
        return 1d;
    }

    private static void doSomethingElse() {
        // do something else
    }
}

ExecutorService에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하고 다른 스레드로 또 다른 작업을 수행할 수 있습니다. Future의 get 메서드로 결과를 가져올 수 있는데 가져오는 시점에 완료가 되었으면 성공이지만 완료가 되지 않았다면 결과적으로 블로킹이 일어납니다.

위의 주석에 블록 방지 부분을 보면, 예를들어 1초간 기다리는 설정을 넣어줌으로써 계속 대기하는것을 방지할 수 있습니다. 그래서 스레드가 대기할 최대 타임아웃 시간을 설정해주는것이 좋습니다.

참고로 Future 는 FutureTask를 구현체로 가집니다. (CompletableFuture와 별개 이야기, 물론 이것도 Future를 구현합니다. 한계점으로 지적한 부분들은 CompletableFuture 가 아닙니다.)

한계점 - 복잡한 로직 구현 불가

Future 는 기본적으로 isDone, isCanceled 와 같은 기본사항 체크를 할 수 있는 메서드를 제공합니다. 하지만 이들로는 충분치 않습니다. 예를들어, 각기 다른 실행시간을 가지는 Future들을 조합해서 계산을 한다든지 다른 질의의 결과와 같이 계산을 한다든지 하는 복잡한 (현실세계의 문제를 해결하는데 꼭 필요한) 로직을 다루기 힘듭니다.

CompletableFuture로 비동기 앱 만들기

요 책에서는 간단한 비동기 앱을 만드는 것을 소개합니다. 같이 코딩을 해보며 따라가 볼까요. 앱은 최저가 상점을 찾는 앱입니다. 그래서 다음과 같은 방법을 다룹니다.

  • 두 개의 비동기 동작을 파이프라인으로 만드는 방법
  • 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법
  • 비동기 동작을 완료하는 방법

비동기로 플로우를 구성한다면 계산 동작을 수행하는 동안 비동기로 디스크 접근을 수행해 효율적으로 서버 자원을 이용할 수 있게 됩니다.

비동기 API 구현

가격 계산 동기

최저가 검색 앱을 구현하기 위해 상점에서 정의하는 api 중 하나는 getPrice 즉 가격을 계산하는 로직일 겁니다.

public class Shop {
    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public String getName() {
        return name;
    }
}

getPrice는 상품을 받아 가격을 계산합니다. 그리고 계산할 때 지연동작을 흉내내기 위해 delay()를 호출합니다. delay() 메서드는 내부적으로 1초를 딜레이하게 해놓았습니다. (참고로 가격은 랜덤값을 반환합니다)

public static void delay() {
        int delay = 1000;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

어찌됐건 사용자가 요청을 하면 1초가 지연이 됩니다. 이런 작업들을 비동기로 바꿔봅시다.

가격 계산 비동기

getPrice 메서드를 getAsyncPrice로 변경합니다.

public Future<Double> getAsyncPrice(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(
            () -> {
                double price = calculatePrice(product);
                futurePrice.complete(price);
            }
        ).start();
        return futurePrice;
    }

내부에서 새로운 스레드를 만들어 계산을 비동기로 수행합니다. 그리고 future에 값을 complete라는 메서드로 할당하고 바로 return 을 합니다.

클라이언트의 사용

public class Main {
    public static void main(String[] args) {
        Shop shop = new Shop("Jay Shop");
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getAsyncPrice("Jay's Mac");

        doSomethingElse();

        try {
            System.out.println("Price is " + futurePrice.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static void doSomethingElse() {
        // do something else
    }
}

사용자가 저 비동기 메서드를 사용하는 모습입니다. shop의 getAsyncPrice를 호출한 순간 바로 future가 반환되고 다른 작업 doSomethingElse() 를 수행합니다. 그리고 할 일이 없어 졌을 때 future의 get을 호출합니다. 할 일이 없어서 호출을 했는데 아직 future의 작업이 끝나지 않았다면 계산이 끝날 때 까지 블럭 합니다.

비동기 처리 중 에러가 나면 어떡하지?

가격을 계산하는 동안 에러가 나면 어떻게 할까요. 영원히 가격이 나올 때 까지 기다려야 할까요?

초반에 언급드린 것처럼 타임아웃(Timeout)을 설정할 수 있습니다. 하지만 영원히 기다리는 문제를 해결할 수 있어도 원인을 찾지 못합니다. 그래서 CompletableFuture에서 내부적으로 예외를 전달해야 합니다.

public Future<Double> getAsyncPrice(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(
            () -> {
                try {
                    double price = calculatePrice(product);
                    futurePrice.complete(price);
                } catch (Exception e) {
                    futurePrice.completeExceptionally(e);
                }
            }
        ).start();
        return futurePrice;
}

다음과 같이 말이죠.

그렇지만 전체적으로 뭔가 지저분해 보입니다. 어떻게 하면 더 간결하게 쓸 수 있을까요.

코드를 더 간결하게 써보자 - supplyAsync

public Future<Double> getPrice(String price) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(price));
}

supplyAsync는 Supplier를 인수로 받습니다. 그리고 CompletableFuture를 반환합니다.

 

ForkJoinPool의 Executor 중 하나가 이 Supplier를 실행합니다. 바로 위에서 설명했던 에러 처리 방법이 supplyAsync에도 똑같이 관리가 됩니다.

최저가 검색 앱 만들어보기

방법 1 - 블로킹 호출

위에서 소개한 개념들을 이용하여 상품 최저가를 어떻게 가져올지 아주 간단한 코드를 통해서 알아볼까요.

public class PriceFinder {
    private final List<Shop> shops = Arrays.asList(
        new Shop("CoolPang"),
        new Shop("HMarket"),
        new Shop("12th Street"),
        new Shop("YouMakePrice"),
        new Shop("FBay"));

    public List<String> findPrices(String product) {
        return shops.stream()
            .map(shop -> String.format("%s 가격은 %.2f", shop.getName(), shop.getPrice(product)))
            .collect(Collectors.toList());
    }
}

가격 찾기 기능을 수행하는 파인더를 정의합니다. 그 안에는 가게들의 리스트가 있다고 가정합니다. 그리고 가격찾기 메서드를 수행하고 가격을 출력합니다. shop의 getPrice를 하고 있는데 getPrice는 위에서 본 첫 번째 방법인 단순 블로킹 호출입니다.

사용자 쪽에서 확인을 해볼까요?

public class Client {
    public static void main(String[] args) {
        PriceFinder priceFinder = new PriceFinder();
        long start = System.nanoTime();
        System.out.println(priceFinder.findPrices("Mac"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("완료 시간:  " + duration + " msecs");
    }
}

사용자는 Mac 이란 제품을 검색합니다. 내부적으로 5개의 상점이 있는데 각각 상품을 검색하는데 1초(delay()로 준부분입니다!) 가 걸리기 때문에 총 5초가 조금 넘게 걸립니다.

 

방법2 - 병렬 스트림 요청

// 병렬 스트림 블로킹 호출
public List<String> findPrices2(String product) {
    return shops.parallelStream()
        .map(shop -> String.format("%s 가격은 %.2f", shop.getName(), shop.getPrice(product)))
        .collect(Collectors.toList());
}

위 방법과의 차이라면 스트림을 parallel(병렬)로 열었습니다.

속도를 확인해볼까요?

병렬로 실행되어 1초 남짓한 시간에 처리가 되었네요.

 

방법3 - CompletableFuture 비동기 요청

// CompletableFuture 호출
    public List<String> findPrices3(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() ->
                String.format("%s 가격은 %.2f", shop.getName(), shop.getPrice(product))))
            .collect(Collectors.toList());

        return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

상점의 최저가를 String형 CompletableFuture를 반환하는 메서드를 만들었습니다. (반환형이 String의 리스트가 아닌 CompletableFuture의 리스트입니다!) . 추후에 CompletableFuture의 join 메서드로 반환받은 리스트를 조합해 결과를 냅니다.

CompletableFuture의 join 은 Future의 get과 비슷합니다. 하지만 예외처리가 내부적으로 되어 있다는 차이가 있습니다.

결과적으로 방법2 병렬 스트림과 별 차이 없거나 더 느린결과를 볼 수 있습니다.

 

코드의 첫 문단에서 stream을 열었고 두 번째 return 문단에서도 stream을 열었습니다. 스트림의 게으른 특성 때문에 순차적으로 계산이 되기 때문에 속도가 원하는 만큼 빨라지지 않습니다.

Executor의 설정

위에서 말한 문제를 스레드 병렬 실행으로 개선해볼 수 있습니다.

Exectuor라는 것으로 스레드 풀의 크기를 조절해서 애플리케이션의 성능을 향상시킬 수 있는데요. 상점 개수보다 많은 스레드를 설정하면 오히려 낭비겠죠? 그리고 스레드를 100개 이하로 설정하는 것이 바람직하다고 합니다.

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), (Runnable r) -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
});

다음과 같이 설정해 볼까요. 상점의 크기와 숫자 100중에 최소값을 스레드 풀의 개수로 설정을 합니다. 두 번째 인자를 ThreadPoolFactory라는 인자를 넣어주어야 합니다. 람다식을 쓰지 않고 표현하면 아래와 같습니다.

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
});

데몬 스레드를 true로 주었습니다. 자바에서 일반 스레드가 실행중이면 자바 프로그램은 종료되지 않습니다. 일반 스레드가 한 없이 기다리게 되어 종료되지 않는 일이 벌어지면 문제가 생길 수 있습니다. 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있습니다. 성능은 같으므로 무한히 기다리는 것을 방지하기 위해 데몬 스레드로 설정을 해줍니다.

방법3 (Completable Future)의 성능 향상하기

위에서 알아본 Executor로 CompletableFuture를 최적화 할 수 있습니다.

// CompletableFuture 호출 (Executor 의 사용)
public List<String> findPrices4(String product) {
    List<CompletableFuture<String>> priceFutures = shops.stream()
        .map(shop -> CompletableFuture.supplyAsync(() ->
            String.format("%s 가격은 %.2f", shop.getName(), shop.getPrice(product)), executor))
        .collect(Collectors.toList());

    return priceFutures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
}

CompletableFuture의 supplyAsync 메서드 마지막 인자로 설정한 executor를 넣어봅니다. 그리고 상점을 90여개를 추가를 합니다.

결과는 놀랍게도(?) CompletableFuture의 executor로 스레드풀의 개수를 지정하여 1초만에 끝나게 됩니다.

기존 방법 3의 CompletableFuture의 순차실행을 막고 스레드 별로 할당하여 일을 진행한 결과 1초가 걸리는 작업 90개를 90개의 스레드가 각각 실행을 하여 1초의 결과를 낳습니다. 즉 전체적인 계산이 블록되지 않게 한 것입니다.

 

그렇다고 특정 스레드 개수가 중요한 게 아니라 애플리케이션에 맞는 숫자를 할당하는 것이 중요하겠죠.

비동기 작업 파이프라인

이번엔 위의 프로젝트의 버전 2를 만들어볼까요. 코드 패키지는 version2에 위치해있습니다. 이번에는 찾은 상품의 할인율을 계산해서 내려주어야 합니다. 할인 코드를 구하는 클래스를 먼저 보겠습니다.

public class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    private static double apply(double price, Code code) {
        delay();
        return format(price * (100 - code.percentage) / 100);
    }
}

할인가격을 가져오는 곳도 delay()가 1초 존재합니다.

그리고 이전 가격과 할인된 가격을 같이 파싱해서 보여주는 Quote 클래스입니다.

public class Quote {

    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

블로킹 호출

처음으로 블로킹으로 작업을 호출해보겠습니다.

public List<String> findPrices(String product) {
        return shops.stream()
            .map(shop -> shop.getPrice(product))
            .map(Quote::parse)
            .map(Discount::applyDiscount)
            .collect(Collectors.toList());
}

당연히 10초가 걸리겠죠? 결과를 확인해보겠습니다.

 

동기와 비동기 작업 조합

Quote의 parse는 원격서비스나 디스크 I/O가 없는 작업이므로 지연동작없이 바로 수행할 수 있습니다. 이 부분을 감안해서 동기와 비동기를 조합해볼까요.

public List<String> findPrices2(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() ->
                shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() ->
                Discount.applyDiscount(quote), executor)))
            .collect(Collectors.toList());

        return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

조금은 복잡해 보일 수 있는 코드 입니다. 하나씩 살펴봅시다.

첫 번째 map에서는 상점의 가격을 가져오는 (딜레이 1초) 코드입니다. executor를 주어 모든 작업이 1초에 끝나게 되겠죠. 그것을 (future) thenApply에서 Quote의 parse를 호출합니다. thenApply는 Completablefuture의 동작이 완료가 되어야 적용이 됩니다. 쉽게 말해 synchronous mapping 이라고 생각하면 됩니다.

마지막 map에서는 할인가격을 가져오는 원격 실행 (딜레이 1초)이 필요합니다. 이것을 연쇄적인 비동기로 만들 수 있습니다. thenCompose를 쓰게 된다면 두 번째 CompletableFuture는 첫 번째 CompletableFuture의 결과를 계산의 입력으로 받습니다. 결과적으로 List<CompletableFuture> 을 받습니다. 그리고 마지막으로 CompletableFuture의 Join을 호출하여 결과를 모아줍니다. 2초가 걸리네요.

 

독립적인 비동기 끼리의 조합

위의 예에서는 첫번째 결과가 두 번째 비동기 요청의 인자로 사용되었습니다. 하지만 두 개의 플로우가 관련이 없다면 어떨까요? 이럴땐 thenCombine을 사용합니다. thenCombine도 있고 thenCombineAsync도 존재합니다.

예를들어, 가격을 가져오는 것과 환율을 계산하는 것은 서로 관계가 없습니다. 단지 두 개의 결과를 곱해서 환율을 반영한 가격이 반환이 되는 것이죠.

 

다음에는 이 비동기 메서드들에 대해서 자세히 알아보겠습니다.

반응형