개요
Kafka
에서 Exactly Once Semantic
을 보장하는 방법에 대해서 알아본다.
조건
Kafka에서 Exactly-Once Semantic (이하 EoS)
를 보장하기 위해선 2가지 조건이 필수적이다
Idempotence Producer
Transaction API
Idempotence Producer
Idempotence Producer
는 이름에서 알 수 있듯, 메세지를 멱등하게 보낼 수 있는 프로듀서를 의미한다.
멱등하게 보냄은 메세지가 중복없이 전송되는 것을 의미한다. 하지만 멱등함이 메세지를 정확히 한 번 전송함을 의미하는 것은 아니다.
Idempotence Producer
가 활성화되면 프로듀서는 숫자로 구성된 PID를 할당받는다. Idempotence Producer
가 전송하는 메세지에는 파티션 별로 구분되는 시퀀스 번호가 연속적으로 부여된다.
즉, 전송된 메세지에는 PID-Sequence Number
가 포함되어 있다.
브로커는 “프로듀서가 전송한 메세지의 시퀀스 번호 = 브로커가 소유한 메세지의 시퀀스 번호 + 1"
을 만족할 때만 메세지를 저장한다.
브로커는 .snapshot
파일에 프로듀서 및 파티션 별 시퀀스 번호를 주기적으로 저장한다.
Kafka에서 Idempotence Producer
를 설정하기 위해선 다음과 같은 설정이 필요하다.
Configuration | Value | Description |
enable.idempotence | true | 멱등성 프로듀서를 설정하기 위한 값으로 기본 값은 false이다. |
max.in.flight.requests.per.connection | 1 ~ 5 | ACK 받기 전 하나의 커넥션에서 전송할 수 있는 최대 요청 값 (5이하로 설정) |
acks | all(=-1) | 프로듀서가 전송한 메세지를 브로커가 제대로 수신했는지 확인하는 설정. all은 모든 메세지가 ISR까지 복제됨을 보장한다. |
retries | 5 | ACK 를 받지 못한 경우 재시도 하는 횟수 (0보다 큰 값으로 설정) |
Transcation API
Kafka의 Transaction API
는 Idempotence Producer
가 전송한 메세지 커밋을 성공하거나 실패하는 데 있어 원자성을 보장한다.
Idempotence Producer
가 중복 없이 메세지를 전송하고, 이를 Transcation API
를 통해 원자적으로 커밋에 성공하면 EoS
가 보장된다.
Kafka Transaction을 사용하기 위해서 다음과 같은 설정이 필요하다.
Configuration | Value | Description |
transactional.id (for Producer) | 트랜잭션에 사용할 고유한 ID, Transaction Producer 인스턴스 별로 고유하게 설정된다. | |
isolation.level (for Consumer) | read_committed | Non-Transaction 레코드, COMMIT 상태의 레코드만 읽도록 설정 |
enable.auto.commit (for Consumer) | false | 수동 오프셋 커밋을 위해 설정 |
트랜잭션 코디네이터의 역할
프로듀서가 전송한 메세지를 관리하고 Commit/Abort와 같은 상태를 관리한다.
TID와 매핑되는 고유한 PID를 생성하고 관리한다.
트랜잭션 전체 과정을 관리하며 Commit/Abort를 나타낼 컨트롤 레코드를 전송한다.
FindCoordinator
: 프로듀서는 브로커에게FindCoordinatorRequest
를 전송해 트랜잭션 코디네이터 위치를 찾는다.InitProducerId
: 프로듀서는initTransaction()
메서드를 통해 InitPidRequest를 트랜잭션 코디네이터에 전송한다.- 트랜잭션 코디네이터는 TID, PID를 매핑하고 PID에 해당하는 에포크 값을 증가시킨다.
- 에포크가 증가됨에 따라, 이전 에포크를 갖는 프로듀서가 전송한 쓰기 요청은 무시된다.
AddPartitionsToTxn
: 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달한다. 트랜잭션 코디네이터는 전달받은 토픽 파티션 정보를 트랜잭션 로그에Ongoing
상태로 기록한다.Produce
: 프로듀서는 토픽 파티션으로 메세지를 전송한다. 메세지에는 PID, 에포크, 시퀀스 번호가 포함된다.AddOffsetsToTxn
: 프로듀서가 컨슈머 그룹의 오프셋 커밋을 트랜잭션에 포함하기 위해sendOffsetsToTransaction()
을 호출한다. (전송할 메세지가 컨슈머로부터 가져온 레코드인 경우 필요) 트랜잭션 코디네이터는 트랜잭션 로그에 커밋할 토픽 파티션을 기록한다.OffsetCommit
: 실제__consumer_offsets
토픽에 커밋하기 위한 단계이다. 트랜잭션이 성공적으로 커밋되었을 때만 적용된다. (commitTransaction()
이 호출되었을 때 반영된다.0EndTxn
: 프로듀서는commitTransaction()/abortTransaction()
호출을 통해 트랜잭션의 완료를 트랜잭션 코디네이터에게 알린다. 트랜잭션 코디네이터는 첫 번째 커밋 단계로 트랜잭션 로그에PREPARE_COMMIT/PREPARE_ABORT
를 기록한다.WriteTxMarkers
: 트랜잭션 코디네이터는 토픽 파티션 리더에COMMIT/ABORT
컨트롤 레코드 기록 요청을 보낸다.__consumer_offset
토픽에 실제로 컨트롤 레코드가 기록된다.
Example
public class KafkaTransactionsExample {
public static void main(String args[]) {
// read_committed, enable.auto.commit false
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// Idempotence Producer Configuratainon + transcational.id
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
while(true) {
try {
ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
if (!records.isEmpty()) {
producer.beginTransaction(); // 트랜잭션 시작
List<ProducerRecord<String, String>> outputRecords = processRecords(records);
for (ProducerRecord<String, String> outputRecord : outputRecords) {
producer.send(outputRecord); // 레코드 전송
}
// 컨슈머가 폴링한 오프셋 커밋을 위해 사용
sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
}
}
'Kafka > Kafka' 카테고리의 다른 글
Kafka: Static Membership (0) | 2024.11.03 |
---|---|
Kafka Issue: InvalidRecordException (related Timestamp) (0) | 2024.06.01 |
Kafka: Kafka Broker 내부 구조 (0) | 2024.04.28 |