에러 발생 환경
- AWS MSK (3.6.0), Tiered Storage 활성화
에러 로그
"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception
│ from producer send callback\\n\\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:340)\\n\\tat
│ org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:133)\\n\\tat
│ org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:411)\\n\\tat
│ org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)\\n\\tat
│ org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\\n\\tat
│ org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\\n\\tat
│ org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\\n\\tat
│ org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\\n\\tat
│ java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\\n\\tat
│ java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\\n\\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\\n\\tat
│ java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\\n\\tat
│ java.base/java.lang.Thread.run(Thread.java:840)\\nCaused by: org.apache.kafka.common.InvalidRecordException:
│ Timestamp 1716397211672 of message with offset 0 is out of range. The timestamp
│ should be within [1716883864363, 1717056664363]\\n"
- Caused by: org.apache.kafka.common.InvalidRecordException: Timestamp 1716397211672 of message with offset 0 is out of range. The timestamp should be within [1716883864363, 1717056664363]
상황
- Kafka To Kafka로의 데이터 전송을 위해 Kafka MirrorMaker2를 사용하고 있다.
Geo-Replication이 목적이 아니기 때문에 Source Kafka ClusterConfiguration을 Target Kafka Cluster 에 sync하고 있지 않은 상태
- Source 토픽에 저장된 데이터를 Target으로 미러링할 때, 레코드 타임스탬프가 유효하지 않아 MM2 Source Connector Task가 Failed되는 문제가 발생했다.
원인
- 현재 사용중인 MSK는 3.6.0 버전이며, Kafka 3.6.0 코드에서 에러가 발생한 부분을 확인해보면 하기 설정 값이 에러와 연관이 있는 것을 확인할 수 있다.
// LogValidator.java
if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) {
if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) {
return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
"Timestamp " + record.timestamp() + " of message with offset " + record.offset()
+ " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs)
+ ", " + (now + timestampAfterMaxMs) + "]")));
}
...
- (Topic Level) message.timestamp.before.max.ms
- (Topic Level) message.timestamp.after.max.ms
상기 설정은 message.timestamp.type이 CreateTime일 때만 사용된다.
private static boolean recordHasInvalidTimestamp(
Record record, long now, long timestampBeforeMaxMs, long timestampAfterMaxMs
) {
final long timestampDiff = now - record.timestamp();
return timestampDiff > timestampBeforeMaxMs || -1 * timestampDiff > timestampAfterMaxMs;
}
- 즉, 브로커 타임스탬프(현재)를 기준으로 설정한 값보다 이전/이후 사이에 생성된 레코드는 브로커에서 받을 수 있는 정상적인 레코드로 판단하는 설정이라고 보면 된다.
- Kafka 3.6.0은 기존에 사용 중이던
message.timestamp.difference.max.ms
값을 deprecated하고message.tiemstamp.before/after.max.ms
로 세분화 한 설정을 반영했다. (KIP-937) - Kafka에서
message.tiemstamp.before/after.max.ms
값은 기본적으로Long.MAX_VALUE
로 설정되어 있으며, AWS MSK에서도 동일하게 설정되어 있기 때문에 대부분의 모든 레코드는 정상적인 타임스탬프 범위에 있는 레코드이다. - 하지만, MSK 문서를 확인하면 Tiered Storage를 제공하는 2.8.2에서는 Tiered Storage가 활성화 되면,
message.timestamp.difference.max.ms
의 기본값이 86400000 (1일)로 설정되는 것을 확인할 수 있다. - AWS MSK는 2.8.2, 3.6.0에서 Tieres Storage를 제공한다.
- MSK 3.6.0에서 Tiered Storage가 활성화 될 때 2.8.0과 마찬가지로
message.timestamp.diffrence.max.ms
값이 변경 되는데 (문서 누락인 것으로 보임), 마침 Kafka 3.6.0에서 해당 설정이message.timestamp.before/after.max.ms
로 변경되면서 Tiered Storage 활성화로 인해 변경된 값이 반영되면서 Sync가 이상하게 맞아 발생한 문제로 예상된다.
해결
- MSK Broker Configuration을 변경한다.
log.message.timstamp.before.max.ms=9223372036854775807
log.message.timestamp.after.max.ms=9223372036854775807
Reference.
반응형
'Kafka > Kafka' 카테고리의 다른 글
Kafka Connect: Incremental Cooperative Rebalancing (0) | 2024.12.09 |
---|---|
Kafka: Static Membership (0) | 2024.11.03 |
Kafka: Kafka Broker 내부 구조 (0) | 2024.04.28 |