서비스를 구현하다 보면,

성능 측면에서의 효과를 극대화하기 위해 

메서드 레벨에서 명시적인 방식으로 멀티쓰레드를  구현해야 할 필요가 있습니다.

 

특히, 외부서비스를 반복적으로 호출해야 할 경우,
실행 순서에는 상관없이 여러 번 동일한 메서드를 호출해서 결과값을 Aggegation을 해야 할 필요가 있다면,
Single Thread 방식과 Multi Thread에  따라 그 효과는 더 극명하게 드러납니다..

 

이런 멀티쓰레드 기반의 실행(Execute) 과 종료(Shutdown)등의 관리(Management)를 편하게 할 수 있도록

도와주는 라이브러리를 사용해서 좀 더 쉽고 직관적으로 멀티쓰레드 기반의 로직을 구현 할 수 있습니다.

 

ExecutorService, Executors

Executors

Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, Callable 등, 멀티 쓰레드 기반으로 실행되는 인터페이스들의 팩토리 클래스입니다.


특히, ThreadPoolExecutor 생성자를 통해서 여러 옵션의 쓰레드풀기반 ExecutorService의 구현체를 제공해 주기 때문에
멀티쓰레드 구현을 위해서는 대부분 아래의 방식으로 ExecutorService 구현체를 생성해서

여러 타스크들을 동시에 실행시키게 됩니다.

ExecutorService executorService = Executors.newCachedThreadPool();

 

 

 

주요 ThreadPool 종류

  • newFixedThreadPool(int nThreads)
    • 지정한 수만큼의 고정된 쓰레드풀을 생성한다.
  • newCachedThreadPool()
    • 호출 당시의 필요한 만큼의 쓰레드풀을 생성한다. Cached라는 이름이 붙은대로 이미 생성된 쓰레드는 Reuse하기 때문에 성능상의 이점을 좀 더 기대할 수 있다.
  • newScheduledThreadPool(int corePoolSize)
    • 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 쓰레드풀을 corePoolSize 인자 개수만큼 생성한다.
    • 아이들상태이더라도 corePoolSize만큼의 풀사이즈를 계속 유지한다. 
  • newWorkStealingPool(int parallelism)
    • JDK1.8부터 지원하는 방식
    • parallelism - 병렬처리 레벨을 의미
    • 병렬처리 레벨을 지정하지 않으면 현재 시스템의 core 프로세스 개수를 기반으로 동적으로 Pool 사이즈가 할당된다.
    • 즉, 시스템에 가용 가능한 만큼 쓰레드를 활용하는 ExecutorService를 생성한다.

 

ExecutorService

Runnable 이나 Callable 인터페이스를 구현한 Task( = Thread )의 실행과 종료를 수행하는 인터페이스 입니다.

 

여기서 설명하는  예제는 Callable 를 구현한 클래스를 수행하고 그 결과를  Collections 형태의 결과로 반환하는 ExecutorService의 invokeAll() API를 사용했습니다.

 

ExecutorService 는 Executors 클래스가 제공하는 ThreadPool을 통해서 구현이 가능합니다.

ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<Map<Integer, String>>> resultList = executorService.invokeAll(roomStatusJobs);

 

 

 

지금부터는 간단한 예제를 통해서

ExecutorService 를 사용했을 때와 사용하지 않았을 때의 차이를 살펴보겠습니다.

 

외부 서비스 - 룸 상태 조회 서비스

private String getRoomStatus(int roomNo) {
  try {
    logger.debug("Called Room Status, RoomNo = {}", roomNo);
    Thread.sleep(3000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return roomNo + ": empty";
}

룸 번호를 받아 현재 룸의 상태를 반환하는 메서드입니다.

아주 간단한 메서드이지만 실제 멀티쓰레드로 호출될 외부서비스를 의미한다고 보면 됩니다.

 

차이를 확연하게 나타내기 위해서 메서드 실행 시 강제로 지연시간을 주었습니다. (3초)

즉, 이 외부 서비스는 한 번 호출 시 3초의 수행 시간이 소요됩니다.


멀티쓰레드 테스트가 목적이므로 상태 값은 무조건 ‘empty’로 고정해서 반환되도록 설정했습니다.

 

 

전체 룸의 상태 조회 서비스 - Single Thread 기반 

 

@Test
public void roomStatusTestBySingleThread() {
  // 전체 Room 상태리스트
  List<Map<Integer, String>> roomStatusList = new ArrayList<>();

  int roomCount = 10;
  for (int roomNo = 1; roomNo <= roomCount; roomNo++) {
    // Room 상태 조회(외부서비스) - Room 하나의 상태를 조회하는데 3초가 걸린다고 가정
    String roomStatus = this.getRoomStatus(roomNo); //
    logger.debug("roomNo:{}, roomStatus:{}", roomNo, roomStatus);

    Map<Integer, String> statusMap = new HashMap<>();
    statusMap.put(roomNo, roomStatus);
    roomStatusList.add(statusMap);
  }
}

룸의 개수를 10개로 설정했고, 전체 방의 개수만큼 루프를 돌면서
getRooStatus 서비스를 호출하고 , 각 룸의 상태를 가져와
그 결과값을 Map( 룸 번호, 룸 상태)으로 담아 List에 추가하는 로직입니다.

 

이 메서드를 실행하게 되면,
각 룸의 상태를 순차적으로 호출하고, 결과가 반환되면
다음 룸의 상태를 조회하는 싱글 쓰레드 방식으로 프로세스가 수행됩니다.

 

 

위의 경우 룸의 개수는 10개이고,

각 룸의 상태를 얻어오는 외부 서비스 - getRoomStatus()- 의 수행 시간은 3초 입니다. 

 

룸의 개수만큼 단일 쓰레드가 반복 호출되었기 때문에,

모든 룸의 상태를 얻어 오기 위해  30초의 시간이 걸리게 됩니다.

 

하지만 동일 메서드의 반복 호출(호출 순서가 의미 없는)을  이런 방식으로 구현하게 되면,

  • 이전 서비스의 수행이 끝나야지 다음 서비스가 호출되는, 불필요한 의존성을 띄게 되고
  • 결구 룸의 개수(호출 대상 건수)가 늘어나는 만큼 수행 속도는 그에 비례해서 늘어나게 됩니다. 

사실 이 서비스가 필요한 것은

지금처럼 순차적으로 서비스가 실행될 필요 없이
최대한 빠른 시간 내에 모든 룸의 상태(외부 서비스)를 조회해서 그 결과를 Merge하고 Aggregation 하는 게 목적입니다.

 

이런 경우에는 지금 소개하는 ExecutorService를 사용하게 되면,
호출을 별도의 쓰레드로 병렬 처리로 수행함으로써 서비스 전체의 수행 속도와 불필요한 의존도를 제거할 수 있습니다.

 

@Test
public void roomStatusTestByMultiThread() throws Exception {
  int roomCount = 10;

  List<Callable<Map<Integer, String>>> roomStatusJobs = new ArrayList<>();
  for (int roomNo = 1; roomNo <= roomCount; roomNo++) {
    roomStatusJobs.add(new RoomStatusTask(roomNo));
  }

  ExecutorService executorService = Executors.newFixedThreadPool(roomCount);
  List<Future<Map<Integer, String>>> resultList = executorService.invokeAll(roomStatusJobs);

  for (Future<Map<Integer, String>> futureMap : resultList) {
    futureMap.get().entrySet().forEach(entry -> {
      logger.debug("roomNo:{}, roomStatus:{}", entry.getKey(), entry.getValue());
    });
  }
}


/*
   Callable을 구현한 외부서비스 실행 타스크 클래스.
   ExecutorService의 invokeAll로 수행되는 멀티쓰레드 대상클래스
*/
private class RoomStatusTask implements Callable<Map<Integer, String>> {
  private int roomNo;
  public RoomStatusTask(final int roomNo) {
    this.roomNo = roomNo;
  }
  @Override
  public Map<Integer, String> call() {

    Map<Integer, String> statusMap = new HashMap<>();
    // 서비스 호출
    String roomStatus = getRoomStatus(this.roomNo);

    statusMap.put(this.roomNo, roomStatus);
    return statusMap;
  }
}

 

 

  List<Callable<Map<Integer, String>>> roomStatusJobs = new ArrayList<>();
  for (int roomNo = 1; roomNo <= roomCount; roomNo++) {
    roomStatusJobs.add(new RoomStatusTask(roomNo));
  }

룸의 개수만큼 타스크 클래스(Callable 구현체)의 인스턴스를 생성하고 Job List에 담습니다.
이렇게 담은 JobList인 rooStatusJobs는 ExecutorService의 invokeAll API를 통해 실행되게 됩니다.

 

 

  ExecutorService executorService = Executors.newCachedThreadPool();
  List<Future<Map<Integer, String>>> resultList = executorService.invokeAll(roomStatusJobs);

newCachedThreadPool 방식으로 쓰레드풀을 생성하고,
invokeAll을 통해 멀티쓰레드 기반비동기 방식으로 전체 룸 상태 조회(roomStatusJobs) Job들을 실행시킵니다.

 

비동기 방식이기 때문에,

invokeAll의 실행결과는 Future 객체로 받아오게 됩니다.

 

 

private class RoomStatusTask implements Callable<Map<Integer, String>> {
  private int roomNo;
  public RoomStatusTask(final int roomNo) {
    this.roomNo = roomNo;
  }
  @Override
  public Map<Integer, String> call() {

    Map<Integer, String> statusMap = new HashMap<>();
    // 서비스 호출
    String roomStatus = getRoomStatus(this.roomNo);

    statusMap.put(this.roomNo, roomStatus);
    return statusMap;
  }
}

Callable를 구현한 클래스로 실제 외부 서비스(룸 상태 조회)를 호출하는

핵심 비즈니스 로직을 담고 있는 클래스 있습니다.

 

생성자 메서드를 통해 룸 번호를 설정하고

call() 메서드를 통해 해당 룸 번호의 상태를 외부 서비스를 통해 호출 -> 결과값을 받아 처리합니다.

 

이렇게 변경한 후 테스트를 해 보면

싱글쓰레드 기반의 테스트 결과와는 확연이 차이가 나는 결과를 볼 수 있습니다

룸의 개수만큼 10개의 쓰레드가 생성되고,

각 쓰레드가 동시에 단 한 번의 외부 서비스(getRoomStatus)만 호출된 걸 확인할 수 있습니다.

 

호출되는 서비스 입장에서는 다른 요청(다른 쓰레드의 동시 요청)에 대해 한 번만 호출되었기 때문에
서비스 전체의 수행 시간이 1번 수행에 소요되는 시간인 3초밖에 걸리지 않았습니다.


단일쓰레드 방식과 다른 부분을  정리하지면

  • 멀티쓰레드로 수행될 Task클래스(Callabe 인터페이스를 구현한 클래스)의 인스턴스를 필요한 개수만큼 생성하는 부분
  • ExecutorService를 이용해서 생성된 Task 인스턴스들을 동시에 실행 (invokeAll) - ThreadPool의 가용치만큼 동시 수행
  • 호출된 결과를 Future와 받아와 Aggregation하는 부분

이 정도로  정리할 수 있을 것 같습니다.

 

 

 

 

이렇게, 수행 순서와 상관없이 동일 서비스를 여러 번 반복 호출해서

결과를 Merge하거나 Aggregation을 해야 할 로직에서는
ExecutorService를 잘 활용하면 쉽고 직관적인 방식으로 멀티쓰레드 기반의 병렬 처리 프로세스를
구현할 수 있으며,

수행 시간도 경우에 따라서는 획기적으로 줄일 수 있습니다.

 

 

해당 라이브러리에 대해서 좀 더 Deep 하게 파악하고 싶다면,

관련 API Document를 참고하시면 됩니다.

 

ExecutorService — https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

 

ExecutorService (Java Platform SE 8 )

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks. An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are p

docs.oracle.com

Executors — https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html

 

Executors (Java Platform SE 7 )

Returns a thread factory used to create new threads that have the same permissions as the current thread. This factory creates threads with the same settings as defaultThreadFactory(), additionally setting the AccessControlContext and contextClassLoader of

docs.oracle.com

 

  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기