Strimzi, Kafka Connect를 이용한 데이터 통합 파이프라인 환경 구성하기
서론
현재 회사에서 진행중인 메인 프로젝트는 실시간으로 게임으로부터 들어오는 데이터를 통합하고 이를 가공해 제 3자 서비스에게 가공된 데이터를 제공해주는 파이프라인을 구축하는 것입니다.
팀에서는 대용량 데이터를 실시간으로 스트리밍 처리하기 위해 Kafka를 사용 중입니다.
따라서, 실시간으로 외부 저장소에 적재되는 게임 데이터를 Kafka로 통합하고, 가공된 데이터를 외부 저장소로 적재하는 기술 도입이 필요했습니다.
Kafka Connect는 Kafka의 구성 요소로서 분산 데이터 처리 플랫폼에서 스트리밍 데이터 통합 프레임워크입니다.
다양한 외부 저장소(Source/SInk)로부터 Kafka Connector를 통해 데이터를 가져오고, 적재할 수 있는 장점이 있습니다.
일반적으로 Kafka Connect는 클라우드나 온프레미스 환경에 클러스터를 설치해 사용합니다.
하지만, 이런 온프레미스 환경의 경우 리소스를 수동으로 관리해줘야 하는 큰 단점이 존재합니다.
또한, Connector가 멈춰 재시작을 해주거나, 설정을 변경하는 내용들을 Rest API 호출을 통해 직접 관리해줘야 했습니다.
Strimzi는 쿠버네티스 환경에서 Kafka 관련 리소스들을 CRD로 정의하고 오퍼레이터 패턴을 이용해 리소스들을 자동으로 관리해주는 오픈소스 프로젝트입니다.
쿠버네티스 환경에서 동작하기 때문에 리소스의 자동화, 관리의 편리함 등의 쿠버네티스 장점을 취할 수 있습니다.
또한, 개발자는 기존 CRD에 맞게 Kafka Connect/Connector 설정을 YAML에 정의해 관리할 수 있는 장점이 있습니다.
마지막으로, Strimzi Cluster Operator가 YAML의 Desired State를 확인해 반영하며, REST API를 호출해 수동적인 작업을 자동화 할 수 있습니다.
운영에 있어서의 장점과 쿠버네티스 환경을 사용하고 있다는 점을 고려해 Strimzi를 이용한 Kafka Connect 클러스터를 구성하기로 결정했습니다.
사전 준비 사항
기본적인 Strimzi 구성에 앞서 다음 내용이 기본적으로 준비되어 있다고 가정합니다.
- 쿠버네티스
- Helm 설치
- Kafka 환경 구성
Strimzi Cluster Operator 설치
Strimzi는 쿠버네티스 오퍼레이터 패턴을 사용한 Cluster Operator를 통해 사용자가 배포한 YAML의 상태를 주기적으로 감지하고 Desired State로 반영해줍니다.
따라서, Kafka Connect 관련 리소스들을 YAML로 정의하고 자동으로 관리하기 위해선 Strimzi Cluster Operator를 설치해야합니다.
Strimzi Cluster Operator 관련 정리한 내용은 “Strimzi Cluster Operator 개념 정리”에서 확인해주세요!
Helm을 이용한 Cluster Operator 설치
다양한 Strimzi Cluster Operator를 설치하는 방법이 존재하지만, 가장 간단한 방법인 helm
을 이용해 설치해보도록 하겠습니다.
1. Helm Repository를 추가합니다.
helm repo add strimzi https://strimzi.io/charts/
2. (opt) Helm 차트를 구성할 values.yaml
을 정의합니다.
values.yaml
은 Helm 차트에서 기본 설정값을 정의하는 역할을 담당합니다. 따라서, 기본값을 사용하고자 하는 경우 별도의 values.yaml 파일을 구성하지 않아도 됩니다.
replicas: 3 # 클러스터 오퍼레이터의 Replica 개수
watchNamespaces: [operator, kafka-connect] # 클러스터 오퍼레이터가 감지할 쿠버네티스 네임스페이스
podDisruptionBudget:
enabled: true
minAvailable: 1 # 최소 1의 Cluster Operator는 동작하도록 설정 (HA)
leaderElection:
enable: true # 3개의 클러스터 오퍼레이터 중에서 리더 선출을 할지 결정
image:
imagePullPolicy: Always # 클러스터 오퍼레이터가 배포할 리소스의 이미지 Pull 정책
Cluster Operator를 구성할 수 있는 Value들의 설명 및 목록들은 Strimzi github에 자세하게 정의되어 있습니다!
3. Cluster Operator를 Helm을 이용해 설치합니다.
버전을 명시하지 않는 경우 가장 최신 버전으로 설치됩니다.
helm install strimzi/strimzi-kafka-operator -f <values.yaml 파일> \
--namespace <오퍼레이터를 배포할 네임스페이스> \
--version <오퍼레이터 버전>
이렇게 3가지 단계를 거치면 Kafka Connect 관련 리소스들을 자동으로 관리해주는 Cluster Operator 배포가 완료됩니다.
번외로, Cluster Operator 버전을 업그레이드 하고 싶은 경우 아래 커맨드를 통해 업그레이드가 가능합니다.
helm upgrade <release-name> strimzi/strimzi-kafka-operator \
--namespace <namespace> \
--version <new-version>
Helm 차트를 이용해 Cluster Operator 버전을 업그레이드하는 경우 Strimzi에서 제공하는 CRD(Custom Resource Definition) YAML 파일은 업그레이드 되지 않습니다.
CRD를 업그레이드 해주기 위해서는 Helm Repository 내 정의되어 있는 YAML 파일을 직접
kubectl apply
커맨드를 통해 반영해줘야 합니다.helm pull strimzi/strimzi-kafka-operator --untar # 현재 디렉토리로 strimzi-kafka-operator 차트 가져오기 kubectl apply -f strimzi-kafka-operator/crds # crd 파일 kubernetes 적용
Kafka Connect Manifest 준비
Strimzi를 통해 Kafka Connect/Connector를 배포하기 위해서는 미리 정의되어 있는 CRD의 스펙에 맞게 YAML 파일을 구성해줘야 합니다.
Strimzi Cluster Operator 버전에 맞게 CRD 스펙을 확인하고 이에 맞게 구성할 필요가 있습니다.
아직까지 Strimzi Cluster Operator는 메이저 버전이 업데이트 되지 않은 만큼 자주 스펙의 변경이 이뤄지고 있기 때문에 자신이 필요한 내용을 지원하고 있는지도 확인할 필요가 있습니다.
특히 버전 별로 지원하는 Kafka 버전, 쿠버네티스 버전이 다르기 때문에 자신이 구성하는 환경에서 해당 버전의 Cluster Operator를 사용할 수 있는지도 반드시 확인이 필요합니다.
먼저, Kafka Connect YAML 파일을 구성해보겠습니다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-worker-test
namespace: kafka-connect
annotations:
strimzi.io/use-connector-resources: "true" # 반드시 true로 설정합니다.
spec:
authentication:
type: scram-sha-512
username: test-user # Broker 접속을 위한 ACL Username
passwordSecret: # ACL Password는 secret 파일을 통해 관리
secretName: kafka-user-secret
password: test-user-password # secret key name 입력
tls:
trustedCertificates: [] # SASL_SSL 방식 사용 시 빈 값으로 설정
replicas: 2
bootstrapServers: kafka:9096,kafka:9097 # Kafka Broker 주소
image: <container image path> # Kafka Connect Image 경로
config: # Kafka Connect Config 정의 (Apahce Kafka 공식 문서 참고할 것)
# Strimzi에서 제공하는 ConfigProvider 적용 (환경변수 사용을 위해)
config.providers: env
config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
# Kafka Connect Configuration 시작
group.id: test-connect-consumer-group # 컨슈머 그룹
offset.storage.topic: test-connect-offsets
config.storage.topic: test-connect-configs
status.storage.topic: test-connect-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
resources:
requests:
cpu: 2
memory: 1Gi
limits:
cpu: 2
memory: 1Gi
jvmOptions:
"-XX":
InitialRAMPercentage: 60.0
MaxRAMPercentage: 80.0
HeapDumpOnOutOfMemoryError: true
HeapDumpPath: /mnt/dump
externalConfiguration:
env:
- name: ENV_ACCESS_KEY
valueFrom:
secretKeyRef:
name: env-credential-secret
key: env-access-key
metricsConfig: # Kafka connect 메트릭 설정
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: connect-metrics # configmap에 JMXExporter Configuration 정보가 저장되어 있음
key: metrics-config.yml
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
template: # optional Pod, Container 등의 템플릿 지정 가능
serviceAccount:
metadata:
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::1234567890:role/test-role # S3 Role 지정 (S3 Sink Connector 사용 중)
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: name
operator: In
values:
- test-worker
metadata.annotations.strimzi.io/use-connector-resources
: 해당 값을true
로 지정해야 KafkaConnect가 커넥터를 생성,삭제 및 reconfiguration이 가능합니다.image
: 여러 Kafka Connector Plugin을 갖는 Custom Image를 만들어 사용할 수 있습니다.- 현재 Strimzi를 이용한 운영 환경에서는 여러 종류의 Connector가 필요하기에 Custom Docker Image를 만들고 ECR에 업로드해서 사용하고 있습니다.
- 별도의 image를 사용하지 않는다면
build
스펙을 정의해서 저장소에서 Connector Plugin을 가져올 수 있습니다.
jvmOptions
: JVM 메모리 옵션을 지정할 수 있습니다.-XX
,-Xmx
,-Xms
옵션을 지정할 수 있으며,-Xgclog
와 같은 옵션은 사용할 수 없습니다.externalConfiguration
: ConfigMap이나, Secret에 정의된 데이터를 Pod으로 가져올 수 있습니다.template
: pod이나 container와 같은 쿠버네티스 템플릿을 지정할 수 있습니다.
위에 정의한 내용 말고도 수많은 스펙이 Kafka Connect CRD에 정의되어 있는데요.
Strimzi 공식 문서를 통해서 확인할 수 있습니다.
Kafka Connector Manifest 준비
다음으로는 Kafka Connector 관련 YAML을 정의해보겠습니다.
Kafka Connector는 Connector Vendor 별로 지원하는 Configuration이 다르고, Sink/Source에 따라서 제공하는 Configuration도 다릅니다.
따라서, 사용할 Connector에 맞게 문서를 보고 Configuration을 설정해야 합니다.
현재 팀에서는 S3 Sink Connector를 주로 사용하고 있기 때문에 Confluent S3 Sink Connector를 기준으로 YAML 파일을 구성해보겠습니다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: test-s3-sink-connector
namespace: kafka-connect
labels:
strimzi.io/cluster: kafka-connect-worker-test # 반드시 Connector를 실행시킬 KafkaConnect name을 지정해줍니다.
spec:
autoRestart: # Connector state가 Failed 되는 경우 자동으로 재시작
enabled: true
class: io.confluent.connect.s3.S3SinkConnector
tasksMax: 3 # Task 최대 개수
state: running
config:
# Kafka Consumer Configuration
consumer.override.isolation.level: read_committed
consumer.override.group.id: test-s3-sink-connector
consumer.override.max.poll.records: 1000
consumer.override.partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# S3 Sink Connector Configuration
flush.size: 2000
rotate.schedule.interval.ms: 60000
s3.bucket.name: test-bucket
s3.region: ap-northeast-2
s3.part.size: 5242880
format.class: io.confluent.connect.s3.format.json.JsonFormat
storage.class: io.confluent.connect.s3.storage.S3Storage
topics.dir: folder-1
file.delim: "-"
partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms: 60000
path.format: YYYY/MM/dd/HH
locale: ko_KR
timezone: UTC
timestamp.extractor: Record
# Kafka Sink Connector Configuration
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
topics: test-topic
errors.tolerance: all
errors.deadletterqueue.topic.name: dlq-test-topic
errors.deadletterqueue.topic.replication.factor: 3
errors.deadletterqueue.context.headers.enable: true
errors.retry.delay.max.ms: 60000
errors.retry.timeout: 300000
metadata.labels.strimzi.io/cluster
: KafkaConnector를 동작시킬 KafkaConnect 클러스터 이름을 지정합니다. KafkaConnector를 구동하고자 하는 KafkaConnect 클러스터는 반드시 같은 네임스페이스에 위치해야 합니다.autoRestart.enabled
: true로 설정하면 Task가 Failed상태가 될 때 자동으로 재시작됩니다. 재시작 주 기는n * n + n
주기(n
은 이전 재시작된 횟수)로 증가하며, 최대 60분으로 제한됩니다.maxRestarts
설정을 통해 최대 재시작 횟수를 제한할 수 있으며, 지정하지 않은 경우 무한히 재시작을 시도합니다.tasksMax
: Kafka Connector의 Task 개수를 지정합니다.config
: 이 부분에는 Kafka Connect Source/Sink Connector의 Configuration 및 사용하는 Connector에서 제공하는 설정 정보를 입력할 수 있습니다.
Kafka Connector 스펙 역시 Strimzi 공식 문서에서 확인할 수 있습니다. 또한, S3 Sink Connector는 Confluent 문서에서 확인할 수 있습니다.
마무리
이상으로 Strimzi Cluster Operator를 구축하고, Kafka Connect/Connector를 CRD를 구성해보았습니다.
Kafka Connect는 Kafka를 사용한 데이터 스트리밍 파이프라인에서 데이터 통합에 있어서 중요한 한 축을 담당합니다.
데이터 스트리밍 파이프라인을 구성할 때, Strimzi Cluster Operator와 함께 Kafka Connect를 구성해보세요.
쿠버네티스의 장점 및 운영의 장점과 함께 Kafka Connect를 편리하게 사용할 수 있으므로 함께 구성해보면 좋을 것 같습니다.
래퍼런스
Configuring Strimzi: https://strimzi.io/docs/operators/latest/configuring.html#con-schema-reference-intro-reference
Deploying and Upgrading Strimzi : https://strimzi.io/docs/operators/latest/deploying
Strimzi Kafka Operator GitHub: https://github.com/strimzi/strimzi-kafka-operator
Apche Kafka Connect: https://kafka.apache.org/documentation/#connect
Confluent S3 Sink Connector: https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html