Zayson
A to Zayson!
Zayson
전체 방문자
오늘
어제
  • 분류 전체보기 (132)
    • Computer Science (20)
      • Network (4)
      • DB (12)
      • OS (4)
    • Algorithm (32)
      • 완전탐색(Brute-Force) (3)
      • 그리디(Greedy) (6)
      • 투포인터(Two-Pointer) (1)
      • 그래프(Graph) (5)
      • BFS & DFS (9)
      • 구현, 시뮬레이션(Implementation) (5)
      • 다이나믹 프로그래밍(DP) (3)
    • Backend (51)
      • Spring Boot (19)
      • JPA (16)
      • Kafka (2)
      • Java (13)
      • Kotlin (1)
    • DevOps (1)
      • Jenkins (5)
      • Oracle Cloud Infrastructure (1)
      • Kubernetes & Docker (1)
    • Trouble Shooting (3)
      • JPA (1)
      • Spring Boot (2)
    • 회고 (5)
      • 엔빵 프로젝트 포스트 로드맵 (1)
      • 2022년 (4)
    • Kafka (7)
      • Kafka (5)
      • Kafka Connect (2)
    • 기술 서적 (6)
      • 데이터 중심 애플리케이션 설계 (3)
      • 개발자가 반드시 정복해야할 객체 지향과 디자인 패턴 (2)
      • 가상 면접 사례로 배우는 대규모 시스템 설계 기초 (1)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

인기 글

태그

  • kafka
  • 관계형 데이터베이스 실전 입문
  • 구현
  • 나 혼자 스프링부트!
  • Kafka Connect
  • 백준
  • CS
  • Backend
  • 엔빵프로젝트
  • 그리디
  • 프로그래머스
  • 라이브스터디
  • 완전탐색
  • JPA
  • BFS
  • Java
  • SpringBoot
  • dfs
  • Computer science
  • spring boot

최근 글

티스토리

hELLO · Designed By 정상우.
Zayson

A to Zayson!

Kafka Connect: Custom TimestmapExtractor
Kafka/Kafka Connect

Kafka Connect: Custom TimestmapExtractor

2024. 8. 21. 23:32

개요


  • Confluent S3 Sink Connector는 Custom Partitioner, Custom TimestampExtractor 구현을 통해 요구사항에 맞게 S3 버킷 경로 구성이 가능하다.
  • TimeBasedPartitioner 사용 시 Custom TimestampExtractor를 이용해 Record에서 Tiemstamp를 추출해서 버킷 경로를 구성해보자.

TimestampExtractor


사용 용도

  • Kafka Record 내에서 특정 기준으로 Timestamp를 추출하는 기능을 제공하는 인터페이스
  • Confluent에서 공식적으로 제공하는 TimestampExtractor 구현체는 다음과 같다.
    • RecordFieldTimestampExtractor: Record내 타임스탬프를 사용하는 필드에서 추출
    • RecordTimestampExtractor: Record Timestamp 기준으로 추출
    • WallclockTimestampExtractor: Kafka Connect가 동작하는 서버의 현재 Time 기준 추출
  • S3 Sink Connector에서는 TiemstampExtractor는 S3 Object를 업로드할 때 (TimeBasedPartitioner 사용 시) 날짜 별 파티셔닝(encodePartition) 지원 및 기존 파일을 닫고, 새로운 파일을 열 때 (rotation.interval.ms) 주로 사용된다.

인터페이스

package io.confluent.connect.storage.partitioner;

import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;

public interface TimestampExtractor {
  void configure(Map<String, Object> config);

  Long extract(ConnectRecord<?> record);

  default Long extract(ConnectRecord<?> record, long nowInMillis) {
    return this.extract(record);
  }
}
  • Confluent에서 제공하는 TimestampExtractor 인터페이스
    • configure: Kafka Connector 설정에 필요한 Configuration을 지정한다.
    • extract(ConnectRecord<?> record): Record로부터 정해진 기준에 따라 Timestamp를 추출한다.
    • default extract(ConnectRecord<?> record, long nowInMillis)
      • Record로부터 정해진 기준에 따라 Timestamp를 추출한다.
      • TimestampExtractor가 Wallclock을 사용하는 경우를 지원하기 위해 default 메서드를 추가한 것으로 예상된다.
실제로 WallclockTimestampExtractor의 경우 extract시 nowInMillis를 바로 return하는 것을 알 수 있다.
public Long extract(ConnectRecord<?> record, long nowInMillis) {
    return nowInMillis;
}

S3 Sink Connector


  • 위에서 TimestampExtractor의 사용 용도를 2가지 정도 언급했다. 그 중 S3 Object 업로드 시 날짜 별 파티셔닝에 사용되는 부분(encdePartition)을 확인해보자.
  • S3 Sink Connector의 TopicPartitionWriter 클래스에서는 Record를 모아서 파일을 생성하고, 파티션 경로를 지정하는 역할을 한다. (이외에도 다른 역할을 하지만 현재 세션에서는 위의 역할만 파악하고 넘어가자.)
// S3 업로드 시 Recod 기준에 따라 파티셔닝하기 위한 Path를 획득한다.
encodedPartition = partitioner.encodePartition(record, now); 

...

// 파티셔너에서 토픽과 encodePartition을 통해 S3 Object Prefix를 결정한다.
private String getDirectoryPrefix(String encodedPartition) {
    return partitioner.generatePartitionedPath(tp.topic(), encodedPartition);
}
  • 코드를 확인해보면 partitioner를 통해 encodePartition을 획득하고, generatePartitionPath를 통해 실제 디렉토리 prefix를 결정하는 것을 확인할 수 있다.
  • 즉, partitioner 구현체의 encodePartition 메서드와 generatePartitionPath 메서드를 통해 디렉토리 Prefix를 획득할 수 있다.

Partitioner


  • 기본적으로 Confluent는 큰 범주에서 2가지 Partitioner 구현체를 제공하고 있다.
    • FieldPartitioner: 레코드의 특정 필드를 기준으로 파티셔닝한다.
    • TimeBasedPartitioner: TimestampExtractor를 반드시 지정해야 하며, TimestampExtractor에서 추출한 값에 따라 포맷(path.format)에 맞게 파티셔닝한다.
  • TimeBasedPartitioner 의 encodePartition과 generatePartitionPath 메서드를 확인해보자.
@Override
public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
    Long timestamp = timestampExtractor.extract(sinkRecord, nowInMillis);
  return encodedPartitionForTimestamp(sinkRecord, timestamp); 
}

private String encodedPartitionForTimestamp(SinkRecord sinkRecord, Long timestamp) {
    ... // Timestamp Null일 때 예외 처리 코드 생략

    // Timestamp는 ms단위이기 때문에 파티션 조정
  DateTime bucket = new DateTime(
        getPartition(partitionDurationMs, timestamp, formatter.getZone())
  );
  // Path.format (e.g YYYY/MM/DD/HH)에 맞게 encodePartition 생성
  return bucket.toString(formatter);
}

// TimeBasedPartitioner 부모 클래스인 DefaultPartitioner에서 구현됨  
@Override
public String generatePartitionedPath(String topic, String encodedPartition) {
    // 단순히 토픽명 + encodePartition 값을 사용해 Prefix를 생성
    return topic + delim + encodedPartition;
} 
  • TimestampExtractor의 extract() 메서드를 통해 Timestamp를 추출하고, 이를 기반으로 포맷에 맞게 파티셔닝을 하는 것을 확인할 수 있다.
  • generatePartitionedPath 에서는 단순히 토픽 명과 위에서 추출된 encodedPartition을 이용해 이어진 문자열 (디렉토리 prefix)을 반환하는 것을 확인할 수 있다.
  • 즉, TimeBasedPartitioner를 사용할 때, TimestampExtractor를 구현하고 적용하면 우리 입맛에 맞게 DirectoryPrefix를 결정할 수 있다는 의미이다.

Custom TimestampExtractor 구현


Prerequisite

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
    // https://mvnrepository.com/artifact/io.confluent/kafka-connect-storage-partitioner
    implementation 'io.confluent:kafka-connect-storage-partitioner:11.2.3'
    // https://mvnrepository.com/artifact/org.apache.kafka/connect-api
    implementation 'org.apache.kafka:connect-api:3.6.1'
}

task fatJar(type: Jar) {
    archiveClassifier.set('fat')
    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
    from {
        configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
    }
    with jar
}
jar {
    dependsOn fatJar
}
  • io.confluent:kafka-connect-storage-partitioner: Partitioner, TimestampExtractor 구현을 위해 필요한 라이브러리
  • Kafka Connect Plugin에 적용할 fatJar 를 만들기 위한 DSL 작성

Custom Timestamp Extractor Implementation

  • Kafka Record Header에 RFC 3339 혹은 Timestamp(ms)를 추출하는 요구사항이 있다고 가정해보자.
public class RecordHeaderTimestampExtractor implements TimestampExtractor {
    // 어떤 header로부터 타임스탬프를 추출할지 Custom Configuration이 추가되어야 한다.
    public static final String TIMESTAMP_HEADER = "timestamp.header";
    private String recordHeaderKey;

    @Override
    public void configure(Map<String, Object> map) {
      this.recordHeaderKey = (String) map.get(TIMESTAMP_HEADER);

      // 해당 TiemstampExtractor는 Header에서 추출이 필요하기 때문에 header 정보가 없으면 Exception을 던진다.
      if (recordHeaderKey.isBlank()) {
        log.error("timestamp.header must not be empty.");
        throw new ConfigException("timestamp.header must not be empty.");
      }
    }

// 밑에서 계속
...
}
  • Header에서 Timestamp를 추출하기 위한 Header Key를 Configuration으로 받는다.
public class RecordHeaderTimeestmapExtractor implements TimestampExtractor {
  @Override
  public Long extract(ConnectRecord<?> record) {
    Header header = record.headers().lastWithName(this.recordHeaderKey);
    // header key 없는 경우 Exception
    if (Objects.isNull(header)) {
      log.error("Not found header: {}", recordHeaderKey);
      throw new PartitionException("No header matching timestamp.header exists.");
    }

        // Kafka Connect에서 RFC 3339는 org.apache.kafka.connect.data.Timestamp로 판단
    if ("org.apache.kafka.connect.data.Timestamp".equals(header.schema().name())) {
      return ((Date) header.value()).getTime();
    } else {
           // header 스키마가 Timestamp ms인 경우 lonvg value 변환 
      switch (header.schema().type()) {
        case INT32, INT64 -> {
          return ((Number) header.value()).longValue();
        }
        case STRING -> {
            // String 포맷인 경우 RFC 3339 String이라 가정
          return extractTimestampFromString((String) header.value());
        }
        default -> {
          log.error("Unsupported type '{}' for record header timestamp.", header.schema().type().getName());
          throw new PartitionException("Error extracting timestamp from record header: " + recordHeaderKey);
        }
      }
    }
  }

  private Long extractTimestampFromString(String timestampValue) {
    try {
      // RFC3339 format (yyyy-mm-ddThh:mm:ss.sssZ)
      Instant timestamp = Instant.parse(timestampValue);

      return timestamp.toEpochMilli();
    } catch (DateTimeParseException e) {
      log.error("[RecordHeaderTimestampExtractor] Invalid timestamp format: {}", timestampValue);

      throw new PartitionException("Invalid timestamp format: " + timestampValue, e.getCause());
    } catch (Exception e) {
      log.error("[RecordHeaderTimestampExtractor] Error extracting timestamp from string: {}", timestampValue);

      throw new PartitionException("Error extracting timestamp from string: " + timestampValue, e.getCause());
    }
  }
}
  • Record의 Header에서 Configuration으로 받은 Header Key와 일치하는 마지막 헤더를 찾는다.
  • Header의 포맷을이 RFC 3339인 경우 Date로 캐스팅 후 getTime()을 통해 Timestamp(ms)를 반환한다.
  • Timestamp(ms) 자체로 들어오는 경우 단순히 Long 값으로 반환한다.

Custom TimestampExtractor 적용


  • Build한 FatJar를 S3 Sink Connector가 들어가 있는 Plugin Path에 적용
    • e.g) /opt/kafka/plugins/s3-sink-connector
  • Configuration 적용
    • REST API 사용: PUT /connectors/{connector_name}/config
    • Strimzi Cluster Operator 사용: YAML에 적용한 값 그대로 반영
YAML에 적용된 Configuration은 REST API에 적용하는 Configuration과 동일
timestamp.extractor: com.zayson.extractor.RecordHeaderTimestampExtractor # fullpath
timestamp.header: createdate
...

 

  • Custom TimestampExtractor를 적용할 때는 반드시 Full-Path를 적용해야 한다.
Confluent에서 구현해놓은 클래스의 경우 Wallclock, Record, RecordField만 입력해도 Class Fullpath를 만들지만,
이외의 Class는 FullPath를 만들어주지 않는다. (하단 코드 참조)
 switch (extractorClassName) {
     case "Wallclock":
   case "Record":
   case "RecordField":
       // full path 생성
       extractorClassName = "io.confluent.connect.storage.partitioner.TimeBasedPartitioner$" + extractorClassName + "TimestampExtractor";
   default:
       Class<?> klass = Class.forName(extractorClassName);
     if (!TimestampExtractor.class.isAssignableFrom(klass)) {
         throw new ConnectException("Class " + extractorClassName + " does not implement TimestampExtractor");
     } else {
         return (TimestampExtractor)klass.newInstance();
     }
 }

테스트


record = new ProducerRecord<>("oops" ,null, 1722815999999L, null, message,null); // 2024-08-04T23:59:59.999Z
record1 = new ProducerRecord<>("oops", null, 1722816000000L,null,  message1, null); // 2024-08-05T00:00:00.000Z
record2 = new ProducerRecord<>("oops", null, 1722816000001L, null, message2, null); // 2024-08-05T00:00:00.001Z

record.headers().add("createdate", "1999-12-31T23:59:59.999Z".getBytes());
record1.headers().add("createdate", "2000-01-01T00:00:00.000Z".getBytes());
record2.headers().add("createdate", "946688400000".getBytes()); // 2000-01-01T01:00:00:000Z
  • Record Timestamp 범위: 2024-08-04T23:59:59:000 ~ 2024-08-05T00:00:00.000Z
  • Record Header Timestamp 범위: 1999-12-31T23:59:59.999Z ~ 2000-01-01T01:00:00.000Z

S3 저장 결과

aws s3 ls s3://my-bucket --recursive 
  • S3에 적재된 파일을 조회해보면 Prefix가 RecordHeader에 담긴 값 그대로 파티셔닝된 것을 확인할 수 있다.
반응형
저작자표시 비영리 변경금지 (새창열림)

'Kafka > Kafka Connect' 카테고리의 다른 글

Strimzi, Kafka Connect를 이용한 데이터 통합 파이프라인 환경 구성하기  (0) 2024.10.25
    'Kafka/Kafka Connect' 카테고리의 다른 글
    • Strimzi, Kafka Connect를 이용한 데이터 통합 파이프라인 환경 구성하기
    Zayson
    Zayson
    공부한 내용을 정리하는 공간

    티스토리툴바