OpenSource/Kafka

[Kafka] Spring Boot에 Kafka 연동하기 #1

아장아장 초보 개발자 2021. 10. 21. 15:02
728x90
반응형

오늘은 Spring Boot 프로젝트에 Kafka를 연동하여 사용하는 방법에 대해 간단한 예제를 통해 살펴보겠습니다.

Kafka가 설치되어 있지 않다면 이 글을 참고해주시길 바랍니다.

1. 기본 환경 세팅하기

우선 Spring Boot 프로젝트를 하나 만들어줍니다.

Spring Boot 프로젝트가 준비 되었다면 build.gradle을 아래와 같이 수정하여 spring-kafka에 대한 의존성을 추가해줍니다.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    implementation 'org.springframework.boot:spring-boot-starter-web'

    implementation 'org.springframework.kafka:spring-kafka'
    
}

 

2. application.yml 작성하기

Bean을 통해 Producer/Consumer 설정을 할 수 있지만, 여기서는 application.yml을 통해 설정합니다.

Bean을 통한 설정은 다음에 하도록 하겠습니다^^

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: consumerGroup
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
  • spring.kafka.consumer
    • bootstrap-servers
      • Kafka 클러스터에 대한 초기 연결에 사용할 host:port를 나타냅니다.
    • group-id
      • consumer는 Consumer Group이 존재하므로, 유일하게 식별 가능한 Consumer Group을 작성합니다.
    • auto-offset-rest
      • Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성합니다. 
      • Consumer Group의 Consumer는 메시지를 소비할 때 Topic 내의 partition에서 다음에 소비할 offset이 어디인지 공유를 합니다. 
        그런데 오류로 인해 offset 정보가 없어졌을 때, 어떻게 offset을 reset 할 것인지를 명시합니다.
        • latest : 가장 최근에 생산된 메시지로 offset reset
        • earliest : 가장 오래된 메시지로 offset reset
        • none : offset 정보가 없으면 Exception 발생
  • spring.kafka.producer
    • bootstrap-servers
      • consumer.bootstrap-servers와 동일하며, producer 전용으로 오버라이딩 하려면 작성해야 합니다.

 

3. 예제 코드 작성하기

프로젝트의 기본 구조는 다음과 같습니다.

project 구조

KafkaController.java

post 방식으로 message 데이터를 받아서 Producer에게 전달하는 Controller를 만듭니다.

package com.example.kafkatest.Controller;

import com.example.kafkatest.Service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
  private final KafkaProducer kafkaProducer;

  @Autowired
  KafkaController(KafkaProducer kafkaProducer){
    this.kafkaProducer = kafkaProducer;
  }

  @PostMapping
  public String sendMessage(@RequestParam("message") String message){
    this.kafkaProducer.sendMessage(message);
    return "success";
  }

}

KafkaProducer.java

KafkaTemplate에 Topic 명과 Message를 전달합니다.

KafkaTemplate은 Topic에 데이터를 보내기 위한 메소드들을 제공합니다.

KafkaTemplate.send() 메서드가 실행되면, Kafka 서버로 메시지가 전송됩니다.

package com.example.kafkatest.Service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
  private static final String TOPIC = "test";
  private final KafkaTemplate<String, String> kafkaTemplate;

  @Autowired
  public KafkaProducer(KafkaTemplate kafkaTemplate){
    this.kafkaTemplate = kafkaTemplate;
  }

  public void sendMessage(String message){
    System.out.println(String.format("Produce message : %s", message));
    this.kafkaTemplate.send(TOPIC, message);
  }

}

KafkaConsumer.java

@KafkaListener 어노테이션을 사용하여 Kafka로부터 메시지를 받습니다.

package com.example.kafkatest.Service;

import java.io.IOException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

  @KafkaListener(topics = "test", groupId = "consumerGroup")
  public void consume(String message) throws IOException {
    System.out.println(String.format("Consumed message : %s", message));
  }

}

 

4. 실행하기

위의 코드를 전부 작성하였다면, API test tool인 Postman을 사용하여 테스트를 해보겠습니다.

실행하기 전 Kafka 서버가 제대로 실행되고 있는 지 확인 후 실행을 하면 제대로 동작하는 것을 확인하실 수 있습니다.

 

이제 Postman을 사용하여 메시지를 발송하겠습니다.

 

 

message 생성/소비에 대한 로그도 제대로 출력되는 것을 확인 할 수 있습니다. 

 

여기까지 Spring Boot와 Kafka를 연동하여 메시지 생성/소비에 대한 간단한 예제를 해보았습니다.

위 코드는 아래 github에서 확인하실 수 있습니다.

https://github.com/yunhaDevGit/Kafka-PubSub-Test

 

GitHub - yunhaDevGit/Kafka-PubSub-Test

Contribute to yunhaDevGit/Kafka-PubSub-Test development by creating an account on GitHub.

github.com

다음번엔 여러 Topic에 대한 설정 및 실습을 진행하도록 하겠습니다~

 

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html

https://victorydntmd.tistory.com/348

https://velog.io/@litien/2019-08-22-2108-%EC%9E%91%EC%84%B1%EB%90%A8-jyjzmoph0g

728x90
반응형