개요
- Confluent S3 Sink Connector는 Custom Partitioner, Custom TimestampExtractor 구현을 통해 요구사항에 맞게 S3 버킷 경로 구성이 가능하다.
- TimeBasedPartitioner 사용 시 Custom TimestampExtractor를 이용해 Record에서 Tiemstamp를 추출해서 버킷 경로를 구성해보자.
TimestampExtractor
사용 용도
- Kafka Record 내에서 특정 기준으로
Timestamp
를 추출하는 기능을 제공하는 인터페이스 - Confluent에서 공식적으로 제공하는
TimestampExtractor
구현체는 다음과 같다.RecordFieldTimestampExtractor
: Record내 타임스탬프를 사용하는 필드에서 추출RecordTimestampExtractor
: Record Timestamp 기준으로 추출WallclockTimestampExtractor
: Kafka Connect가 동작하는 서버의 현재 Time 기준 추출
- S3 Sink Connector에서는
TiemstampExtractor
는 S3 Object를 업로드할 때 (TimeBasedPartitioner 사용 시) 날짜 별 파티셔닝(encodePartition
) 지원 및 기존 파일을 닫고, 새로운 파일을 열 때 (rotation.interval.ms
) 주로 사용된다.
인터페이스
package io.confluent.connect.storage.partitioner;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
public interface TimestampExtractor {
void configure(Map<String, Object> config);
Long extract(ConnectRecord<?> record);
default Long extract(ConnectRecord<?> record, long nowInMillis) {
return this.extract(record);
}
}
- Confluent에서 제공하는 TimestampExtractor 인터페이스
configure
: Kafka Connector 설정에 필요한 Configuration을 지정한다.extract(ConnectRecord<?> record)
: Record로부터 정해진 기준에 따라 Timestamp를 추출한다.default extract(ConnectRecord<?> record, long nowInMillis)
- Record로부터 정해진 기준에 따라 Timestamp를 추출한다.
TimestampExtractor
가 Wallclock을 사용하는 경우를 지원하기 위해default
메서드를 추가한 것으로 예상된다.
실제로 WallclockTimestampExtractor의 경우 extract시 nowInMillis를 바로 return하는 것을 알 수 있다.
public Long extract(ConnectRecord<?> record, long nowInMillis) {
return nowInMillis;
}
S3 Sink Connector
- 위에서
TimestampExtractor
의 사용 용도를 2가지 정도 언급했다. 그 중 S3 Object 업로드 시 날짜 별 파티셔닝에 사용되는 부분(encdePartition
)을 확인해보자. - S3 Sink Connector의
TopicPartitionWriter
클래스에서는 Record를 모아서 파일을 생성하고, 파티션 경로를 지정하는 역할을 한다. (이외에도 다른 역할을 하지만 현재 세션에서는 위의 역할만 파악하고 넘어가자.)
// S3 업로드 시 Recod 기준에 따라 파티셔닝하기 위한 Path를 획득한다.
encodedPartition = partitioner.encodePartition(record, now);
...
// 파티셔너에서 토픽과 encodePartition을 통해 S3 Object Prefix를 결정한다.
private String getDirectoryPrefix(String encodedPartition) {
return partitioner.generatePartitionedPath(tp.topic(), encodedPartition);
}
- 코드를 확인해보면
partitioner
를 통해encodePartition
을 획득하고,generatePartitionPath
를 통해 실제 디렉토리 prefix를 결정하는 것을 확인할 수 있다. - 즉,
partitioner
구현체의encodePartition
메서드와generatePartitionPath
메서드를 통해 디렉토리 Prefix를 획득할 수 있다.
Partitioner
- 기본적으로 Confluent는 큰 범주에서 2가지
Partitioner
구현체를 제공하고 있다.FieldPartitioner
: 레코드의 특정 필드를 기준으로 파티셔닝한다.TimeBasedPartitioner
:TimestampExtractor
를 반드시 지정해야 하며,TimestampExtractor
에서 추출한 값에 따라 포맷(path.format
)에 맞게 파티셔닝한다.
TimeBasedPartitioner
의encodePartition
과generatePartitionPath
메서드를 확인해보자.
@Override
public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
Long timestamp = timestampExtractor.extract(sinkRecord, nowInMillis);
return encodedPartitionForTimestamp(sinkRecord, timestamp);
}
private String encodedPartitionForTimestamp(SinkRecord sinkRecord, Long timestamp) {
... // Timestamp Null일 때 예외 처리 코드 생략
// Timestamp는 ms단위이기 때문에 파티션 조정
DateTime bucket = new DateTime(
getPartition(partitionDurationMs, timestamp, formatter.getZone())
);
// Path.format (e.g YYYY/MM/DD/HH)에 맞게 encodePartition 생성
return bucket.toString(formatter);
}
// TimeBasedPartitioner 부모 클래스인 DefaultPartitioner에서 구현됨
@Override
public String generatePartitionedPath(String topic, String encodedPartition) {
// 단순히 토픽명 + encodePartition 값을 사용해 Prefix를 생성
return topic + delim + encodedPartition;
}
TimestampExtractor
의extract()
메서드를 통해Timestamp
를 추출하고, 이를 기반으로 포맷에 맞게 파티셔닝을 하는 것을 확인할 수 있다.generatePartitionedPath
에서는 단순히 토픽 명과 위에서 추출된encodedPartition
을 이용해 이어진 문자열 (디렉토리 prefix)을 반환하는 것을 확인할 수 있다.- 즉,
TimeBasedPartitioner
를 사용할 때,TimestampExtractor
를 구현하고 적용하면 우리 입맛에 맞게 DirectoryPrefix를 결정할 수 있다는 의미이다.
Custom TimestampExtractor 구현
Prerequisite
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
// https://mvnrepository.com/artifact/io.confluent/kafka-connect-storage-partitioner
implementation 'io.confluent:kafka-connect-storage-partitioner:11.2.3'
// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
implementation 'org.apache.kafka:connect-api:3.6.1'
}
task fatJar(type: Jar) {
archiveClassifier.set('fat')
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
with jar
}
jar {
dependsOn fatJar
}
io.confluent:kafka-connect-storage-partitioner
:Partitioner
,TimestampExtractor
구현을 위해 필요한 라이브러리- Kafka Connect Plugin에 적용할
fatJar
를 만들기 위한 DSL 작성
Custom Timestamp Extractor Implementation
- Kafka Record Header에
RFC 3339
혹은Timestamp(ms)
를 추출하는 요구사항이 있다고 가정해보자.
public class RecordHeaderTimestampExtractor implements TimestampExtractor {
// 어떤 header로부터 타임스탬프를 추출할지 Custom Configuration이 추가되어야 한다.
public static final String TIMESTAMP_HEADER = "timestamp.header";
private String recordHeaderKey;
@Override
public void configure(Map<String, Object> map) {
this.recordHeaderKey = (String) map.get(TIMESTAMP_HEADER);
// 해당 TiemstampExtractor는 Header에서 추출이 필요하기 때문에 header 정보가 없으면 Exception을 던진다.
if (recordHeaderKey.isBlank()) {
log.error("timestamp.header must not be empty.");
throw new ConfigException("timestamp.header must not be empty.");
}
}
// 밑에서 계속
...
}
- Header에서 Timestamp를 추출하기 위한 Header Key를 Configuration으로 받는다.
public class RecordHeaderTimeestmapExtractor implements TimestampExtractor {
@Override
public Long extract(ConnectRecord<?> record) {
Header header = record.headers().lastWithName(this.recordHeaderKey);
// header key 없는 경우 Exception
if (Objects.isNull(header)) {
log.error("Not found header: {}", recordHeaderKey);
throw new PartitionException("No header matching timestamp.header exists.");
}
// Kafka Connect에서 RFC 3339는 org.apache.kafka.connect.data.Timestamp로 판단
if ("org.apache.kafka.connect.data.Timestamp".equals(header.schema().name())) {
return ((Date) header.value()).getTime();
} else {
// header 스키마가 Timestamp ms인 경우 lonvg value 변환
switch (header.schema().type()) {
case INT32, INT64 -> {
return ((Number) header.value()).longValue();
}
case STRING -> {
// String 포맷인 경우 RFC 3339 String이라 가정
return extractTimestampFromString((String) header.value());
}
default -> {
log.error("Unsupported type '{}' for record header timestamp.", header.schema().type().getName());
throw new PartitionException("Error extracting timestamp from record header: " + recordHeaderKey);
}
}
}
}
private Long extractTimestampFromString(String timestampValue) {
try {
// RFC3339 format (yyyy-mm-ddThh:mm:ss.sssZ)
Instant timestamp = Instant.parse(timestampValue);
return timestamp.toEpochMilli();
} catch (DateTimeParseException e) {
log.error("[RecordHeaderTimestampExtractor] Invalid timestamp format: {}", timestampValue);
throw new PartitionException("Invalid timestamp format: " + timestampValue, e.getCause());
} catch (Exception e) {
log.error("[RecordHeaderTimestampExtractor] Error extracting timestamp from string: {}", timestampValue);
throw new PartitionException("Error extracting timestamp from string: " + timestampValue, e.getCause());
}
}
}
- Record의 Header에서 Configuration으로 받은 Header Key와 일치하는 마지막 헤더를 찾는다.
- Header의 포맷을이
RFC 3339
인 경우Date
로 캐스팅 후getTime()
을 통해Timestamp(ms)
를 반환한다. Timestamp(ms)
자체로 들어오는 경우 단순히 Long 값으로 반환한다.
Custom TimestampExtractor 적용
- Build한 FatJar를 S3 Sink Connector가 들어가 있는 Plugin Path에 적용
- e.g)
/opt/kafka/plugins/s3-sink-connector
- e.g)
- Configuration 적용
- REST API 사용:
PUT /connectors/{connector_name}/config
- Strimzi Cluster Operator 사용: YAML에 적용한 값 그대로 반영
- REST API 사용:
YAML에 적용된 Configuration은 REST API에 적용하는 Configuration과 동일
timestamp.extractor: com.zayson.extractor.RecordHeaderTimestampExtractor # fullpath
timestamp.header: createdate
...
- Custom TimestampExtractor를 적용할 때는 반드시 Full-Path를 적용해야 한다.
Confluent에서 구현해놓은 클래스의 경우 Wallclock, Record, RecordField만 입력해도 Class Fullpath를 만들지만,
이외의 Class는 FullPath를 만들어주지 않는다. (하단 코드 참조)
switch (extractorClassName) {
case "Wallclock":
case "Record":
case "RecordField":
// full path 생성
extractorClassName = "io.confluent.connect.storage.partitioner.TimeBasedPartitioner$" + extractorClassName + "TimestampExtractor";
default:
Class<?> klass = Class.forName(extractorClassName);
if (!TimestampExtractor.class.isAssignableFrom(klass)) {
throw new ConnectException("Class " + extractorClassName + " does not implement TimestampExtractor");
} else {
return (TimestampExtractor)klass.newInstance();
}
}
테스트
record = new ProducerRecord<>("oops" ,null, 1722815999999L, null, message,null); // 2024-08-04T23:59:59.999Z
record1 = new ProducerRecord<>("oops", null, 1722816000000L,null, message1, null); // 2024-08-05T00:00:00.000Z
record2 = new ProducerRecord<>("oops", null, 1722816000001L, null, message2, null); // 2024-08-05T00:00:00.001Z
record.headers().add("createdate", "1999-12-31T23:59:59.999Z".getBytes());
record1.headers().add("createdate", "2000-01-01T00:00:00.000Z".getBytes());
record2.headers().add("createdate", "946688400000".getBytes()); // 2000-01-01T01:00:00:000Z
- Record Timestamp 범위: 2024-08-04T23:59:59:000 ~ 2024-08-05T00:00:00.000Z
- Record Header Timestamp 범위: 1999-12-31T23:59:59.999Z ~ 2000-01-01T01:00:00.000Z
S3 저장 결과
aws s3 ls s3://my-bucket --recursive
- S3에 적재된 파일을 조회해보면 Prefix가 RecordHeader에 담긴 값 그대로 파티셔닝된 것을 확인할 수 있다.
반응형
'Kafka > Kafka Connect' 카테고리의 다른 글
Strimzi, Kafka Connect를 이용한 데이터 통합 파이프라인 환경 구성하기 (0) | 2024.10.25 |
---|