728x90
반응형
spring-amqp dependency 추가
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
application.yml 설정
spring:
rabbitmq:
host: localhost #hostIp
port: 5672 #default 5672
username: "admin" #username
password: "admin" #password
Queue 생성 및 설정
- spring boot 환경에서는 Configuration 설정을 통해 쉽게 Queue, Binding, Exchange를 설정할 수 있습니다.
- DirectExchange를 통해 routingKey와 일치하는 Queue로 메세지를 발행합니다.
@Configuration
@Profile("rabbitmq")
@DependsOn("globalProperties")
@EnableRabbit
public class RabbitMqActionQueueConfig {
// 지정된 이름으로 queue 등록
@Value("${queue.name:}")
private String queueName;
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
args.put("x-expires",60000L); // queue 만료 시간 (메시지에 액세스하지 않은 경우 일정 시간이 지나면 큐 자체가 만료)
return new Queue(queueName, true, false, true, args);
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("direct-exchange");
}
// 빈으로 등록한 Queue와 Exchange를 바인딩하면서 Exchange에서 사용될 패턴을 설정
@Bean
Binding binding(DirectExchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
// Spring Boot에서 자동으로 빈 등록을 해주지만 받은 메세지 처리를 위한 messageConverter을 설정하기 위해 오버라이딩
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
// 메세지를 전송하고 수신하기 위한 JSON 타입으로 메세지 변경
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Queue Listener 등록 및 IdleEvent 설정
Queue Listener를 등록하는 방법은 2가지가 있습니다.
- @RabbitListener 어노테이션을 사용
- SimpleMessageListenerContainer를 사용
아래 예제에서는 SimpleMessageListenerContainer를 통해 Listener를 등록해보겠습니다.
- SimpleMessageListenerContainer를 생성 후 ConnectionFactory를 설정하여 메시지 브로커와의 연결을 관리합니다.
- setQueueNames를 통해 리스너가 메세지를 수신할 queue 이름을 설정합니다.
- IdleEventInterval을 통해 Queue가 특정 시간(1분)간 사용되지 않은 경우 IdleEvent 발생합니다.
- idleEvent 3번 이상 발생 시 Listener 해제 및 Queue 삭제합니다.
@Configuration
@Profile("rabbitmq")
@DependsOn("globalProperties")
@RequiredArgsConstructor
public class RabbitMqListenerConfig {
@Value("${cmp-agent.uuid:}")
private String queueName;
private final RabbitMqRequestMessageListener listener;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
simpleMessageListenerContainer.setQueueNames(queueName);
simpleMessageListenerContainer.setIdleEventInterval(60000L);
simpleMessageListenerContainer.setMessageListener(listener);
return simpleMessageListenerContainer;
}
}
@Component
@Profile("rabbitmq")
@RequiredArgsConstructor
public class RabbitMqRequestMessageListener implements ChannelAwareMessageListener {
private final RabbitTemplate rabbitTemplate;
private final ActionScheduler actionScheduler;
private int idleCount = 0;
private static final int MAX_IDLE_COUNT = 2;
@EventListener
public void handleIdleEvent(ListenerContainerIdleEvent event) {
idleCount++;
if (idleCount >= MAX_IDLE_COUNT) {
ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
connectionFactory.clearConnectionListeners();
SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer) event.getSource();
simpleMessageListenerContainer.stop();
idleCount = 0;
}
}
// 생략
}
Producer Code
RabbitMQ 서버에 메세지를 전달하는 Producer 코드 예제입니다.
- RabbitTemplate을 통해 Queue에 QueueMessage(JSON) 데이터를 전달합니다.
- QueueMessage에는 해당 액션을 요청한 user 정보와 ACTION_CODE, receiver 정보 등을 전달합니다.
- QueueName과 동일한 이름의 Queue 존재 여부 확인 및 Queue 생성 / 리스너 등록
@Service
@RequiredArgsConstructor
public class RabbitMqServiceImpl implements RabbitMqService {
private final ActionQueueSender actionQueueSender;
private final RabbitAdmin rabbitAdmin;
private final ConnectionFactory connectionFactory;
private final ApplicationContext applicationContext;
private final GlobalProperties globalProperties;
private final UserMapper userMapper;
@Override
public void sendAction(Long userSeq, ActionCode actionCode) {
if(!globalProperties.isRabbitmqUsage())
return;
RabbitMqActionRequestInfo rabbitMqActionRequestInfo = new RabbitMqActionRequestInfo(targetSeq);
QueueMessage queueMessage = QueueMessage.builder()
.actionCode(actionCode)
.dto(rabbitMqActionRequestInfo)
.build();
String queueName = userMapper.getDetail(userSeq).getUuid();
// queueName과 일치하는 queue가 없을 경우 Queue 생성 및 Listener 설정
if(rabbitAdmin.getQueueProperties(queueName) == null) {
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
RabbitMqRequestMessageListener listener = (RabbitMqRequestMessageListener) applicationContext.getBean("rabbitMqRequestMessageListener");
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setQueueNames(queueName);
listenerContainer.setIdleEventInterval(60000L);
listenerContainer.setMessageListener(listener);
listenerContainer.start();
}
// 메세지 전송
actionQueueSender.sendAsync(queueName, queueMessage);
}
}
@Service
@Configuration
public class ActionQueueSender {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
public void sendAsync(String queuename, QueueMessage queueMessage){
MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
MessageProperties messageProperties = new MessageProperties();
Message message = messageConverter.toMessage(queueMessage, messageProperties);
this.rabbitTemplate.convertAndSend(queuename, message);
}
public ActionQueueSender(final RabbitTemplate rabbitTemplate, final ObjectMapper objectMapper) {
this.rabbitTemplate = rabbitTemplate;
this.objectMapper = objectMapper;
}
}
QueueMessage 클래스는 제네릭 타입을 지원하도록 함으로써 메세지에 데이터 전송 객체(DTO)를 포함 할 수 있도록 구현했습니다.
사용자 id, 액션 코드 그리고 데이터 전송 객체를 포함하여 전달 할 수 있습니다.
public class QueueMessage <T> {
private String userId;
private T dto;
private ActionCode actionCode;
public QueueMessage() {
}
public static QueueMessage.QueueMessageBuilder builder() {
return new QueueMessage.QueueMessageBuilder();
}
public String getUserId() {
return userId;
}
public void setUserId(final String userId) {
this.userId = userId;
}
public Object getDto() {
return dto;
}
public void setDto(final T dto) {
this.dto = dto;
}
public ActionCode getActionCode() {
return actionCode;
}
public void setActionCode(final ActionCode actionCode) {
this.actionCode = actionCode;
}
public QueueMessage(final String userId,
final T dto,
final ActionCode actionCode) {
this.userId = userId;
this.dto = dto;
this.actionCode = actionCode;
}
public static class QueueMessageBuilder<T> {
private String userId;
private T dto;
private ActionCode actionCode;
QueueMessageBuilder() {
}
public QueueMessage.QueueMessageBuilder userId(final String userId) {
this.userId = userId;
return this;
}
public QueueMessage.QueueMessageBuilder dto(final T dto) {
this.dto = dto;
return this;
}
public QueueMessage.QueueMessageBuilder actionCode(final ActionCode actionCode) {
this.actionCode = actionCode;
return this;
}
public QueueMessagebuild() {
return new QueueMessage(this.userId, this.dto, this.actionCode);
}
public String toString() {
return "QueueMessage.QueueMessageBuilder(userId=" + this.userId + ", object=" + this.dto + ", actionCode=" + this.actionCode;
}
}
}
Consumer Code
RabbitMQ 서버로부터 메세지를 전달 받는 Consumer 코드 예제입니다.
- 브로커로부터 메시지를 수신하기 위해 ChannelAwareListener를 사용 해야 합니다. ChannelAwareListener는 Spring AMQP 라이브러리에서 제공하는 인터페이스로, RabbitMQ의 채널(Channel)을 명시적으로 관리할 수 있습니다. Channel 객체를 직접 다루면, 메세지 확인, 재처리, 거부 등 다양한 기능을 사용할 수 있습니다.
- RabbitMqListenerConfig 에서 설정한 Queue에 Message가 들어오면 ChannelAwareListener의 onMessage가 수행됩니다.
- QueueMessage을 통해 전달 받은 ACTION_CODE에 따라 액션을 수행합니다.
@Component
@Profile("rabbitmq")
@RequiredArgsConstructor
public class RabbitMqRequestMessageListener implements ChannelAwareMessageListener {
private final RabbitTemplate rabbitTemplate;
private final ActionScheduler actionScheduler;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
QueueMessage queueMessage = (QueueMessage) messageConverter.fromMessage(message);
actionScheduler.schedule(queueMessage);
}
}
728x90
반응형
'OpenSource > RabbitMQ' 카테고리의 다른 글
Docker로 RabbitMQ 클러스터 구성하기 (0) | 2024.06.12 |
---|---|
Docker에 RabbitMQ 설치 (2) | 2024.06.11 |
[RabbitMQ] RabbitMQ 설치 및 Cluster 구성 방법 (0) | 2024.06.10 |
[RabbitMQ] RabbitMQ의 주요 개념 (0) | 2022.05.03 |