AWS의 메세지 브로커 서비스 - Amazon MQ(Active MQ)를 사용해 보자.



  • 메세지 브로커 전용툴을 탑재한 Amazon MQ 서비스가 불과 몇달전 드디어 서울 리전에 공식적으로 서비스가 시작되었다.

  • 그 전까지는 Amazon에서 메세지 기반의 프로비저닝 서비스는 Amazon SQS, SNS 두가지가 전부였다. 물론 Elastic Beanstalk의 Worker Environments(작업자 환경) 서비스도 있기는 하나, 이 서비스도 SQS 기반으로 메세지를 처리하는 방식이라 사실 저 두 가지가 전부였다고 볼 수 있다.

  • 이제는 메세지 브로커 솔루션인 ActiveMQ 를 Amazon의 플랫폼 기반에서 설치 및 설정작업등의 자동화를 통해 바로 사용할 수 있게 되었다. 이중화는 덤~. 비용은… 편함의 정도와 비례하는 것이라...

  • 아마 지원하는 메세지 브로커 제품군은 더 확대될 것으로 보인다.

  • 개인적으로 궁금한 부분은 Amazon이 왜 메세지 브로커 제품 중 가장 사용자층이 많고 성능면에서도 인정받는, 그래서 가장 레퍼런스가 많은 RabbitMQ나 Apache Kafka를 먼저 선택하지 않고 상대적으로 인지도가 저 두개 제품보다는 낮은 ActiveMQ를 먼저 지원했는지이다. 참으로 궁금하고 이해가 잘 되지 않는 부분이다.

  • SpringBoot기반에서 계속 서비스를 개발해야 하는 개발자 입장에서 보면 AmazonMQ가 Spring Cloud Stream 영역에서 추상화된 API를 바로 사용할 수 있는 RabbitMQ나 Apache Kafka 둘 중에 하나를 지원했었더라면 하는 아쉬움이 있다.


메세지 브로커의 선택

현재 레거시 시스템간 인터페이스를 메세지 기반으로 전환하려는 계획을 가지고 있고, 메세지 브로커를 도입하기 위해 여러 솔루션을 리서치 중인데 팀원들의 의견과 경험치, 레거시 시스템의 현재상황을 고려한 나름의 선정기준을 만들었다.

이전 블로그에 언급했는데, 후보군은 RabbitMQ, Apache Kafka, Amazon SQS, Amazon SNS 이었다가 얼마전 서울리전에 서비스가 시작되면서 AmazonMQ(Apache MQ)가 추가되었다.


선정기준

  1. 메세지 브로커에 대한 설치 및 설정에 대해 최소한의 개발비용

  2. 모니터링 및 이중화(클러스터링)의 지원여부, 이 또한 최소한의 개발비용

  3. 이중화 및 FailOver 처리는 필수

  4. 성능보다는 안정성, 라우팅 기능보다는 메세지 보관, 전송 보장등이 우선

  5. Spring Cloud Stream의 추상화 자원 사용가능

결론적으로,

최종 선택된 제품은 AmazonMQ이다.


서울리전에 서비스 런칭전까지는 5번의 이유로 RabbitMQ나 Kafka 중 하나로 결정할 예정이었는데, AmazonMQ가 나오면서 1, 2, 3번의 기준을 모두 만족시키게 되었고, 5번의 개발 편의성보다 훨씬 중요도가 높다고 판단, 최종적으로 AmazonMQ기반으로 선택하기로 결정하였다.

아래에서 설명하겠지만, Spring Boot기반으로도 JMS기반의 어느정도 추상화된 API와 어노테이션을 사용하여 ActiveMQ와의 연동이 가능하기 때문에 개발 편의성이나 생산성 측면에서는 크게 차이가 나지는 않을것 같다.

물론, 아직 Spring Cloud Stream 기반으로 개발하지 못하는 부분이 아쉬운 건 사실이다. 현재 Alpha버전으로 Spring Cloud Stream기반으로 사용하기 위한 프로젝트는 진행 중인 걸로 안다. 관련해서는 아래의 깃헙 소스를 참조해 보면 좋을 듯 하다.

https://github.com/spring-cloud/spring-cloud-stream-binder-jms


SpringBoot 기반 Amazon MQ 연동하기

JMS기반으로 연동해야 하는 것 말고는 Spring Cloud Stream 라이브러리 기반의 사용법과 동일하다.

ActiveMQ에 의존적이어야 하는 설정은 접속정보와 채널정의 외는 없다고 보면 되겠다.

  • 아직 Srping Cloud Stream 기반의 추상화 API는 RabbitMQ 와 Kafka와 연동시에만 지원한다.

  • 즉, ActiveMQ와 연동시 채널 바인딩을 위한 추상화 클래스인 Sink Source 인터페이스는 사용할 수 없다.

    • Sink - 메세지 브로커와 소비자 시스템간 논리적인 인풋 바인딩 채널을 추상화한 인터페이스 (메세지 수신을 위한 논리적인 채널 바인딩 관리)

    • Source - 메세지 브로커와 생산자 시스템간 논리적인 아웃풋 바인딩 채널을 추상화한 인터페이스 (메세지 전송을 위한 논리적인 채널 바인딩 관리)

    • @EnableBinding 어노테이션과 같이 사용하여 메세지 브로커 채널 바인딩 및 메세지 전송 및 수신 처리를 담당

  • JMS 기반 API 및 어노테이션을 사용해야

    • javax.jms.Destination인터페이스를 구현하여 Topic 또는 Queue 채널로 정의하고 JmsTemplate 클래스 빈을 사용하여 정의한 해당 객체를 통해 메세지를 전송한다.

    • @JmsListener 어노테이션을 통해 메세지를 수신한다.

    • 메세지 발행 및 수신 클래스에는 기본적으로 @EnableJMS 어노테이션을 사용하여 SpringBoot에 해당 클래스가 JMS 연동을 하기 위한 클래스임을 알려준다.


1. 의존성 라이브러리 추가

  • pom.xml 파일 - SpringBoot 2.0.4 기준

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.yorath</groupId>
<artifactId>springboot-activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>springboot-activemq</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

아직 ActiveMQ에 대해서 Spring Cloud Stream기반의 추상화 API 및 어노테이션을 사용할 수는 없지만, 스프링부트 기반에서 ActiveMQ 전용 Starter Kit을 지원하고 있기 때문에 아래의 spring-boot-starter-activemq 라이브러리 하나만 추가하면 된다.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

현재 AmazonMQ가 지원하는 메세지브로커가 ActiveMQ 하나 밖에는 없지만, 점점 지원하는 제품군이 늘어날 것으로 예상된다. 곧 RabbitMQ나 카프카도 지원하지 않을까? 기대한다.


  1. 메세지 브로커 (Active MQ) 접속정보 및 속성 정의

spring.activemq.broker-url=failover:(ssl://***********,ssl://***********)
spring.activemq.user=user
spring.activemq.password=password

### Set For Topic ( Publish to Subscribe ), If use Queue -> false
spring.jms.pub-sub-domain=true

failover: url1, url2 방식으로 broker-url을 정의하게 되면 active 서버에서 재부팅 또는 장애가 발생할 경우, standby(url2)로 자동 failover가 이루어진다. Active에 전송된 메세지도 자동으로 StandBy로 넘어간다.


ActiveMQ의 경우 채널의 성격을 크게 두가지로 구분해서 지원한다.

  • Queue 방식 (point to point)과 Topic 방식( Publish and Subscribe) 으로 구분

우선 우리는 Topic 방식의 메세지 연동이 필요하기 때문에 Topic기반의 Pub/Sub방식의 메세지 라우팅에 필요한 설정을 해야 한다.

spring.jms.pub-sub-domain=true

이렇게 선언을 해야 Topic방식으로 1:N 방식으로 메세지 라우팅이 가능해진다. 메세지 전송처리 로직에서도 물론 지켜야 할 원칙이 있는데, 바로 아래에서 설명하겠다.


3. Topic 채널 정의 - Pub/Sub 방식의 메세지 라우팅

당장 레거시 시스템에서 발행자의 메세지가 여러 수신자 시스템으로 라우팅 되어야 하는 니즈를 충족시켜야 한다. 이런 경우 ActiveMQ에서는 기본적으로 Queue(Point to Point)방식으로 설정되어 있는데 이것을 Topic으로 변경시켜야 한다.

  • 원칙 1: 위에서 설명함. 프로퍼티 설정을 이렇게 명시해야 함. spring.jms.pub-sub-domain=true

  • 원칙 2: JmsTemplate를 메세지 전송시 반드시 AcitveMQTopic 객체의 인스턴스를 Destination인자로 사용해야 한다. 원칙1을 설정했다 하더라도 원칙2를 지키지 않으면 Queue방식으로 전송됨.


ActiveMQTopic 채널 정의 및 빈 등록

@Bean(name = "productTopic")
public ActiveMQTopic productTopic() {
  return new ActiveMQTopic ("product.topic" );
}

발행자 클래스에서 위에서 선언한 ActiveMQTopic 빈 주입 및 메세지 전송메서드의 인자로 설정.

@Component
public class Producer {

   @Autowired
   private JmsTemplate jmsTemplate;

   @Autowired
   private ActiveMQTopic productTopic;

   jmsTemplate.convertAndSend ( productTopic, product, (Message message) -> {

       message.setJMSDeliveryMode ( DeliveryMode.NON_PERSISTENT );
       message.setJMSCorrelationID ( UUID.randomUUID ().toString () );
       message.setJMSPriority ( 10 );

       return message;
    });
   
}

JmsTemplate의 메세지 전송 메서드인 convertAndSend의 첫번째 인자는 메세지의 목적지(Destination)를 지정하는 인자로 String형태의 문자가 올 수도 있고, Destination유형의 객체가 올 수도 있다.

Pub/Sub방식의 Topic기반 메세지 연동을 위해서는 첫번째 인자를 일반 문자열로 지정하면 안된다. 반드시 위에서 설명한 대로 Destination 인터페이스를 구현한 ActiveMQTopic 클래스의 인스턴스를 인자로 사용해야 한다.


4. 구독자 클래스 정의 (Consumer Class)


@Component
@EnableJms
public class Consumer {

   @JmsListener(destination = "product.topic", containerFactory = "jsaFactory")
   public void appleReceive(@Payload Product product,
                            @Headers Map<String, Object> headers,
                            MessageHeaders messageHeaders,
                            JmsMessageHeaderAccessor messageHeaderAccessor) {


       System.out.println ("====== Receive Product 1 ===== ");
       System.out.println (" [Product] " + product.toString () );
       System.out.println (" [UserName]" + String.valueOf(headers.get("UserName")));
       System.out.println (" [Is Jms Custom Property]" + messageHeaders.get("jms-custom-property"));

       System.out.println (" [ContentType]:" + messageHeaderAccessor.getContentType ());
       System.out.println (" [CorrelationId]:" + messageHeaderAccessor.getCorrelationId ());
       System.out.println (" [DeliveryMode]:" + messageHeaderAccessor.getDeliveryMode ());
       System.out.println (" [Destination]:" + messageHeaderAccessor.getDestination ());
       System.out.println (" [Priority]" + messageHeaderAccessor.getPriority () );
  }
}

핵심은 메세지를 수신받는 메서드에 @JmsListener 어노테이션을 추가하는 것이다. 첫번째 인자인 메세지 객체 인자에는 @Payload 어노테이션을 지정하면 된다. 제대로 테스트 하려면 발행자 어플리케이션과 별도로 어플리케이션을 추가로 구성해서 테스트 해 보는것이 좋다.

메세지 전송시 정의한 별도의 속성값이나 기본적인 속성값들은 위의 인자값 유형을 참조해서 꺼내올 수 있다. 메세지 자체에 대한 별도의 커스커마이징 속성값이 필요한 경우 적극적으로 활용하면 좋을 것 같다.

사실, ActiveMQ에서 메세지 연동을 위한 핵심 객체(빈)들의 종류와 설정에 대한 설명이 빠졌는데

  • 메세지 전송을 담당하는 JmsTemplate 객체

  • 메세지 수신을 담당하는 JmsListenerContainerFactory 객체

  • 메세지 변환을 담당하는 MessageConverter 객체

이 객체들은 이미 주요 설정값들이 SpringBoot의 스타터킷에서 기본적으로 설정이 되어 있다. @AutoConfiguration의 힘(?) 이다.


다음에는 위의 객체들의 속성을 별도로 설정(Override)하기 위해 @Configuration으로 생성한 ActiveMQ 설정 클래스에 대해서 설명하는 시간을 가지도록 하겠다.


2020.11.13(금) 추가내용

감사하게도 2년전 이 글이 도움이 되었다는 분이 문의하신게 있어 ActiveMq 커넥션 설정파일 소스를 추가합니다.




  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기