프로듀서는 보통 카프카 프로듀서 API와 그것으로 구성된 애플리케이션을 말합니다.
프로듀서는 브로커에 특정 토픽(혹은 파티션 영역까지)을 지정하여 메시지를 전달하는 역할을 담당합니다.
프로듀서를 통해 전달되는 메시지의 구조는 다음과 같습니다.
프로듀서 구조와 메시지 전달 과정
- 프로듀서는 아래의 4가지 과정을 통해 브로커에게 메시지를 전달합니다.
- 직렬화 (Serializer)
- 파티셔닝 (Partitioner)
- 메시지 배치 (Record Accumulator)
- 압축 (Compression)
- 전달 (Sender)
각 과정을 천천히 살펴보겠습니다.
프로듀서는 먼저, 전달 요청 받은 메시지를 직렬화합니다.
직렬화는 Serializer가 지정된 설정을 통해 처리하며, 메시지의 키와 같은 바이트 뭉치 형태로 변환됩니다.
직렬화 과정을 마친 메시지는 Partitioner를 통해 어떤 토픽의 어떤 파티션에 저장 될 지를 결정합니다. 이 과정을 파티셔닝(Partitioning)이라고 합니다. Partitioner는 정의된 로직에 따라 파티셔닝을 진행하는데, 별도의 설정이 없다면 Round Robin 형태로 파티셔닝 합니다. 즉, 파티션들에게 골고루 메시지를 전달하도록 파티셔닝합니다.
만약, 메시지 전달 요청에 특정 파티션이 지정되어 왔을 경우, 별도의 파티셔닝 없이 해당 파티션으로 전달됩니다.
만약 메시지 압축이 설정되었다면, 설정된 포멧에 맞춰 메시지를 압축합니다. 압축된 메시지는 브로커로 빠르게 전달할 수 있을 뿐더러, 브로커 내부에서 빠른 복제가 가능하도록 합니다.
파티셔닝과 압축을 마친 후, 프로듀서는 프로듀서는 메시지를 TCP 프로토콜을 통해 브로커의 리더 파티션으로 전송합니다. 하지만 네트워크 전송은 매우 무거운 처리이기 때문에 메시지를 매번 네트워크를 통해 전달하는 것은 비효율적입니다. 그래서 프로듀서는 지정된 만큼 메시지를 저장했다가 한 번에 브로커로 전달합니다. 이 과정을 내부의 Record Accumulator가 담당하여 처리합니다. Record Accumulator는 각 토픽 파티션에 대응하는 배치 큐를 구성하고 메시지들을 레코드 배치 형태로 묶어 큐에 저장합니다.
각 배치 큐에 저장된 레코드 배치들은 때가 되면 각각 브로커에 전달됩니다. 이 과정을 Sender가 처리하는데, Sender는 스레드 형태로 구성되며, 관리자가 설정한 특정 조건에 만족하는 레코드 배치를 브로커에게 전송합니다. 이때, Sender 스레드는 네트워크 비용을 줄이기 위해 piggyback 방식 으로 조건을 만족하지 않은 다른 레코드 배치를 조건을 만족한 것과 함께 브로커로 전송합니다.
Piggyback은 '등 뒤에 업다'라는 뜻입니다. 예를 들면, 토픽 B의 파티션1의 큐에 레코드 배치가 전송 조건을 만족했다고 가정하겠습니다. Sender는 해당 레코드 배치를 가져와 Broker3에게 전송할 준비를 합니다. 이때, 토픽 A의 파티션2가 전송 조건을 만족하지 않았더라도 같은 Broker3에게 전송 되어야 하므로, Sender는 토픽 A의 레코드 배치를 업어 한번에 3번 브로커로 전송합니다.
브로커에 네트워크 전송 요청을 보낸 Sender는 설정 값에 따라 브로커의 응답을 기다리거나 혹은 기다리지 않습니다. 만약 응답을 기다리지 않는 설정인 경우, 메시지 전송에 대한 과정이 마쳐집니다. 반면, 응답을 기다리는 경우, 메시지 전송 성공 여부를 응답으로 받습니다. 이때, 브로커에서 메시지 전송이 실패한 경우에는 설정 값에 따라 재시도를 시도합니다. ** 재시도 횟수를 초과한 경우에는 예외를 뱉어냅니다. 반대로 **성공한 경우에는 메시지가 저장된 정보(메타데이터)를 반환합니다. 메타 데이터는 메시지가 저장된 토픽, 파티션, 오프셋, 타임 스탭프 정보를 가지고 있습니다.
Producer Configs
- bootstrap.servers
- 연결할 서버 정보입니다.(카프카 프로듀서가 접속할 브로커 주소)
host1:port1. host2:port2
와 같이 여러 개를 나열할 수 있습니다. - 초기 커넥션 연결 시 사용하기 때문에, 모든 서버 리스트를 포함할 필요는 없다. 실제 메시지 전송 시 새로운 커넥션을 맺은 다음 전송하기 때문.
- 연결할 서버 정보입니다.(카프카 프로듀서가 접속할 브로커 주소)
- key.serializer, value.serializer
- 메시지를 serialize 할 때 사용할 클래스를 지정하면 됩니다.
- ByteArraySerializer, StringSerializer, IntegerSerializer 등 Serializer를 implements한 클래스들이 있습니다.
- partitioner.class
- 어떤 파티션에 메시지를 전송할지 결정하는 클래스입니다.
- 기본값은 DefautlPartitioner이며 메시지 키의 해시 값을 기반으로 전송할 파티션을 결정합니다.
- compression.type
- 프로듀서에 의해 생성되는 모든 데이터 압축 타입에 대한 설정입니다.
- 기본값은 none으로 압축하지 않습니다.
- 설정 값으로는 none, gzip, snappy, lz4, zstd가 있습니다.
- batch.size
- producer는 적은 네트워크 요청을 위해 동일한 파티션에게 보낼 레코드를 일괄처리 하는데 기본 배치 크기를 제어하여 클라이언트와 서버 성능 모두를 향상시킵니다.
- 이 크기보다 큰 레코드를 일괄처리하지 않습니다.
- 압축은 전체 데이터 배치로 이루어지므로 일괄 처리의 효율성도 압축 비율에 영향을 미칩니다.
- acks
- 프로듀서가 전송한 메시지를 카프카가 잘 받은 걸로 처리할 기준입니다.
- 0, 1, all로 세팅할 수 있으며 각각 메시지 손실률과 전송 속도에 대해 차이가 있습니다.
설정값 손실률 속도 설명 acks = 0 높음 빠름 프로듀서는 서버의 확인을 기다리지 않고, 메시지 전송이 끝나면 성공으로 간주합니다. acks = 1 보통 보통 카프카의 leader가 메시지를 잘 받았는지만 확인합니다. acks = all 낮음 느림 카프카의 leader와 follower까지 모두 받았는지 확인합니다.
- buffer.memory
- 프로듀서가 서버로 전송 대기 중인 레코드를 버퍼링하는데 사용할 수 있는 메모리 양입니다.
- 레코드가 서버에 전달될 수 있는 것보다 더 빨리 전송되면 max.block.ms 동안 레코드를 보내지 않습니다.
- max.block.ms
- 버퍼가 가득 찼거나 메타데이터를 사용할 수 없을 때 차단할 시간을 정비할 수 있습니다.
- retries
- 프로듀서가 에러 났을 때 다시 시도할 횟수입니다.
- 0보다 큰 숫자로 설정하면 그 숫자만큼 오류 발생시에 재시도합니다.
- max.request.size
- 요청의 최대 바이트 크기입니다. 대용량 요청을 보내지 않도록 제한할 수 있습니다.
- 카프카 서버에도 별도로 설정할 수 있어, 서로 값이 다를 수 있습니다.
- producer.type
- 메시지를 동기, 비동기로 보낼지 선택할 수 있습니다.
- 비동기를 사용할 경우 메시지를 일정 시간동안 쌓은 후 처리하므로 처리 효율을 향상시킬 수 있습니다.
- request.timeout.ms
- 클라이언트가 요청 응답을 기다리는 최대 시간을 정할 수 있습니다.
- 정해진 시간 전에 응답을 받지 못하면 다시 요청을 보내거나 재시도 횟수를 넘어서면 요청이 실패합니다.
https://kafka.apache.org/documentation/#producerconfigs
https://always-kimkim.tistory.com/entry/kafka101-producer?category=876258
https://madplay.github.io/post/kafka-producer-consumer-options
'OpenSource > Kafka' 카테고리의 다른 글
[Kafka] Kafka Consumer (0) | 2022.02.24 |
---|---|
[Kafka] Message, Topic, Partition (0) | 2021.11.04 |
[Kafka] Consumer Group (0) | 2021.11.03 |
[Kafka] Spring Boot에 Kafka 연동하기 #2 (0) | 2021.10.27 |
[Kafka] Spring Boot에 Kafka 연동하기 #1 (0) | 2021.10.21 |