Kafka/Kafka

Kafka Issue: InvalidRecordException (related Timestamp)

Zayson 2024. 6. 1. 12:50

에러 발생 환경


  • AWS MSK (3.6.0), Tiered Storage 활성화

에러 로그


"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception
│         from producer send callback\\n\\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:340)\\n\\tat
│         org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:133)\\n\\tat
│         org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:411)\\n\\tat
│         org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)\\n\\tat
│         org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\\n\\tat
│         org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\\n\\tat
│         org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\\n\\tat
│         org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\\n\\tat
│         java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\\n\\tat
│         java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\\n\\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\\n\\tat
│         java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\\n\\tat
│         java.base/java.lang.Thread.run(Thread.java:840)\\nCaused by: org.apache.kafka.common.InvalidRecordException:
│         Timestamp 1716397211672 of message with offset 0 is out of range. The timestamp
│         should be within [1716883864363, 1717056664363]\\n"
  • Caused by: org.apache.kafka.common.InvalidRecordException: Timestamp 1716397211672 of message with offset 0 is out of range. The timestamp should be within [1716883864363, 1717056664363]

상황


  • Kafka To Kafka로의 데이터 전송을 위해 Kafka MirrorMaker2를 사용하고 있다.

Geo-Replication이 목적이 아니기 때문에 Source Kafka ClusterConfiguration을 Target Kafka Cluster 에 sync하고 있지 않은 상태

  • Source 토픽에 저장된 데이터를 Target으로 미러링할 때, 레코드 타임스탬프가 유효하지 않아 MM2 Source Connector Task가 Failed되는 문제가 발생했다.

원인


  • 현재 사용중인 MSK는 3.6.0 버전이며, Kafka 3.6.0 코드에서 에러가 발생한 부분을 확인해보면 하기 설정 값이 에러와 연관이 있는 것을 확인할 수 있다.
// LogValidator.java
if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) {
    if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) {
        return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
                "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
            + " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs)
            + ", " + (now + timestampAfterMaxMs) + "]")));
    }
...
  • (Topic Level) message.timestamp.before.max.ms
  • (Topic Level) message.timestamp.after.max.ms

상기 설정은 message.timestamp.type이 CreateTime일 때만 사용된다.

private static boolean recordHasInvalidTimestamp(
    Record record, long now, long timestampBeforeMaxMs, long timestampAfterMaxMs
) {
    final long timestampDiff = now - record.timestamp();
    return timestampDiff > timestampBeforeMaxMs || -1 * timestampDiff > timestampAfterMaxMs;
}
  • 즉, 브로커 타임스탬프(현재)를 기준으로 설정한 값보다 이전/이후 사이에 생성된 레코드는 브로커에서 받을 수 있는 정상적인 레코드로 판단하는 설정이라고 보면 된다.
  • Kafka 3.6.0은 기존에 사용 중이던 message.timestamp.difference.max.ms 값을 deprecated하고 message.tiemstamp.before/after.max.ms로 세분화 한 설정을 반영했다. (KIP-937)
  • Kafka에서 message.tiemstamp.before/after.max.ms 값은 기본적으로 Long.MAX_VALUE로 설정되어 있으며, AWS MSK에서도 동일하게 설정되어 있기 때문에 대부분의 모든 레코드는 정상적인 타임스탬프 범위에 있는 레코드이다.
  • 하지만, MSK 문서를 확인하면 Tiered Storage를 제공하는 2.8.2에서는 Tiered Storage가 활성화 되면, message.timestamp.difference.max.ms의 기본값이 86400000 (1일)로 설정되는 것을 확인할 수 있다.
  • AWS MSK는 2.8.2, 3.6.0에서 Tieres Storage를 제공한다.
  • MSK 3.6.0에서 Tiered Storage가 활성화 될 때 2.8.0과 마찬가지로 message.timestamp.diffrence.max.ms값이 변경 되는데 (문서 누락인 것으로 보임), 마침 Kafka 3.6.0에서 해당 설정이 message.timestamp.before/after.max.ms로 변경되면서 Tiered Storage 활성화로 인해 변경된 값이 반영되면서 Sync가 이상하게 맞아 발생한 문제로 예상된다.

해결


  • MSK Broker Configuration을 변경한다.
    • log.message.timstamp.before.max.ms=9223372036854775807
    • log.message.timestamp.after.max.ms=9223372036854775807

Reference.


반응형