본문 바로가기

OpenSource/RabbitMQ

Spring AMQP를 사용하여 RabbitMQ Producer, Consumer 예제 코드 작성하기

728x90
반응형

spring-amqp dependency 추가

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

 

application.yml 설정

spring:
  rabbitmq:
    host: localhost #hostIp
    port: 5672 #default 5672
    username: "admin" #username
    password: "admin" #password

 

Queue 생성 및 설정

  • spring boot 환경에서는 Configuration 설정을 통해 쉽게 Queue, Binding, Exchange를 설정할 수 있습니다.
  • DirectExchange를 통해 routingKey와 일치하는 Queue로 메세지를 발행합니다.
@Configuration
@Profile("rabbitmq")
@DependsOn("globalProperties")
@EnableRabbit
public class RabbitMqActionQueueConfig {

    // 지정된 이름으로 queue 등록
    @Value("${queue.name:}")
    private String queueName;

    @Bean
    Queue queue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-expires",60000L); // queue 만료 시간 (메시지에 액세스하지 않은 경우 일정 시간이 지나면 큐 자체가 만료)
        return new Queue(queueName, true, false, true, args);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("direct-exchange");
    }

    // 빈으로 등록한 Queue와 Exchange를 바인딩하면서 Exchange에서 사용될 패턴을 설정
    @Bean
    Binding binding(DirectExchange exchange, Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    // Spring Boot에서 자동으로 빈 등록을 해주지만 받은 메세지 처리를 위한 messageConverter을 설정하기 위해 오버라이딩
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

	  // 메세지를 전송하고 수신하기 위한 JSON 타입으로 메세지 변경
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

 

Queue Listener 등록 및 IdleEvent 설정

Queue Listener를 등록하는 방법은 2가지가 있습니다.

  1. @RabbitListener 어노테이션을 사용
  2. SimpleMessageListenerContainer를 사용

아래 예제에서는 SimpleMessageListenerContainer를 통해 Listener를 등록해보겠습니다.

  • SimpleMessageListenerContainer를 생성 후 ConnectionFactory를 설정하여 메시지 브로커와의 연결을 관리합니다.
  • setQueueNames를 통해 리스너가 메세지를 수신할 queue 이름을 설정합니다.
  • IdleEventInterval을 통해 Queue가 특정 시간(1분)간 사용되지 않은 경우 IdleEvent 발생합니다.
  • idleEvent 3번 이상 발생 시 Listener 해제 및 Queue 삭제합니다.
@Configuration
@Profile("rabbitmq")
@DependsOn("globalProperties")
@RequiredArgsConstructor
public class RabbitMqListenerConfig {

    @Value("${cmp-agent.uuid:}")
    private String queueName;

    private final RabbitMqRequestMessageListener listener;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
        simpleMessageListenerContainer.setQueueNames(queueName);
        simpleMessageListenerContainer.setIdleEventInterval(60000L);
        simpleMessageListenerContainer.setMessageListener(listener);
        return simpleMessageListenerContainer;
    }
}
@Component
@Profile("rabbitmq")
@RequiredArgsConstructor
public class RabbitMqRequestMessageListener implements ChannelAwareMessageListener {

    private final RabbitTemplate rabbitTemplate;
    private final ActionScheduler actionScheduler;

    private int idleCount = 0;
    private static final int MAX_IDLE_COUNT = 2;

    @EventListener
    public void handleIdleEvent(ListenerContainerIdleEvent event) {
        idleCount++;
        if (idleCount >= MAX_IDLE_COUNT) {
            ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
            connectionFactory.clearConnectionListeners();

            SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer) event.getSource();
            simpleMessageListenerContainer.stop();
            idleCount = 0;
        }
    }

    // 생략
}

 

Producer Code

RabbitMQ 서버에 메세지를 전달하는 Producer 코드 예제입니다.

  • RabbitTemplate을 통해 Queue에 QueueMessage(JSON) 데이터를 전달합니다.
    • QueueMessage에는 해당 액션을 요청한 user 정보ACTION_CODE, receiver 정보 등을 전달합니다.
  • QueueName과 동일한 이름의 Queue 존재 여부 확인 및 Queue 생성 / 리스너 등록
@Service
@RequiredArgsConstructor
public class RabbitMqServiceImpl implements RabbitMqService {

    private final ActionQueueSender actionQueueSender;
    private final RabbitAdmin rabbitAdmin;
    private final ConnectionFactory connectionFactory;
    private final ApplicationContext applicationContext;
    private final GlobalProperties globalProperties;
    private final UserMapper userMapper;

    @Override
    public void sendAction(Long userSeq, ActionCode actionCode) {

        if(!globalProperties.isRabbitmqUsage())
            return;

        RabbitMqActionRequestInfo rabbitMqActionRequestInfo = new RabbitMqActionRequestInfo(targetSeq);

        QueueMessage queueMessage = QueueMessage.builder()
                .actionCode(actionCode)
                .dto(rabbitMqActionRequestInfo)
                .build();

        String queueName = userMapper.getDetail(userSeq).getUuid();

				// queueName과 일치하는 queue가 없을 경우 Queue 생성 및 Listener 설정
        if(rabbitAdmin.getQueueProperties(queueName) == null) {
            Queue queue = new Queue(queueName);
            rabbitAdmin.declareQueue(queue);

            RabbitMqRequestMessageListener listener = (RabbitMqRequestMessageListener) applicationContext.getBean("rabbitMqRequestMessageListener");
            SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();

            listenerContainer.setConnectionFactory(connectionFactory);
            listenerContainer.setQueueNames(queueName);
            listenerContainer.setIdleEventInterval(60000L);
            listenerContainer.setMessageListener(listener);
            listenerContainer.start();
        }

				// 메세지 전송
        actionQueueSender.sendAsync(queueName, queueMessage);
    }
}

@Service
@Configuration
public class ActionQueueSender {

    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper;

    public void sendAsync(String queuename, QueueMessage queueMessage){
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        MessageProperties messageProperties = new MessageProperties();
        Message message = messageConverter.toMessage(queueMessage, messageProperties);
        this.rabbitTemplate.convertAndSend(queuename, message);
    }

    public ActionQueueSender(final RabbitTemplate rabbitTemplate, final ObjectMapper objectMapper) {
        this.rabbitTemplate = rabbitTemplate;
        this.objectMapper = objectMapper;
    }
}

 

QueueMessage 클래스는 제네릭 타입을 지원하도록 함으로써 메세지에 데이터 전송 객체(DTO)를 포함 할 수 있도록 구현했습니다.

사용자 id, 액션 코드 그리고 데이터 전송 객체를 포함하여 전달 할 수 있습니다.

public class QueueMessage <T> {

    private String userId;
    private T dto;
    private ActionCode actionCode;

    public QueueMessage() {

    }

    public static QueueMessage.QueueMessageBuilder builder() {
        return new QueueMessage.QueueMessageBuilder();
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(final String userId) {
        this.userId = userId;
    }

    public Object getDto() {
        return dto;
    }

    public void setDto(final T dto) {
        this.dto = dto;
    }

    public ActionCode getActionCode() {
        return actionCode;
    }

    public void setActionCode(final ActionCode actionCode) {
        this.actionCode = actionCode;
    }

    public QueueMessage(final String userId,
                       final T dto,
                       final ActionCode actionCode) {
        this.userId = userId;
        this.dto = dto;
        this.actionCode = actionCode;
    }

    public static class QueueMessageBuilder<T> {

        private String userId;
        private T dto;
        private ActionCode actionCode;

        QueueMessageBuilder() {

        }

        public QueueMessage.QueueMessageBuilder userId(final String userId) {
            this.userId = userId;
            return this;
        }

        public QueueMessage.QueueMessageBuilder dto(final T dto) {
            this.dto = dto;
            return this;
        }

        public QueueMessage.QueueMessageBuilder actionCode(final ActionCode actionCode) {
            this.actionCode = actionCode;
            return this;
        }

        public QueueMessagebuild() {
            return new QueueMessage(this.userId, this.dto, this.actionCode);
        }

        public String toString() {
            return "QueueMessage.QueueMessageBuilder(userId=" + this.userId + ", object=" + this.dto + ", actionCode=" + this.actionCode;
        }
    }
}

 

Consumer Code

RabbitMQ 서버로부터 메세지를 전달 받는 Consumer 코드 예제입니다.

  • 브로커로부터 메시지를 수신하기 위해 ChannelAwareListener를 사용 해야 합니다. ChannelAwareListener는 Spring AMQP 라이브러리에서 제공하는 인터페이스로, RabbitMQ의 채널(Channel)을 명시적으로 관리할 수 있습니다. Channel 객체를 직접 다루면, 메세지 확인, 재처리, 거부 등 다양한 기능을 사용할 수 있습니다.
  • RabbitMqListenerConfig 에서 설정한 Queue에 Message가 들어오면 ChannelAwareListener의 onMessage가 수행됩니다.
  • QueueMessage을 통해 전달 받은 ACTION_CODE에 따라 액션을 수행합니다.
@Component
@Profile("rabbitmq")
@RequiredArgsConstructor
public class RabbitMqRequestMessageListener implements ChannelAwareMessageListener {

    private final RabbitTemplate rabbitTemplate;
    private final ActionScheduler actionScheduler;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        QueueMessage queueMessage = (QueueMessage) messageConverter.fromMessage(message);
        actionScheduler.schedule(queueMessage);
    }
}

728x90
반응형