Kafka
Kafka: Exactly-Once Semantic (w/ Transaction_)
개요Kafka에서 Exactly Once Semantic을 보장하는 방법에 대해서 알아본다. 조건Kafka에서 Exactly-Once Semantic (이하 EoS)를 보장하기 위해선 2가지 조건이 필수적이다Idempotence ProducerTransaction API Idempotence ProducerIdempotence Producer는 이름에서 알 수 있듯, 메세지를 멱등하게 보낼 수 있는 프로듀서를 의미한다.멱등하게 보냄은 메세지가 중복없이 전송되는 것을 의미한다. 하지만 멱등함이 메세지를 정확히 한 번 전송함을 의미하는 것은 아니다. Idempotence Producer가 활성화되면 프로듀서는 숫자로 구성된 PID를 할당받는다. Idempotence Producer가 전송하는 메세지에는 파티..
Kafka: Static Membership
서론Kafka Consumer는 여러 가지 이벤트를 통해 리밸런싱이 발생한다.Consumer ↔ GroupCoordinator 간 session.timeout.ms 내에 Heartbeat가 수신되지 않은 경우Consumer가 Consumer Group을 나가거나 들어오는 경우Consumer Group이 구독하는 토픽의 파티션이 추가되는 경우 등Consumer는 리밸런싱이 시작되면 완료되기 전까지 데이터를 처리하지 못한다. 이로 인해 데이터 처리가 일시 중단되며 지연이 발생한다.또한, 리밸런싱 과정에서 파티션을 Revoke/Assign 하는 과정에서 네트워크 부하 등으로 인한 클러스터 성능 저하가 발생한다. Kafka 2.3.0부터는 이러한 리밸런싱의 단점을 보완하기 위해 Static Membership이..
Strimzi, Kafka Connect를 이용한 데이터 통합 파이프라인 환경 구성하기
서론현재 회사에서 진행중인 메인 프로젝트는 실시간으로 게임으로부터 들어오는 데이터를 통합하고 이를 가공해 제 3자 서비스에게 가공된 데이터를 제공해주는 파이프라인을 구축하는 것입니다. 팀에서는 대용량 데이터를 실시간으로 스트리밍 처리하기 위해 Kafka를 사용 중입니다.따라서, 실시간으로 외부 저장소에 적재되는 게임 데이터를 Kafka로 통합하고, 가공된 데이터를 외부 저장소로 적재하는 기술 도입이 필요했습니다. Kafka Connect는 Kafka의 구성 요소로서 분산 데이터 처리 플랫폼에서 스트리밍 데이터 통합 프레임워크입니다.다양한 외부 저장소(Source/SInk)로부터 Kafka Connector를 통해 데이터를 가져오고, 적재할 수 있는 장점이 있습니다. 일반적으로 Kafka Connect는 ..
Kafka Connect: Custom TimestmapExtractor
개요Confluent S3 Sink Connector는 Custom Partitioner, Custom TimestampExtractor 구현을 통해 요구사항에 맞게 S3 버킷 경로 구성이 가능하다.TimeBasedPartitioner 사용 시 Custom TimestampExtractor를 이용해 Record에서 Tiemstamp를 추출해서 버킷 경로를 구성해보자.TimestampExtractor사용 용도Kafka Record 내에서 특정 기준으로 Timestamp를 추출하는 기능을 제공하는 인터페이스Confluent에서 공식적으로 제공하는 TimestampExtractor 구현체는 다음과 같다.RecordFieldTimestampExtractor: Record내 타임스탬프를 사용하는 필드에서 추출Re..
Kafka Issue: InvalidRecordException (related Timestamp)
에러 발생 환경AWS MSK (3.6.0), Tiered Storage 활성화에러 로그"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception│ from producer send callback\\n\\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:340)\\n\\tat│ org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:133)\\n\\tat..