Zayson
A to Zayson!
Zayson
전체 방문자
오늘
어제
  • 분류 전체보기 (132)
    • Computer Science (20)
      • Network (4)
      • DB (12)
      • OS (4)
    • Algorithm (32)
      • 완전탐색(Brute-Force) (3)
      • 그리디(Greedy) (6)
      • 투포인터(Two-Pointer) (1)
      • 그래프(Graph) (5)
      • BFS & DFS (9)
      • 구현, 시뮬레이션(Implementation) (5)
      • 다이나믹 프로그래밍(DP) (3)
    • Backend (51)
      • Spring Boot (19)
      • JPA (16)
      • Kafka (2)
      • Java (13)
      • Kotlin (1)
    • DevOps (1)
      • Jenkins (5)
      • Oracle Cloud Infrastructure (1)
      • Kubernetes & Docker (1)
    • Trouble Shooting (3)
      • JPA (1)
      • Spring Boot (2)
    • 회고 (5)
      • 엔빵 프로젝트 포스트 로드맵 (1)
      • 2022년 (4)
    • Kafka (7)
      • Kafka (5)
      • Kafka Connect (2)
    • 기술 서적 (6)
      • 데이터 중심 애플리케이션 설계 (3)
      • 개발자가 반드시 정복해야할 객체 지향과 디자인 패턴 (2)
      • 가상 면접 사례로 배우는 대규모 시스템 설계 기초 (1)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

인기 글

태그

  • CS
  • 그리디
  • BFS
  • 라이브스터디
  • Java
  • Kafka Connect
  • Computer science
  • dfs
  • SpringBoot
  • 완전탐색
  • 백준
  • Backend
  • JPA
  • 엔빵프로젝트
  • 프로그래머스
  • 구현
  • kafka
  • 나 혼자 스프링부트!
  • spring boot
  • 관계형 데이터베이스 실전 입문

최근 글

티스토리

hELLO · Designed By 정상우.
Zayson

A to Zayson!

Kafka: Exactly-Once Semantic (w/ Transaction_)
Kafka/Kafka

Kafka: Exactly-Once Semantic (w/ Transaction_)

2025. 1. 27. 18:57

개요


Kafka에서 Exactly Once Semantic을 보장하는 방법에 대해서 알아본다.

 

조건


Kafka에서 Exactly-Once Semantic (이하 EoS)를 보장하기 위해선 2가지 조건이 필수적이다

  1. Idempotence Producer
  2. Transaction API

 

Idempotence Producer


Idempotence Producer는 이름에서 알 수 있듯, 메세지를 멱등하게 보낼 수 있는 프로듀서를 의미한다.

멱등하게 보냄은 메세지가 중복없이 전송되는 것을 의미한다. 하지만 멱등함이 메세지를 정확히 한 번 전송함을 의미하는 것은 아니다.

 

Idempotence Producer가 활성화되면 프로듀서는 숫자로 구성된 PID를 할당받는다. Idempotence Producer가 전송하는 메세지에는 파티션 별로 구분되는 시퀀스 번호가 연속적으로 부여된다.

즉, 전송된 메세지에는 PID-Sequence Number가 포함되어 있다.

 

브로커는 “프로듀서가 전송한 메세지의 시퀀스 번호 = 브로커가 소유한 메세지의 시퀀스 번호 + 1" 을 만족할 때만 메세지를 저장한다.

브로커는 .snapshot 파일에 프로듀서 및 파티션 별 시퀀스 번호를 주기적으로 저장한다.

 

중복된 요청에 대한 메세지는 커밋되지 않고, ACK만 날린다. 중복 요청된 메세지는 동일한  PID-SeqNo  조합을 갖는다

 

Kafka에서 Idempotence Producer를 설정하기 위해선 다음과 같은 설정이 필요하다.

Configuration Value Description
enable.idempotence true 멱등성 프로듀서를 설정하기 위한 값으로 기본 값은 false이다.
max.in.flight.requests.per.connection 1 ~ 5 ACK 받기 전 하나의 커넥션에서 전송할 수 있는 최대 요청 값 (5이하로 설정)
acks all(=-1) 프로듀서가 전송한 메세지를 브로커가 제대로 수신했는지 확인하는 설정.
all은 모든 메세지가 ISR까지 복제됨을 보장한다.
retries 5 ACK 를 받지 못한 경우 재시도 하는 횟수 (0보다 큰 값으로 설정)

 

Transcation API


Kafka의 Transaction API는 Idempotence Producer가 전송한 메세지 커밋을 성공하거나 실패하는 데 있어 원자성을 보장한다.

Idempotence Producer가 중복 없이 메세지를 전송하고, 이를 Transcation API를 통해 원자적으로 커밋에 성공하면 EoS가 보장된다.

 

Kafka Transaction을 사용하기 위해서 다음과 같은 설정이 필요하다.

Configuration Value Description
transactional.id (for Producer)   트랜잭션에 사용할 고유한 ID, Transaction Producer 인스턴스 별로 고유하게 설정된다.
isolation.level (for Consumer) read_committed Non-Transaction 레코드, COMMIT 상태의 레코드만 읽도록 설정
enable.auto.commit (for Consumer) false 수동 오프셋 커밋을 위해 설정

 

트랜잭션 코디네이터의 역할
프로듀서가 전송한 메세지를 관리하고 Commit/Abort와 같은 상태를 관리한다.
TID와 매핑되는 고유한 PID를 생성하고 관리한다.
트랜잭션 전체 과정을 관리하며 Commit/Abort를 나타낼 컨트롤 레코드를 전송한다.
  1. FindCoordinator: 프로듀서는 브로커에게 FindCoordinatorRequest를 전송해 트랜잭션 코디네이터 위치를 찾는다.
  2. InitProducerId: 프로듀서는 initTransaction() 메서드를 통해 InitPidRequest를 트랜잭션 코디네이터에 전송한다.
    • 트랜잭션 코디네이터는 TID, PID를 매핑하고 PID에 해당하는 에포크 값을 증가시킨다.
    • 에포크가 증가됨에 따라, 이전 에포크를 갖는 프로듀서가 전송한 쓰기 요청은 무시된다.
  3. AddPartitionsToTxn: 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달한다. 트랜잭션 코디네이터는 전달받은 토픽 파티션 정보를 트랜잭션 로그에 Ongoing 상태로 기록한다.
  4. Produce: 프로듀서는 토픽 파티션으로 메세지를 전송한다. 메세지에는 PID, 에포크, 시퀀스 번호가 포함된다.
  5. AddOffsetsToTxn: 프로듀서가 컨슈머 그룹의 오프셋 커밋을 트랜잭션에 포함하기 위해 sendOffsetsToTransaction()을 호출한다. (전송할 메세지가 컨슈머로부터 가져온 레코드인 경우 필요) 트랜잭션 코디네이터는 트랜잭션 로그에 커밋할 토픽 파티션을 기록한다.
  6. OffsetCommit: 실제 __consumer_offsets 토픽에 커밋하기 위한 단계이다. 트랜잭션이 성공적으로 커밋되었을 때만 적용된다. (commitTransaction()이 호출되었을 때 반영된다.0
  7. EndTxn: 프로듀서는 commitTransaction()/abortTransaction() 호출을 통해 트랜잭션의 완료를 트랜잭션 코디네이터에게 알린다. 트랜잭션 코디네이터는 첫 번째 커밋 단계로 트랜잭션 로그에 PREPARE_COMMIT/PREPARE_ABORT를 기록한다.
  8. WriteTxMarkers: 트랜잭션 코디네이터는 토픽 파티션 리더에 COMMIT/ABORT 컨트롤 레코드 기록 요청을 보낸다. __consumer_offset 토픽에 실제로 컨트롤 레코드가 기록된다.

 

Example


public class KafkaTransactionsExample {

  public static void main(String args[]) {
      // read_committed, enable.auto.commit false
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);

      // Idempotence Producer Configuratainon + transcational.id
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    producer.initTransactions();

    while(true) {
        try {
          ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
          if (!records.isEmpty()) {
            producer.beginTransaction(); // 트랜잭션 시작

            List<ProducerRecord<String, String>> outputRecords = processRecords(records);
            for (ProducerRecord<String, String> outputRecord : outputRecords) {
              producer.send(outputRecord); // 레코드 전송
            }

                // 컨슈머가 폴링한 오프셋 커밋을 위해 사용
            sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());

            producer.commitTransaction();
      } catch (Exception e) {
          producer.abortTransaction();
        }
    }
  }
}

 

반응형
저작자표시 비영리 변경금지 (새창열림)

'Kafka > Kafka' 카테고리의 다른 글

Kafka Connect: Incremental Cooperative Rebalancing  (0) 2024.12.09
Kafka: Static Membership  (0) 2024.11.03
Kafka Issue: InvalidRecordException (related Timestamp)  (0) 2024.06.01
Kafka: Kafka Broker 내부 구조  (0) 2024.04.28
    'Kafka/Kafka' 카테고리의 다른 글
    • Kafka Connect: Incremental Cooperative Rebalancing
    • Kafka: Static Membership
    • Kafka Issue: InvalidRecordException (related Timestamp)
    • Kafka: Kafka Broker 내부 구조
    Zayson
    Zayson
    공부한 내용을 정리하는 공간

    티스토리툴바