CS/Apache Kafka

[Kafka - 03] 프로듀서의 내부 동작 원리와 구현, 중복 없는 전송

JWonK 2023. 8. 31. 19:16
728x90
반응형

프로듀서가 전송하려는 메시지들은 프로듀서의 send() 메소드를 통해 시리얼라이저, 파티셔너를 거쳐 카프카로 전송된다.

먼저 파티셔너가 무엇인지 알아본다.

 

 

1. 파티셔너


  • 카프카의 토픽은 성능 향상을 위한 병렬 처리가 가능하도록 하기 위해 파티션으로 나뉘고, 최소 하나 또는 둘 이상의 파티션으로 구성된다.
  • 그리고 프로듀서가 카프카로 전송할 메시지는 해당 토픽 내 각 파티션의 로그 세그먼트에 저장된다.
  • 따라서 프로듀서는 토픽으로 메시지를 보낼 때 해당 토픽의 어느 파티션으로 메시지를 보내야 할지를 결정해야 하는데, 이때 사용하는 것이 바로 파티셔너이다.
  • 프로듀서가 파티션을 결정하는 알고리즘은 기본적으로 메시지(레코드)의 키를 해시 처리해 파티션을 구하는 방식이다.
  • 예상치 못한 많은 양의 메시지가 카프카로 인입되는 경우, 카프카는 클라이언트의 처리량을 높이기 위해 토픽의 파티션을 늘릴 수 있는 기능을 제공한다.

 

파티션 수 증가에 따른 해시 변경 방식

 

 

 

 

 

 

1-1. 라운도 로빈 전략


  • 프로듀서의 메시지 중 레코드의 키 값은 필숫값이 아니므로, 관리자는 별도의 레코드 키 값을 지정하지 않고 메시지를 전송할 수 있다.
  • 만약 키 값을 지정하지 않는다면 키값은 null이 되고, 기본값인 라운드 로빈 알고리즘을 사용해 프로듀서는 목적지 토픽의 파티션들로 레코드들을 랜덤 전송 한다.
  • 파티셔너를 거친 후의 레코드들은 배치 처리를 위해 프로듀서의 버퍼 메모리 영역에서 잠시 대기한 후 카프카로 전송한다.
  • 배치 처리를 위해 잠시 메시지들이 대기하는 과정에서 라운드 로빈 전략은 효율을 떨어뜨릴 수 있다. 어떠한 모습인지 나타낼 수 있는지보자

 

키값이 null인 라운드 로빈 전략

 

  • 위 그림에서 보이듯이 라운드 로빈 전략으로 균등하게 배분할 경우 배치로 인해 모두 전송되지 못한채 대기하게 된다.
  • 배치 기준 3을 충족하지 못했기 때문이다.
  • 카프카에서는 이와 같은 비효율적인 전송을 보완하기 위해 스티키 파티셔닝[sticky partitioning] 전략을 구성했다.

 

 

 

 

 

 

1-2. 스티키 파티셔닝 전략


  • 라운드 로빈 전략에서 지연시간이 불필요하게 증가되는 비효율적인 전송을 개선하고자 스티키 파티셔닝 전략을 사용하게 된다.
  • 스피키 파티셔닝 전략이란 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략을 말한다.

스티키 파티셔닝 전략

 

→ 스티키 파티셔닝 전략을 적용함으로써 기본 설정에 비해 약 30% 이상 지연시간이 감소하고 프로듀서의 CPU 사용률도 줄어드는 효과를 얻을 수 있었다.

 

 

 

 

 

 

 

 

 

 

 

2. 프로듀서의 배치


  • 카프카에서는 토픽의 처리량을 높이기 위한 방법으로 토픽을 파티션으로 나눠 처리하며, 카프카 클라이언트인 프로듀서에서는 처리량을 높이기 위해 배치 전송을 궈낮ㅇ한다.
  • 따라서 프로듀서에서는 카프카로 전송하기 전, 배치 전송을 위해 토픽의 파티션 별로 레코드들을 잠시 보관하고 있다.
  • 프로듀서는 배치 전송을 위해 다음과 같은 옵션들을 제공한다.
    • buffer.memory : 카프카로 메시지들을 전송하기 위해 담아두는 프로듀서의 버퍼 메모리 옵션이다. 기본값은 32MB로 설정되어 있으며 조정 가능하다.
    • batch.size : 배치 전송을 위해 메시지들을 묶는 단위를 설정하는 배치 크기 옵션이다. 기본값은 16KB며 설정 가능하다.
    • linger.ms : 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간을 설정하는 옵션이다. 단위는 ms이며 초기값은 0이다.
  • 프로듀서의 배치 전송 방식은 단건의 메시지를 전송하는 것이 아니라 한 번에 다량의 메시지를 묶어서 전송하는 방법이다. 
  • 이러한 배치 전송은 불필요한 I/O를 줄일 수 있어 매우 효율적이며, 더불어 카프카의 요청 수를 줄여주는 효과도 있다. 
  • buffer.memory 크기는 반드시 batch.size보다 커야 한다.

 

 

 

 

 

 

 

 

 

3. 중복 없는 전송


멱등성이란, 동일한 작업을 여러 번 수행하더라도 결과가 달라지지 않는 것을 의미한다.

 

메시지 시스템들의 메시지 전송 방식에는 '적어도 한 번 전송(at-least-once)', '최대 한 번 전송(at-most-once)', '정확히 한 번 전송(exactly-once)'이 있다.

 

적어도 한 번 전송 과정

 

  1. 프로듀서가 브로커의 특정 토픽으로 메시지A를 전송한다.
  2. 브로커는 메시지A를 기록하고, 잘 받았다는 ACK를 프로듀서에게 응답한다.
  3. 브로커의 ACK를 받은 프로듀서는 다음 메시지인 메시지B를 브로커에게 전송한다.
  4. 브로커는 메시지B를 기록하고, 잘 받았다는 ACK를 프로듀서에게 전송하려고 한다. 하지만 네트워크 오류 또는 브로커 장애가 발생하여 결국 프로듀서는 메시지B에 대한 ACK를 받지 못한다.
  5. 메시지B를 전송한 후 브로커로부터 ACK를 받지 못한 프로듀서는 브로커가 메시지B를 받지 못했다고 판단해 메시지B를 재전송한다.

 

→ 네트워크의 회선 장애나 기타 장애 상황에 따라 일부 메시지 중복이 발생할 수는 있지만, 최소한 하나의 메시지는 반드시 보장한다는 것이 적어도 한 번 전송 방식이며, 카프카는 적어도 한 번 전송 방식을 기반으로 동작한다.

 

 

 

최대 한 번 전송 과정

 

  1. 프로듀서가 브로커의 특정 토픽으로 메시지A를 전송한다.
  2. 브로커는 메시지A를 기록하고, 잘 받았다는 ACK를 프로듀서에게 응답한다.
  3. 프로듀서는 다음 메시지인 메시지B를 브로커에게 전송한다.
  4. 브로커는 메시지B를 기록하지 못하고, 잘 받았다는 ACK를 프로듀서에게 전송하지 못한다.
  5. 프로듀서는 브로커가 메시지B를 받았다고 가정하고 메시지C를 전송한다.

 

적어도 한 번 전송 최대 한 번 전송
메시지 손실 가능성 X 메시지 중복 가능성 O
메시지 손실 가능성 O 메시지 중복 가능성 X

 

 

 

 

중복 없는 전송 과정

 

  1. 프로듀서가 브로커의 특정 토픽으로 메시지A를 전송한다. 이때 PID 0과 메시지 번호 0을 헤더에 포함해 함께 전송한다.
  2. 브로커는 메시지A를 저장하고, PID와 메시지 번호 0을 메모리에 기록한다. 그리고 메시지를 잘 받았다는 ACK를 프로듀서에게 응답한다.
  3. 프로듀서는 다음 메시지인 메시지B를 브로커에게 전송한다. PID는 동일하게 0이고, 메시지 번호는 1이 증가하여 1이 된다.
  4. 브로커는 메시지B를 저장하고, PID와 메시지 번호 1을 메모리에 기록한다. 그리고 메시지를 잘 받았다는 ACK를 프로듀서에게 전송하려고 한다. 하지만 네트워크 오류 또는 브로커 장애가 발생하여 프로듀서는 메시지B에 대한 ACK를 받지 못한다.
  5. 브로커로부터 ACK를 받지 못한 프로듀서는 브로커가 메시지B를 받지 못했다고 판단해 메시지B를 재전송한다.

 

  • 적어도 한 번 전송 과정과 동일하지만 프로듀서가 재전송한 메시지B의 헤더에서 PID(0)와 메시지 번호(1)를 비교해서 메시지B가 이미 브로커에 저장되어 있는 것을 확인한 브로커는 메시지를 중복 저장하지 않고 ACK만 보낸다. 
  • 이러한 브로커의 동작 덕분에 브로커에 저장된 메시지는 중복을 피할 수 있게 된다.
  • PID는 사용자가 별도로 생성하는 것이 아니며 프로듀서에 위해 자동 생성된다.
  • 또한, 이 PID는 프로듀서와 카프카 사이에서 내부적으로만 이용되므로 사용자에게 따로 노출되지 않는다. 또한 메시지마다 부여되는 시퀀스 번호는 0번부터 시작해 순차적으로 증가한다.
  • 프로듀서가 보낸 메시지의 시퀀스 번호가 브로커가 갖고 있는 시퀀스 번호보다 정확하게 하나 큰 경우가 아니라면, ㅂ로커는 프로듀서의 메시지를 저장하지 않는다. 바로 이 동작 때문에 메시지 중복을 피할 수 있다.
  • 메시지 중복을 피하기 위해 사용하는 PID와 시퀀스 번호 정보는 브로커의 메모리에 유지되고, 리플리케이션 로그에도 저장된다.

 

 

[ 중복 없는 전송을 위한 프로듀서 설정 ]

프로듀서 옵션 설명
enable.idempotence true - 프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션
- 기본값은 false이므로, 이 옵션을 설정하기 원한다면 true로 변경해야 함
- true로 변경 시 다음에 나오는 옵션들도 반드시 변경해야 함
- 그렇지 않으면 ConfigException 발생
max.in.flight.requests.per.connection 1 ~ 5 ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수이다.
기본값은 5이며, 5이하로 설정해야 한다
acks all 프로듀서 acks와 관련된 옵션으로서, 기본값은 1이며 all로 설정해야함
retries 5 ACK를 받지 못한 경우 재시도를 해야 하므로 0보다 큰 값으로 설정해야함

 

 

 

 

 

4. 정확히 한 번 전송


  • 앞서 카프카에서는 멱등성 옵션을 이용해 중복 없는 전송을 할 수 있다고 설명하였다.
  • 하지만 이 중복 없는 전송 방식이 정확히 한 번 전송한다는 의미는 아니다. 
  • 카프카에서 정확히 한 번 전송은 트랜잭션과 같은 전체적인 프로세스 처리를 의미하며, 중복 없는 전송은 정확히 한 번 전송의 일부 기능이라 할 수 있다.
  • 전체적인 프로세스를 관리하기 위해 카프카에서는 정확히 한 번 처리를 담당하는 별도의 프로세스가 있는데 이를 트랜잭션 API라고 부른다.

 

 

 

4-1. 디자인


  • 프로듀서가 카프카로 정확히 한 번 방식으로 메시지를 전송할 때, 프로듀서가 보내는 메시지들은 원자적으로(atomic) 처리되어 전송에 성공하거나 실패하게 된다.
  • 이런 프로듀서의 전송을 위해 카프카에는 컨슈머 그룹 코디네이터와 동일한 개념으로 트랜잭션 코디네이터(transaction cordinator) 라는 것이 서버 측에 존재한다.
  • 트랜잭션 코디네이터의 역할은 프로듀서에 의해 전송된 메시지를 관리하며, 커밋 또는 중단 등을 표시한다.
  • 카프카에서는 컨슈머 오프셋 관리를 위해 오프셋 정보를 카프카의 내부 토픽에 저장하는데, 트랜잭션도 동일하게 트랜잭션 로그를 카프카의 내부 토픽인 _transaction_state에 저장한다. 
  •  _transaction_state는 카프카의 내부 토픽이지만 이 역시 토픽이므로 파티션 수와 리플리케이션 팩터 수가 존재하며, 브로커의 설정을 통해 관리자가 설정할 수 있다. 기본값은 다음과 같다.
    • transaction.state.log.num.partitions=50
    • transaction.state.log.replication.factor=3
  • 프로듀서는 트랜잭션 관련 정보를 트랜잭션 코디네이터에게 알리고, 모든 정보의 로그는 트랜잭션 코디네이터가 직접 기록한다.
  • 정확히 한 번 전송을 이용해 전송된 메시지들이 카프카에 저장되면, 카프카의 메시지를 다루는 클라이언트들은 해당 메시지들이 정상적으로 커밋된 것인지 또는 실패한 것인지 식별할 수 있어야 한다.
  • 카프카에서는 이를 식별학 위한 정보로서, 컨트롤 메시지라고 불리는 특별한 타입의 메시지가 추가로 사용된다.

 

  • 중복 없는 전송과 정확히 한 번 전송의 옵션 설정에서 가장 큰 차이점이자 주의해야 할 설정은 TRANSACTIONAL_ID_CONFIG이다.

 

 

 

 

 

4-2. 단계별 동작


트랜잭션 코디네이터 찾기

 

  1. 정확히 한 번 전송을 위해서는 트랜잭션 API를 이용한다고 하였다. 따라서 가장 먼저 수행하는 작업은 트랜잭션 코디네이터 찾기이다.
  2. 트랜잭션 코디네이터는 브로커에 위치한다.
  3. 트랜잭션 코디네이터의 주 역할은 PID와 transactional.id를 매핑하고 해당 트랜잭션 전체를 관리하는 것이다.
  4. 만약 트랜잭션 코디네이터가 존재하지 않는다면 신규 트랜잭션 코디네이터가 생성된다.

 

 

프로듀서 초기화

 

  1. 다음 프로듀서는 initTransactions() 메소드를 이용해 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터로 보낸다.
  2. 이때 PID(transactional.id)가 설정된 경우에는 InitPidRequest와 함께 TID가 트랜잭션 코디네이터에게 전송된다.
  3. 트랜잭션 코디네이터에는 TID, PID를 매핑하고 해당 정보를 트랜잭션 로그에 기록한다.
  4. 그런 다음 PID 에포크를 한 단계 올리는 동작을 하게 되고, PID 에포크가 올라감에 따라 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청은 무시된다.
  5. 에포크를 활용하는 이유는 신뢰성 있는 메시지 전송을 하기 위함이다.

 

 

트랜잭션 시작

 

  1. 프로듀서는 beginTransaction() 메소드를 이용해 새로운 트랜잭션의 시작을 알리게 된다.
  2. 프로듀서는 내부적으로 트랜잭션이 시작됐음을 기록하지만, 트랜잭션 코디네이터 관점에서는 첫 번째 레코드가 전송될 때까지 트랜잭션이 시작된 것은 아니다.

 

 

트랜잭션 상태 추가

 

  1. 다음으로 트랜잭션 상태 추가 동작이다. 트랜잭션 코디네이터는 전체 트랜잭션을 관리한다
  2. 그리고 각 트랜잭션 상태의 내용을 기록하는 일도 매우 중요하다. 
  3. 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달하고, 트랜잭션 코디네이터는 해당 정보를 트랜잭션 로그에 기록한다.
  4. 만약 트랜잭션 로그에 추가되는 첫 번째 파티션이라면, 트랜잭션 코디네이터는 해당 트랜잭션에 대한 타이머를 시작한다.
  5. 기본값으로 1분 동안 트랜잭션 상태에 대한 업데이트가 없다면, 해당 트랜잭션은 실패로 처리한다.

 

 

메시지 전송

 

  1. 다음은 메시지 전송이다. 이 단계에서 프로듀서는 해당 토픽의 파티션으로 메시지를 전송한다.
  2. 위 그림에서 브로커가 2개 있는 이유는 트랜잭션 코디네이터가 있는 브로커와 프로듀서가 전송하는 메시지를 받는 브로커가 서로 다르기 때문이다.

 

 

트랜잭션 종료 요청

 

  1. 메시지 전송을 완료한 프로듀서는 commitTransaction() 메소드 또는 abortTransaction() 메소드 중 하나를 호출해야 하며, 해당 메소드의 호출을 통해 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 알린다.
  2. 트랜잭션 코디네이터는 두 단계의 커밋 과정을 시작하게 되며, 첫 번째 단계로 트랜잭션 로그에 해당 트랜잭션에 대한 PrepareCommit or PrepareAbort를 기록한다.

 

사용자 토픽에 표시 요청

 

  1. 다음은 사용자 토픽에 표시하는 단계이다. 트랜잭션 코디네이터는 두 번째 단계로서 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록한다.
  2. 여기서 기록하는 메시지가 바로 컨트롤 메시지이다.

 

예를 들어, 트랜잭션 프로듀서가 파티션0에 메시지를 전송했고 해당 메시지의 오프셋이 1이라고 가정해보자. 트랜잭션 코디네이터는 파티션0에 트랜잭션 커밋 표시 메시지를 기록하고, 이 추가 메시지(컨트롤 메시지)로 인해 파티션0의 마지막 오프셋은 2로 증가한다. 이 메시지는 해당 PID의 메시지가 제대로 전송됐는지 여부를 컨슈머에게 나타내는 용도로도 사용된다. 따라서 트랜잭션 커밋이 끝나지 않은 메시지는 컨슈머에게 반환하지 않으며, 오프셋의 순서 보장을 위해 트랜잭션 성공 또는 실패를 나타내는 LSO[Last Stable Offset]라는 오프셋을 유지하게 된다.

 

 

트랜잭션 완료

 

  1. 마지막 단계 트랜잭션 완료이다. 
  2. 트랜잭션 코디네이터는 완료됨(Commited)이라고 트랜잭션 로그에 기록한다.
  3. 그리고 프로듀서에게 해당 트랜잭션이 완료됨을 알린 다음 해당 트랜잭션에 대한 처리는 모두 마무리된다.
  4. 트랜잭션을 이용하는 컨슈머는 read_committed 설정을 하면 트랜잭션에 성공한 메세지들만 읽을 수 있게 된다.
728x90
반응형