서론
Kafka Connect Worker는 컨테이너 기반(e.g Kubernetes Pod)이나 물리적/가상 머신(e.g EC2)에서 일반적으로 사용된다.
이러한 환경에서는 Rolling Restart, Scale-Up/Down, Scale-In/Out 등과 같이 Worker Group의 상태 변경으로 인해 리밸런싱이 자주 발생할 수 있다.
Kafka 2.3.0 이전에는 리밸런싱 시 모든 Task와 Connector가 모두 Revoke된 후 다시 Assign되는 방식(Eager Rebalance)이 적용됐다.
이는 리밸런싱 과정에서 전체 작업이 중단되는 “Stop-The-World” 현상을 야기했다.
이를 해결하기 위해 **KIP-415: Incremental Cooperative Rebalancing이 제안되었고, Kafka 2.3.0부터 해당 기능이 도입**됐다.
Kafka 2.3.0 전후의 리밸런싱 프로토콜에 대해서 자세하게 알아보자.
Eager Rebalancing
Kafka 2.3.0 이전의 리밸런싱에는 Eager Protocol
이 사용됐다.
Eager Protocol은 Worker Group의 상태에 변화가 발생하면 Worker에 할당된 모든 Task, Connector를 중단하고 다시 할당하는 리밸런싱 방식이다.
이로 인해 리밸런싱 과정에서 Stop-The-World
현상이 발생한다.
Stop-The-World 현상은 시스템에서 모든 작업이 일시적으로 멈추는 상태를 의미한다.
이로 인해 사용자는 서비스가 중단되는 것처럼 느낄 수 있다.
Eager Protocol
의 리밸런싱 과정과 어느 부분에서 Stop-The-World 현상이 발생하는지 확인해보자.
Worker 2개에 Task/Connector가 각 3개씩 할당된 상황에서 새로운 Worker가 추가되어 리밸런싱이 발생했다고 가정하자.
리밸런싱이 발생하면 Worker들은 자신이 소유하던 Task/Connector를 JoinGroupRequest를 보내기 전에 모두 Revoke한다.
이 때, 모든 Worker에는 어떠한 Task도 Assigned된 상태가 아니므로 데이터 처리가 중단되며 Stop-The-World
현상이 발생한다.
이 후, 리밸런싱에 참여하기 위해 JoinGroupRequest
를 GroupCoordinator에 전송한다.
JoinGroupRequest
를 받은 GroupCoordinator는 JoinGroupResponse
를 각 Worker들에게 전달하고, Worker 0번을 그룹 리더로 선정한다.
리더는 Revoked된 Task/Connector를 어떤 Worker에 할당할지 계산한다. 그리고 계산 정보와 함께 SyncGroupRequest
를 GroupCoordinator에게 보낸다.
리더 이외의 Worker (Member)들도 동일하게 SyncGroupRequest
를 전송한다.
현재 상태에서도 아직 어떠한 Task도 Worker에 할당되지 않았기 때문에 Stop-The-World
현상은 지속된다.
GroupCoordiantor는 리더가 계산한 Task/Connector 할당 정보를 각 Worker에 전달한다.
SyncGroupResponse
를 받은 각각의 Worker 전달받은 정보를 기반으로 Task/Connector를 Assigned하면 리밸런싱이 완료되고 실제로 데이터 처리가 재개된다.
이처럼 잠깐의 리밸런싱 상황에서 발생하는 Stop-The-World
현상은 리밸런싱이 많아질수록 영향도가 높아진다.
이러한 문제를 해결하기 위해서 Incremental Cooperative Rebalancing
이 제안되었다.
Incremental Cooperative Rebalancing
Incremental Cooperative Rebalancing
은 KIP-415에서 제안되었고, Kafka 2.3.0에서 정식으로 도입됐다.
Incremental Cooperative Rebalancing
의 개념은 다음과 같다.
Incremental (증분)
- 단 한번의 리밸런싱을 통해 최종적인 균형 상태를 갖는 것이 아닌 적은 수의 연속적인 리밸런싱을 통해 최종적으로 균형적인 리소스(Task/Connector)가 할당된 상태를 만든다.
Cooperative (협력)
- 각 프로세스가 재분배가 필요한 리소스를 자발적으로 해제한다.
즉, 각 Worker들은 균형적으로 Task/Connector를 할당받기 위해서 자신이 Revoke할 Task/Connector를 선택한다.
이는 Worker가 자신이 소유한 Task/Connector 중 필요없는 일부만 해제함을 의미하며 Stop-The-World
현상이 발생하지 않는 것을 의미한다.
Incremental Cooperative Rebalancing
이 전체적으로 어떤 절차로 이뤄지는지 요약하면 다음과 같다.
- Worker는 Join Group을 준비할 때 이전과 달리 어떤 Task/Connector도 Revoke하지 않는다.
- Worker가 GroupCoordinator에게 전달하는
JoinGroupRequest
에는 메타데이터가 포함되는데, 해당 메타데이터에는 현재 자신이 소유한 Task/Connector 정보(Current Assignment)이 포함되어 있다. - 리더로 선정된 Worker는 Revoke할 Task/Connector와 Assign할 Task/Connector 정보 (New Assignment)를 계산한다.
- Worker가 새로운 Assignment 정보를 받았을 때, 해당 정보에 Revoke할 Task/Connector가 존재하는 경우 바로 해당 Task/Connector를 해제하고, 해제한 이후에 소유중인 Task/Connector 정보를 기반으로 다시
JoinGroupRequest
를 전송한다. - 일반적으로 4번 절차까지 완료되면, 모든 Worker들은 Revoke할 Task/Connector가 없는 상태이다. 만약 아직 모든 Worker가 Revoke할 Task/Connector가 남아있는 경우 모든 Worker가 Revoke할 Task/Connector가 없을 때까지 위 절차를 반복한다.
- 모든 Worker가 Revoke할 Task/Connector가 없는 Assignment 정보를 받으면, 각각의 Worker들은 해당 정보를 기반으로 Task/Connector를 시작한다. 그리고,
SyncGroupResponse
에 담긴ScheduledRebalanceDelay
시간 동안 그룹에 재참여하는 시간을 연기한다.
Kafka Connect의 Incremental Cooperative Rebalancing은 Incremental Rebalancing: Support and Policies 문서의 Design 2를 기반으로 한다.
자세한 내용을 확인해보고 싶다면 해당 문서를 참고해보자.
단순히 절차만 봤을 땐 Incremental Cooperative Rebalancing
이 정확히 어떻게 동작하는지 이해하기 어렵다.
KIP-415에 정의된 사례를 분석하면서 어떤 방식으로 리밸런싱이 동작하는지 확인해보자.
사례 분석 1. 그룹에 최초로 Worker가 참여하는 경우
첫 번째 사례는 Worker Group에 최초로 Worker가 참여하는 경우이다.
초기 설정은 AC0, AT1, AT2, BC0, BT1이 있다고 가정하자.
AC는 A Connector, AT1은 A Connector의 Task 1번을 의미한다.
새로운 Worker-1이 Worker Group 참여하면 리밸런싱이 발생한다.
이 때, Worker-1은 현재 할당 정보(Current Assignment)를 담아 JoinGroupRequest
를 전송한다.
GroupCoordinator는 Worker-1번을 리더로 결정한다.
현재 Worker Group에는 Worker-1 혼자만 참여하기 때문에 모든 Task/Connector를 자신에게 할당하도록 계산된다.
Worker-1은 계산된 정보(New Assignment)를 담아 GroupCoordiantor에게 SyncGroupRequest
를 전송한다.
SyncGroupResponse
에는 리더가 계산한 Assignment 정보가 담겨있다.
SyncGroupResponse
를 받은 Worker-1은 모든 Task/Connector를 Assigned한다.
첫 번째 사례는 최초로 Worker가 참여하는 사례라 간단하다. 이어서 두 번째 사례를 분석해보자.
사례 분석 2. 기존 그룹에 새로운 Worker가 참여하는 경우
두 번째 사례를 보자.
첫 번째 사례 결과에 이어서 현재 Worker-1에는 모든 Task/Conenctor (AC-0, AT-1, AT-2, BC-0, BT-1)가 할당된 상태에서 Worker Group에 Worker-2, Worker-3이 참여하는 사례이다.
Worker-2, 3은 Worker Group에 참여하기 위해 리밸런싱(Generation=1
)이 발생한다.
Worker-1 또한 리밸런싱에 참여하기 위해 현재 할당된 정보를 메타데이터에 담고 JoinGroupRequest
를 전송한다.
GroupCoordinator는 Worker-1을 리더로 선택한다.
리더가 된 Worker-1은 Task/Connector를 어떻게 할당할지 계산한다.
새로운 Worker-2, Worker-3에게 Task/Connector를 할당해줘야 한다. 따라서, Worker-1은 AT-2, BC-0, BT-1을 Revoke할 대상으로 판단하고 해당 정보를 기반으로 SyncGroupRequest
를 전송한다.
첫 번째 리밸런싱(Generation=1
)이 완료되고, Worker-1은 Task/Connector를 Revoke를 시작한다.
Worker-1은 AT-2, BC-0, BT-1를 Revoke한다.
Revoke된 Task/Connector는 다시 Worker들에게 할당해줘야하기 때문에 새로운 리밸런싱(Geneartion=2
)을 시작한다.
Worker-1은 자신이 소유 중인 Task/Connector 정보를 메타데이터에 담아 JoinGroupRequest
를 요청하고, 나머지 Worker는 첫번째 리밸런싱과 동일하게 빈 상태로 요청한다.
GroupCoordinator는 Worker-1을 리더로 선정한다.
리더는 현재 할당되지 않은 AT-2, BC-0, BT-1을 각 Worker에 할당하기 위해 계산한다.
Worker-2는 AT-2, BC-0, Worker-3은 BT-1을 할당받도록 계산되고, 해당 정보를 SyncGroupRequest
Assignment에 담아 GroupCoordinator에게 보낸다.
마지막으로 모든 Worker는 SyncGroupResponse를 받으면 리밸런싱(Generation=2)가 마무리된다. 각 Worker는 SyncGroupResponse
Assignment 정보로 Task/Connector를 할당한다.
사례 분석 3. Worker가 그룹을 떠나는 경우
3번째 사례는 팔로워 Worker가 그룹을 영구적으로 떠나는 사례이다.
사례 2번의 결과인 상황에서 팔로워 Worker인 Worker-2가 떠나는 상황으로 확인해보자.
중요하지 않은 과정에 대한 그림은 생략했다.
Worker-2가 LeaveGroup
을 요청하면 소유하고 있던 Task/Connector들은 Revoke된다.
Worker Group의 변경이 발생했기 때문에 첫 번째 리밸런싱이 트리거된다.
Worker-1, 3은 자신이 소유 중인 Task/Connector 정보를 메타데이터에 담아 JoinGroupRequest
를 전송한다.
JoinGroupResponse
를 통해서 Worker-1이 리더로 선정된다. 리더는 Assignment 정보를 계산한다.
리더가 계산한 정보에는 delay
값이 있는데, 이 delay
는 Worker가 그룹을 떠났을 때, 각 Worker가 Task/Connector를 일시적으로 불균형하게 할당받는 것을 허용하는 최대 시간값을 의미한다.
delay
시간 내에 다시 Worker가 그룹으로 돌아오는 경우 이전과 동일하거나 비슷한 양의 작업을 받을 수 있다.
따라서, Worker가 그룹을 떠난 상황에서 리더는 이 delay
를 활성화시켜 해당 시간 동안 Revoked된 Task/Connector를 재분배해주는 것을 유예시킨다.
Kafka Connect 설정에서 이 delay 값은 scheduled.rebalance.max.delay.ms(default 300000) 값으로 사용된다.
리더는 계산한 정보 (delay 및 Assignment 정보)를 GroupCoordinator에게 전달한다.
SyncGroup
이 완료된 이후 리더는 설정된 delay d
동안 Task/Connector를 재분배하지 않는다.
이 때, 각 Worker에서 소유하고 있는 Task에서는 실제 데이터 처리가 진행되고 있다.
delay d
가 모두 경과한 이후에 리더는 Task/Connector 재분배를 시작하며 두 번째 리밸런싱(Generation 2)이 시작된다.
Worker-1, 3은 자신이 소유한 정보를 기반으로 JoinGroupRequest
를 전송한다.
JoinGroupResponse
를 통해서 리더가 된 Worker-1은 Revoked된 Task/Connector를 포함해서 Assignment 정보를 계산하고 SyncGroupRequest
를 전송한다.
SyncGroup
이 완료되어 두 번째 리밸런싱이 완료되면 모든 Task/Connector가 각 Worker에 할당된다.
사례 분석 4. Worker가 잠깐 그룹을 떠났다가 다시 돌아오는 경우
Rolling Update와 같은 상황에서는 업데이트를 위해서 일시적으로 Worker가 그룹을 떠나는 경우가 있을 것이다. 이럴 때마다 Task/Connector를 다른 Worker에 재분배해주는 것은 비효율적이다.
사례 4번은 이와 같이 Worker가 일시적으로 그룹을 떠났다 돌아오는 경우에 대한 내용이다.
첫 번째 리밸런싱 과정은 사례 3번과 완전히 동일하다.
Worker 1, 3은 자신의 소유 정보를 기반으로 리밸런싱을 진행하고, 리밸런싱이 완료된 이후 계산된 delay d
값을 활성화시킨다.
delay d
가 지나기 전에 Worker-2가 Rolling Update가 완료되어 Worker 그룹에 들어가길 요청했다.
Worker-2는 현재 소유한 Task/Connector가 없으므로 빈 Assignment와 함께 JoinGroupRequest
를 보내며 두 번째 리밸런싱 (Generation 2)가 시작된다.
다시 리더가 된 Worker-1은 다시 그룹으로 돌아온 Worker-2를 포함해 할당할 Task/Connector를 계산한다.
아직, Worker-2는 그룹에 완벽하게 참여한 것이 아니기 때문에 delay d
가 활성화된 이후 남은 시간인 delay d#
값이 설정되며, Revoke된 Task/Connector는 아직 다른 Worker들에게 할당되도록 계산되지 않는다.
SyncGroup
까지 모두 완료되면 두 번째 리밸런싱(Generation 2)이 완료되고 Worker-2가 Worker 그룹에 정상적으로 가입된다.
아직 처음에 Revoke된 Task/Connector가 남아있기 때문에 delay d#
이후에 세 번째 리밸런싱(Generation 3)이 시작된다.
리더가 된 Worker-1은 Revoked 된 Task/Connector를 포함해 할당할 정보를 계산하며 별도의 delay가 필요없기 때문에 0으로 설정된다.
SyncGroup
이후 세 번째 리밸런싱 (Generation 3)이 완료되고, 모든 Task/Connector가 각 Worker에 할당된다.
사례 분석 5. Leader Worker가 그룹을 떠나는 경우
Worker 그룹에서 리더인 Worker가 완전히 LeaveGroup하는 경우가 있을 수 있다.
리더는 각 Worker에 할당할 정보들을 계산하고 관리하는 주체이기 때문에 리더의 이탈은 크리티컬하다.
5번째 사례는 리더가 LeaveGroup
한 경우에 대한 사례이다.
팔로워랑 비교해서 어떤 부분에서 차이가 있는지 확인해보자.
리더인 Worker-1번이 LeaveGroup
하면 소유하고 있던 AC-0, AT-1이 Revoke되면서 첫 번째 리밸런싱 트리거된다.
리밸런싱이 트리거되면 나머지 팔로워 Worker들은 리밸런싱에 참가한다.
팔로워 Worker들은 자신이 소유한 Task/Connector 정보를 담아 JoinGroupRequest
를 전송한다.
GroupCoordinator는 새로운 리더로 Worker-3번을 선출한다.
기존 팔로워가 그룹을 떠날 때와 달리, 새로운 리더가 된 Worker-3은 Task/Connector 할당 정보를 계산할 때, 별도의 delay
를 주지 않고 이전 리더 (Worker-1)에서 Revoke된 Task/Connector도 함께 포함해 계산한다.
마지막으로 SyncGroupResponse
를 받고 첫 번째 리밸런싱이 완료되고, Worker-1이 Revoke한 Task/Connector도 별도의 Delay
없이 바로 다른 Worker에 할당되는 것을 확인할 수 있다.
리더가 그룹을 나가는 경우는 팔로워가 그룹을 나갈 때와 다르게 간단한 것을 확인할 수 있다.
마지막 사례로 리더가 잠깐동안 Group을 떠나는 케이스에 대해서 알아보자.
사례 분석 6. Leader Worker가 잠깐 그룹을 떠났다가 다시 돌아오는 경우
마지막 사례는 리더가 잠깐 그룹을 떠난 경우이다.
팔로워가 먼저 그룹을 떠났다 돌아오고, 이후에 리더가 그룹을 떠났다가 돌아오는 과정으로 사례를 분석해보자.
우선 팔로워 Worker-2가 LeaveGroup
을 통해 그룹을 나가면 리밸런싱(Generation 1)이 트리거된다.
Worker-2가 소유하던 Task/Connector는 Revoke된다.
리더는 리밸런싱에 참여한 Worker-1, 3에 할당할 Task/Connector 정보를 계산한다.
이 때, 사례 3,4에서 확인한 것 처럼 리더는 delay d
를 활성화시킨다.
SyncGroup을 받은 Worker-1, 3은 현재 소유한 정보에서 변경된 점이 없기 때문에 별도의 Assign 단계를 거치지 않으며, 첫 번째 리밸런싱(Generation 1)이 완료된다.
활성화 된 delay d
가 모두 지나기 전에 Worker-2가 다시 그룹에 가입요청을 보내면서 두 번째 리밸런싱이 시작된다. (Generation 2)
Worker-1, 3은 자신이 소유한 Task/Connector 정보를 메타데이터에 담아 JoinGroupRequest
를 전송한다.
리더인 Worker-1은 그룹에 다시 가입하는 Worker-2를 포함해 Task/Connector를 어떻게 할당할지 계산한다.
이 때, delay
는 d
에서 일정시간 경과했기 때문에 d#
으로 변경된다.
SyncGroup
이 완료되면 Worker-2는 성공적으로 Worker 그룹에 가입하고 두 번째 리밸런싱(Generation 2)이 완료된다.
별도의 Revoke할 Task/Connector가 없기 때문에 Revoked된 Task/Connector (Worker-2에서 Revoke 된 리소스)를 Assign하기 위한 리밸런싱을 바로 트리거해야한다.
하지만, delay d#이 지나기 전에 리더인 Worker-1가 LeaveGroup
요청을 보내면서 세 번째 리밸런싱(Generation 3)이 트리거된다.
따라서, Worker-2와 Worker-3은 각각 assignment:[]
, assignment:[BT-1]
을 담아 리밸런싱에 참여한다.
리더가 그룹에서 떠났기 때문에 GroupCoordinator는 새로운 리더를 선출한다.
새로운 리더로 선출된 Worker-3은 현재 활동하는 Worker-2, 3에 할당할 Task/Connector 정보를 계산한다.
이 때, 최초 delay
부터 지금까지 경과한 시간을 뺀 delay d##
으로 delay
를 설정한다.
SyncGroup
이 완료된 이후 세 번째 리밸런싱(Generation 3)이 완료된다.
아직 delay d##
만큼 시간이 남아있기 때문에 사례 5번에서 리더가 그룹을 나갈 때 Revoked Task/Connector를 Assign하기 위해 리밸런싱을 트리거 한것과 달리 delay d##
동안 리밸런싱을 유예시킨다.
delay d##
이 경과하기 전에 Worker-1은 다시 그룹에 가입하기 위해 요청을 보낸다.
Worker-2, 3은 자신이 소유한 Task/Connector 정보를 기반으로 네 번째 리밸런싱(Generation 4)에 참여한다.
리더인 Worker-3은 새로 가입하는 Worker-1을 포함해 각 Worker에 어떻게 Task/Connector를 할당시킬지 계산한다.
이 때, delay d##
에서 일정시간 경과했기 때문에 delay d###
으로 설정한다.
모든 Worker가 SyncGroup
을 완료하면 네 번째 리밸런싱 (Generation 4)이 완료되고 Worker-1도 성공적으로 그룹에 가입한다.
리더가 계산한 정보에서 별도의 Revoked되는 Task/Connector가 없기 때문에 이미 Revoked된 Task/Connector를 할당하기 위한 리밸런싱을 트리거한다.
delay d###
이 지난 후에 모든 Worker는 자신이 소유한 정보를 기반으로 다시 JoinGroupRequest
를 전송하며 다섯 번째 리밸런싱 (Generation 5)이 시작된다.
리더는 아직 할당되지 않은 Task/Connector들을 균형있게 각각 Worker에 할당하기 위해 계산한다.
마지막으로 SyncGroup
이 완료되면 모든 Task/Connector가 모든 Worker에 균형잡히게 할당되고 정상적으로 데이터 처리를 재개한다.
결론
Kafka Connect Worker에서 발생할 수 있는 여러가지 사례와 함께 Incremental Cooperative Rebalancing
에 대해서 알아보았다.
사례 분석에서 알 수 있듯, Incremental Cooperative Rebalncing
이 적용된 Kafka Connect Worker에서는 단일 리밸런싱이 아닌 연속적인 리밸런싱을 통해 최종적으로 균형적인 Task/Connector 할당 상태를 만드는 것을 확인할 수 있다.
또한, 모든 리밸런싱 과정에서 최소 하나의 Task/Connector는 항상 할당된 상태를 유지하는 것을 확인할 수 있다.
이러한 방식으로 Incremental Cooperative Rebalancing
에서는 Stop-The-World
현상이 발생하지 않으며, 데이터 처리의 연속성을 보장한다.
Reference
KIP-415: Incremental Cooperative Rebalancing in Kafka Connect
Incremental Cooperative Rebalancing: Support and Policies
Confluent Blog: Incremental Cooperative Rebalancing in kafka
'Kafka > Kafka' 카테고리의 다른 글
Kafka: Static Membership (0) | 2024.11.03 |
---|---|
Kafka Issue: InvalidRecordException (related Timestamp) (0) | 2024.06.01 |
Kafka: Kafka Broker 내부 구조 (0) | 2024.04.28 |