Zayson
A to Zayson!
Zayson
전체 방문자
오늘
어제
  • 분류 전체보기 (132)
    • Computer Science (20)
      • Network (4)
      • DB (12)
      • OS (4)
    • Algorithm (32)
      • 완전탐색(Brute-Force) (3)
      • 그리디(Greedy) (6)
      • 투포인터(Two-Pointer) (1)
      • 그래프(Graph) (5)
      • BFS & DFS (9)
      • 구현, 시뮬레이션(Implementation) (5)
      • 다이나믹 프로그래밍(DP) (3)
    • Backend (51)
      • Spring Boot (19)
      • JPA (16)
      • Kafka (2)
      • Java (13)
      • Kotlin (1)
    • DevOps (1)
      • Jenkins (5)
      • Oracle Cloud Infrastructure (1)
      • Kubernetes & Docker (1)
    • Trouble Shooting (3)
      • JPA (1)
      • Spring Boot (2)
    • 회고 (5)
      • 엔빵 프로젝트 포스트 로드맵 (1)
      • 2022년 (4)
    • Kafka (7)
      • Kafka (5)
      • Kafka Connect (2)
    • 기술 서적 (6)
      • 데이터 중심 애플리케이션 설계 (3)
      • 개발자가 반드시 정복해야할 객체 지향과 디자인 패턴 (2)
      • 가상 면접 사례로 배우는 대규모 시스템 설계 기초 (1)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

인기 글

태그

  • 프로그래머스
  • 그리디
  • 완전탐색
  • 엔빵프로젝트
  • SpringBoot
  • JPA
  • Backend
  • 구현
  • 백준
  • spring boot
  • 관계형 데이터베이스 실전 입문
  • Kafka Connect
  • BFS
  • Computer science
  • dfs
  • 라이브스터디
  • kafka
  • 나 혼자 스프링부트!
  • CS
  • Java

최근 글

티스토리

hELLO · Designed By 정상우.
Zayson

A to Zayson!

Kafka Issue: InvalidRecordException (related Timestamp)
Kafka/Kafka

Kafka Issue: InvalidRecordException (related Timestamp)

2024. 6. 1. 12:50

에러 발생 환경


  • AWS MSK (3.6.0), Tiered Storage 활성화

에러 로그


"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception
│         from producer send callback\\n\\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:340)\\n\\tat
│         org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:133)\\n\\tat
│         org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:411)\\n\\tat
│         org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)\\n\\tat
│         org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\\n\\tat
│         org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\\n\\tat
│         org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\\n\\tat
│         org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\\n\\tat
│         java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\\n\\tat
│         java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\\n\\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\\n\\tat
│         java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\\n\\tat
│         java.base/java.lang.Thread.run(Thread.java:840)\\nCaused by: org.apache.kafka.common.InvalidRecordException:
│         Timestamp 1716397211672 of message with offset 0 is out of range. The timestamp
│         should be within [1716883864363, 1717056664363]\\n"
  • Caused by: org.apache.kafka.common.InvalidRecordException: Timestamp 1716397211672 of message with offset 0 is out of range. The timestamp should be within [1716883864363, 1717056664363]

상황


  • Kafka To Kafka로의 데이터 전송을 위해 Kafka MirrorMaker2를 사용하고 있다.

Geo-Replication이 목적이 아니기 때문에 Source Kafka ClusterConfiguration을 Target Kafka Cluster 에 sync하고 있지 않은 상태

  • Source 토픽에 저장된 데이터를 Target으로 미러링할 때, 레코드 타임스탬프가 유효하지 않아 MM2 Source Connector Task가 Failed되는 문제가 발생했다.

원인


  • 현재 사용중인 MSK는 3.6.0 버전이며, Kafka 3.6.0 코드에서 에러가 발생한 부분을 확인해보면 하기 설정 값이 에러와 연관이 있는 것을 확인할 수 있다.
// LogValidator.java
if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) {
    if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) {
        return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
                "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
            + " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs)
            + ", " + (now + timestampAfterMaxMs) + "]")));
    }
...
  • (Topic Level) message.timestamp.before.max.ms
  • (Topic Level) message.timestamp.after.max.ms

상기 설정은 message.timestamp.type이 CreateTime일 때만 사용된다.

private static boolean recordHasInvalidTimestamp(
    Record record, long now, long timestampBeforeMaxMs, long timestampAfterMaxMs
) {
    final long timestampDiff = now - record.timestamp();
    return timestampDiff > timestampBeforeMaxMs || -1 * timestampDiff > timestampAfterMaxMs;
}
  • 즉, 브로커 타임스탬프(현재)를 기준으로 설정한 값보다 이전/이후 사이에 생성된 레코드는 브로커에서 받을 수 있는 정상적인 레코드로 판단하는 설정이라고 보면 된다.
  • Kafka 3.6.0은 기존에 사용 중이던 message.timestamp.difference.max.ms 값을 deprecated하고 message.tiemstamp.before/after.max.ms로 세분화 한 설정을 반영했다. (KIP-937)
  • Kafka에서 message.tiemstamp.before/after.max.ms 값은 기본적으로 Long.MAX_VALUE로 설정되어 있으며, AWS MSK에서도 동일하게 설정되어 있기 때문에 대부분의 모든 레코드는 정상적인 타임스탬프 범위에 있는 레코드이다.
  • 하지만, MSK 문서를 확인하면 Tiered Storage를 제공하는 2.8.2에서는 Tiered Storage가 활성화 되면, message.timestamp.difference.max.ms의 기본값이 86400000 (1일)로 설정되는 것을 확인할 수 있다.
  • AWS MSK는 2.8.2, 3.6.0에서 Tieres Storage를 제공한다.
  • MSK 3.6.0에서 Tiered Storage가 활성화 될 때 2.8.0과 마찬가지로 message.timestamp.diffrence.max.ms값이 변경 되는데 (문서 누락인 것으로 보임), 마침 Kafka 3.6.0에서 해당 설정이 message.timestamp.before/after.max.ms로 변경되면서 Tiered Storage 활성화로 인해 변경된 값이 반영되면서 Sync가 이상하게 맞아 발생한 문제로 예상된다.

해결


  • MSK Broker Configuration을 변경한다.
    • log.message.timstamp.before.max.ms=9223372036854775807
    • log.message.timestamp.after.max.ms=9223372036854775807

Reference.


  • KIP-937: Improve Message Timestamp Validation
  • https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html
반응형
저작자표시 비영리 변경금지 (새창열림)

'Kafka > Kafka' 카테고리의 다른 글

Kafka: Exactly-Once Semantic (w/ Transaction_)  (0) 2025.01.27
Kafka Connect: Incremental Cooperative Rebalancing  (0) 2024.12.09
Kafka: Static Membership  (0) 2024.11.03
Kafka: Kafka Broker 내부 구조  (0) 2024.04.28
    'Kafka/Kafka' 카테고리의 다른 글
    • Kafka: Exactly-Once Semantic (w/ Transaction_)
    • Kafka Connect: Incremental Cooperative Rebalancing
    • Kafka: Static Membership
    • Kafka: Kafka Broker 내부 구조
    Zayson
    Zayson
    공부한 내용을 정리하는 공간

    티스토리툴바