partitioned라는 이름으로 파티션 6개와 레플리케이션 1개를 구성한 토픽을 생성하였습니다.
Topic과 partition
메시지는 topic으로 분류되고, topic은 여러 개의 partition으로 나눠 질 수 있습니다. 하나의 topic은 1개 이상의 partition으로 구성되어 있습니다.
하나의 topic에 여러 개의 partition을 나눠서 메시지를 쓰는 이유? 메시지는 Kafka의 해당 topic에 쓰여지는데, 몇 천건의 메시지가 동시에 kafka에 쓰여진다 가정할 때, 하나의 partition에 순차적으로 append 되면 처리하는 것이 번거로워집니다. 때문에 여러 개의 partition을 두어 분산 저장을 하는 것입니다. 병렬 처리할 수 있으니까요!
하지만 한 번 늘린 파티션은 절대 줄일 수 없고, 파티션을 늘렸을 때 메시지가 Round-robin 방식으로 쓰여지게 됩니다. 즉, 순차적으로 메시지가 쓰여지지 않아 메시지의 순서가 중요한 모델이라면 순차적으로 소비됨을 보장해주지 않기 때문에 위험할 수 있습니다.
Topic 관련 Config 파일 작성은 끝났습니다.
이어서 Producer 관련 Config 객체를 생성해보도록 하겠습니다.
3. KafkaProducerConfig 작성하기
KafkaProducerConfig.java
package com.example.kafkatest.Config;
import com.example.kafkatest.Model.Greeting;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
public ProducerFactory<String, Greeting> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
}
ProducerFactory 객체를 이용하여 각 메시지 종류별로, 메시지를 어디에 보내고, 어떤 방법으로 처리할 지 설정해줍니다.
우리가 실제로 사용할 KafkaTemplate 객체를 적절한 설정과 함께 생성해줍니다.
Producer를 통해 Kafka에 message를 send 하기 위해서는 KafkaProducer 인스턴스를 사용하여 send() 메서드를 호출해야합니다.
KafkaTemplate은 KafkaProducer를 감싸고 있는 인스턴스라고 생각하면 이해하시기 편합니다.
KafkaTemplate.send()는 내부에서 결국 KafkaProducer 인스턴스의 send()를 호출하고 있습니다.
결국 KafkaTemplate은 topic에 message를 보낼 수 있는 편리한 메서드를 제공한다고 생각하면 됩니다.
Producer는 KafkaTemplate을 이용하여 message를 topic에 전송합니다.
여기서 주의 깊게 볼 점은 KEY_SERIALIZER_CLASS_CONFIG와 VALUE_SERIALIZER_CLASS_CONFIG 입니다.
KEY_SERIALIZER_CLASS_CONFIG는 key에 사용할 직렬화 클래스입니다.
VALUE_SERIALIZER_CLASS_CONFIG는 value에 사용할 직렬화 클래스입니다.
각각의 설정은 모두 key와 value에 대해 자신이 보내는 메시지 타입을 지정해줍니다.
이 예제에서는 단순 String 값만 보내는 message와 Entity 객체를 보내는 message 모두 있으므로 각각 설정해주도록 합니다.
(2가지 종류의 message를 정의하였습니다.단순 String만 보낼 경우, StringSerializer.class를 사용하면 되고, Entity 객체를 직접 생성해서 Consumer에게 보낼 경우, JsonSeriaizer.class 클래스를 사용하면 됩니다.)
다음으로 실제로 message를 가져오는 Consumer에 대한 코드를 작성해보겠습니다.
4. KafkaConsumerConfig 작성하기
KafkaConsumerConfig.java
package com.example.kafkatest.Config;
import com.example.kafkatest.Model.Greeting;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("headers");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("partitions");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
factory.setRecordFilterStrategy(record -> record.value().contains("world"));
return factory;
}
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"greeting");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ErrorHandlingDeserializer(new JsonDeserializer<>(Greeting.class)));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<String, Greeting>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
}
위의 Producer와 유사하게 ConsumerFactory 객체를 이용하여 각 message 종류별로, message를 어디에서 받고, 어떠한 방법으로 처리할 것인지를 설정해줍니다.
위의 예제에서, Consumer와 같은 경우, 위에서 설정한 각 Topic별로 메시지를 어디서/어떻게 받을지를 설정해주는 메서드들을 지정하였습니다. (Consumer는 partition에 저장된 데이터를 읽어오는 역할을 합니다)
MessageListeners, @KafkaListener 두 가지로 구현이 가능한데, 위의 예제에서는 @KafkaListener를 사용하여 구현하였습니다.
@KafkaListener를 사용하기 위한 필수 조건은 다음과 같습니다. (KafkaConsumerConfig에서 설정해주어야 합니다)
@Configuration
@EnableKafka
kafkaListenerContainerFactory Bean 객체
Spring Kafka에서는 2개의 MessageListenerContainer 구현체를 제공합니다.
KafkaMessageListenerContainer
단일 스레드로 동작하는 Consumer입니다.
ConcurrentMessageListenerContainer
내부적으로 하나 이상의 KafkaMessageListenerContianer로 구성되는 멀티 스레드 방식의 Consumer입니다.
concurrency 속성을 추가로 설정할 수 있으며, 이 값에 따라 kafka message listener container가 개별적으로 생성되어 멀티 스레드를 처리할 수 있습니다.
5. KafkaListener Annotation 사용하기
마지막으로, 위에서 작성한 설정을 기반으로 메시지 큐에 데이터를 넣고 빼는 예제를 작성해보겠습니다.
아래에서 하나씩 천천히 작성해보도록 하겠습니다.
먼저, 메시지를 생성하는 부분입니다.
public static class MessageProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaTemplate<String, Greeting> greetingKafkaTemplate;
@Value(value = "${message.topic.name}")
private String topicName;
@Value(value = "${partitioned.topic.name}")
private String partitionedTopicName;
@Value(value = "${filtered.topic.name}")
private String filteredTopicName;
@Value(value = "${greeting.topic.name}")
private String greetingTopicName;
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=[" + message
+ "] due to : " + ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message
+ "] with offset=[" + result.getRecordMetadata().offset()+"]");
}
});
}
public void sendMessageToPartition(String message, int partition) {
kafkaTemplate.send(partitionedTopicName, partition, null, message);
}
public void sendMessageToFiltered(String message) {
kafkaTemplate.send(filteredTopicName, message);
}
public void sendGreetingMessage(Greeting greeting) {
greetingKafkaTemplate.send(greetingTopicName, greeting);
}
}
Producer는 KafkaTemplate을 이용해서 데이터를 Topic에 전송합니다.
Kafka는 기본적으로 비동기방식으로 동작합니다. KafkaTemplate의 반환값이 Future 타입인데, Future 객체에 Callback Interface를 추가하여 처리할 수 있습니다.
send 메소드로 데이터를 전송하고 나면 ListenableFuture<SendResult> 객체가 리턴되고, 이 객체에 callback을 등록할 수 있습니다.
메시지 큐 방식으로 통신하는 경우 필연적으로 비동기 방식으로 통신하는데 메시지가 언제 올지 모르기 때문에 callback 함수를 등록합니다.
다음으로, 메시지를 받는 부분입니다.
public static class MessageListener {
private CountDownLatch latch = new CountDownLatch(3);
private CountDownLatch filterLatch = new CountDownLatch(2);
private CountDownLatch greetingLatch = new CountDownLatch(1);
private CountDownLatch partitionLatch = new CountDownLatch(2);
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo' : " + message);
latch.countDown();
}
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
System.out.println("Received Message in group 'bar' : " + message);
latch.countDown();
}
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Recieved Message: " + message + " from partition: " + partition);
latch.countDown();
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = {"0", "3"}), containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Message: " + message + " from partition: " + partition);
this.partitionLatch.countDown();
}
@KafkaListener(topics = "${filtered.topic.name}", containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener : " + message);
this.filterLatch.countDown();
}
@KafkaListener(topics = "${greeting.topic.name}", containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
System.out.println("Received greeting message : " + greeting);
this.greetingLatch.countDown();
}
}
@KafkaListener를 통해 파라미터를 오버로딩해서 알맞은 listenerContainer를 자동으로 주입합니다.
(앞에서 말씀드렸다시피, ListenerContainer는 KafkaMessageListenerContainer와 ConcurrentMessageListenerContainer 두 가지 구현체가 있습니다.)
@KafkaListener를 통해 어떤 Topic의 message를 어떤 방식으로 받을지, 특정 partition의 message를 받거나 특정 그룹의 message를 받는 등의 설정을 할 수 있습니다.
Sent message=[Hello World] with offset=[27]
Received Message in group 'bar' : Hello World
Received Message in group 'foo' : Hello World
Recieved Message: Hello World from partition: 0
Received Message: Hello To Partitioned Topic from partition: 0
Received Message: Hello To Partitioned Topic from partition: 3
Received Message in filtered listener : Hello Baeldung
Received Message in filtered listener : Hello World
Received greeting message : Greetings, World!