개요
Kafka에서 Exactly Once Semantic을 보장하는 방법에 대해서 알아본다.
조건
Kafka에서 Exactly-Once Semantic (이하 EoS)를 보장하기 위해선 2가지 조건이 필수적이다
- Idempotence Producer
- Transaction API
Idempotence Producer
Idempotence Producer는 이름에서 알 수 있듯, 메세지를 멱등하게 보낼 수 있는 프로듀서를 의미한다.
멱등하게 보냄은 메세지가 중복없이 전송되는 것을 의미한다. 하지만 멱등함이 메세지를 정확히 한 번 전송함을 의미하는 것은 아니다.
Idempotence Producer가 활성화되면 프로듀서는 숫자로 구성된 PID를 할당받는다. Idempotence Producer가 전송하는 메세지에는 파티션 별로 구분되는 시퀀스 번호가 연속적으로 부여된다.
즉, 전송된 메세지에는 PID-Sequence Number가 포함되어 있다.
브로커는 “프로듀서가 전송한 메세지의 시퀀스 번호 = 브로커가 소유한 메세지의 시퀀스 번호 + 1" 을 만족할 때만 메세지를 저장한다.
브로커는 .snapshot 파일에 프로듀서 및 파티션 별 시퀀스 번호를 주기적으로 저장한다.

Kafka에서 Idempotence Producer를 설정하기 위해선 다음과 같은 설정이 필요하다.
| Configuration | Value | Description | 
| enable.idempotence | true | 멱등성 프로듀서를 설정하기 위한 값으로 기본 값은 false이다. | 
| max.in.flight.requests.per.connection | 1 ~ 5 | ACK 받기 전 하나의 커넥션에서 전송할 수 있는 최대 요청 값 (5이하로 설정) | 
| acks | all(=-1) | 프로듀서가 전송한 메세지를 브로커가 제대로 수신했는지 확인하는 설정. all은 모든 메세지가 ISR까지 복제됨을 보장한다. | 
| retries | 5 | ACK 를 받지 못한 경우 재시도 하는 횟수 (0보다 큰 값으로 설정) | 
Transcation API
Kafka의 Transaction API는 Idempotence Producer가 전송한 메세지 커밋을 성공하거나 실패하는 데 있어 원자성을 보장한다.
Idempotence Producer가 중복 없이 메세지를 전송하고, 이를 Transcation API를 통해 원자적으로 커밋에 성공하면 EoS가 보장된다.
Kafka Transaction을 사용하기 위해서 다음과 같은 설정이 필요하다.
| Configuration | Value | Description | 
| transactional.id (for Producer) | 트랜잭션에 사용할 고유한 ID, Transaction Producer 인스턴스 별로 고유하게 설정된다. | |
| isolation.level (for Consumer) | read_committed | Non-Transaction 레코드, COMMIT 상태의 레코드만 읽도록 설정 | 
| enable.auto.commit (for Consumer) | false | 수동 오프셋 커밋을 위해 설정 | 

트랜잭션 코디네이터의 역할
프로듀서가 전송한 메세지를 관리하고 Commit/Abort와 같은 상태를 관리한다.
TID와 매핑되는 고유한 PID를 생성하고 관리한다.
트랜잭션 전체 과정을 관리하며 Commit/Abort를 나타낼 컨트롤 레코드를 전송한다.
- FindCoordinator: 프로듀서는 브로커에게- FindCoordinatorRequest를 전송해 트랜잭션 코디네이터 위치를 찾는다.
- InitProducerId: 프로듀서는- initTransaction()메서드를 통해 InitPidRequest를 트랜잭션 코디네이터에 전송한다.- 트랜잭션 코디네이터는 TID, PID를 매핑하고 PID에 해당하는 에포크 값을 증가시킨다.
- 에포크가 증가됨에 따라, 이전 에포크를 갖는 프로듀서가 전송한 쓰기 요청은 무시된다.
 
- AddPartitionsToTxn: 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달한다. 트랜잭션 코디네이터는 전달받은 토픽 파티션 정보를 트랜잭션 로그에- Ongoing상태로 기록한다.
- Produce: 프로듀서는 토픽 파티션으로 메세지를 전송한다. 메세지에는 PID, 에포크, 시퀀스 번호가 포함된다.
- AddOffsetsToTxn: 프로듀서가 컨슈머 그룹의 오프셋 커밋을 트랜잭션에 포함하기 위해- sendOffsetsToTransaction()을 호출한다. (전송할 메세지가 컨슈머로부터 가져온 레코드인 경우 필요) 트랜잭션 코디네이터는 트랜잭션 로그에 커밋할 토픽 파티션을 기록한다.
- OffsetCommit: 실제- __consumer_offsets토픽에 커밋하기 위한 단계이다. 트랜잭션이 성공적으로 커밋되었을 때만 적용된다. (- commitTransaction()이 호출되었을 때 반영된다.0
- EndTxn: 프로듀서는- commitTransaction()/abortTransaction()호출을 통해 트랜잭션의 완료를 트랜잭션 코디네이터에게 알린다. 트랜잭션 코디네이터는 첫 번째 커밋 단계로 트랜잭션 로그에- PREPARE_COMMIT/PREPARE_ABORT를 기록한다.
- WriteTxMarkers: 트랜잭션 코디네이터는 토픽 파티션 리더에- COMMIT/ABORT컨트롤 레코드 기록 요청을 보낸다.- __consumer_offset토픽에 실제로 컨트롤 레코드가 기록된다.
Example
public class KafkaTransactionsExample {
  public static void main(String args[]) {
      // read_committed, enable.auto.commit false
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
      // Idempotence Producer Configuratainon + transcational.id
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    producer.initTransactions();
    while(true) {
        try {
          ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
          if (!records.isEmpty()) {
            producer.beginTransaction(); // 트랜잭션 시작
            List<ProducerRecord<String, String>> outputRecords = processRecords(records);
            for (ProducerRecord<String, String> outputRecord : outputRecords) {
              producer.send(outputRecord); // 레코드 전송
            }
                // 컨슈머가 폴링한 오프셋 커밋을 위해 사용
            sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
            producer.commitTransaction();
      } catch (Exception e) {
          producer.abortTransaction();
        }
    }
  }
}
'Kafka > Kafka' 카테고리의 다른 글
| Kafka Connect: Incremental Cooperative Rebalancing (0) | 2024.12.09 | 
|---|---|
| Kafka: Static Membership (0) | 2024.11.03 | 
| Kafka Issue: InvalidRecordException (related Timestamp) (0) | 2024.06.01 | 
| Kafka: Kafka Broker 내부 구조 (0) | 2024.04.28 | 
