⛳ 로드맵
이전 Configuration 설정에 이어서 Producer/Consumer를 구현 후 기존 프로젝트에서 사용한 로직과 Kafka 도입 후 로직의 간단한 차이를 확인해 볼 예정이다.
기존 회원가입 프로세스 로직
현재 프로젝트에서 회원가입 프로세스는 아래와 같다.
- 이미 가입된 회원인지 확인한다.
- 회원 아이디, 닉네임이 들어간 회원정보, 추천인 아이디, 관심 OTT 서비스 아이디 리스트를 파라미터로 받는다.
- 회원 정보를 저장한다. ( 저장한 회원 정보를 리턴한다.)
- 관심 OTT 플랫폼 아이디 리스트 저장(Optional)
- OTT 플랫폼 아이디 리스트로 OTT 플랫폼을 조회한다.
- 3번에서 저장한 회원 정보로 회원이 가진 관심 OTT 플랫폼 리스트를 삭제한다 .
- 새로운 OTT 플랫폼 아이디 리스트를 저장한다.
- 추천인 아이디는 옵션이므로 있는 경우 (Optional)
- 추천인 회원을 조회한다.
- 추천인 회원 엔티티의 포인트를 업데이트한다.
- 포인트 상세이력을 저장한다.
현재 프로세스를 구현한 로직을 확인하자.
// MemberServiceImpl -> 회원 API 서버 내 회원 서비스
/**
* 회원의 상세내용과 관심 Ott 아이디를 이용해 회원을 저장하고 관심 Ott 플랫폼을 등록한다.
*
* @param member - 회원의 상세내용 (memberId, nickname, bankId, bankAccount, grade, point, exp, billingKey, partyInviteYn, memberOtt)
* @param ottId - Integer 타입의 Ott 플랫폼 아이디 리스트
* @return MemberDTO - 새로 저장된 회원의 상세내용 (memberId, nickname, bankId, bankAccount, grade, point, exp, billingKey, partyInviteYn, memberOtt)
*/
@Override
@Transactional
public MemberDTO saveMember(Member member, List<Integer> ottId, String recommendMemberId) {
// 1. 이미 가입된 회원인지 판단한다. 가입된 경우 예외처리
Optional.ofNullable(memberRepository.findByMemberId(member.getMemberId())).ifPresent(
exception -> {throw new DuplicateMemberIdException("이미 존재하는 회원입니다.", NbbangException.DUPLICATE_MEMBER_ID);}
);
// 2. 회원을 저장한다. Member 엔티티엔 기본적으로 회원 아이디와 닉네임이 들어가있다.
Member savedMember = Optional.ofNullable(memberRepository.save(member))
.orElseThrow(() -> new NoCreateMemberException("회원정보 저장에 실패했습니다.", NbbangException.NO_CREATE_MEMBER));
// 3. OTT 아이디 리스트를 이용해 OTT 플랫폼을 모두 조회한다.
List<OttView> findOttViews = Optional.ofNullable(ottViewRepository.findAllByOttIdIn(ottId))
.orElseThrow(() -> new NoSuchOttException("존재하지 않는 OTT 플랫폼입니다.", NbbangException.NOT_FOUND_OTT));
// 4. MemberOtt(관심 OTT) 엔티티로 변경
List<MemberOtt> memberOttList = MemberOttDTO.toEntityList(savedMember, findOttViews);
// 5. 양방향 연관 관계 매핑 (회원 - 회원OTT 양방향 관계 매핑)
for (MemberOtt memberOtt : memberOttList) {
memberOtt.addMember(savedMember);
}
// 6. 관심 OTT 플랫폼을 저장한다.
List<MemberOtt> savedMemberOtt = Optional.of(memberOttRepository.saveAll(memberOttList))
.orElseThrow(() -> new NoCreatedMemberOttException("관심 OTT 등록을 실패했습니다.", NbbangException.NO_CREATE_MEMBER_OTT));
// 7. 추천인 Point를 적립해준다.(추천인 회원 찾기, 포인트 수정 ,포인트 이력 저장)
if(!recommendMemberId.isEmpty()) {
// 7-1. 추천인 아이디로 추천인을 조회한다.
Member recommendMember = Optional.ofNullable(memberRepository.findByMemberId(recommendMemberId))
.orElseThrow(() -> new NoSuchMemberException("추천인 아이디가 존재하지 않습니다.", NbbangException.NOT_FOUND_MEMBER));
// 7-1. 추천인 포인트를 적립해준다.
recommendMember.updatePoint(recommendMemberId, 500L, PointType.INCREASE);
// 7-2. 적립된 상세이력을 저장한다.
Optional.ofNullable(pointRepository.save(PointDTO.toEntity(recommendMember,
PointDTO.builder()
.usePoint(500L).pointType(PointType.INCREASE)
.pointDetail(savedMember.getNickname() + "님의 추천인 입력 적립").build())))
.orElseThrow(() -> new NoCreatedPointDetailsException("포인트 상세이력을 저장하는데 실패했습니다.", NbbangException.NO_CREATE_POINT_DETAILS));
}
// 8. 저장된 회원 객체를 반환한다.
return MemberDTO.create(savedMember);
}
현재 프로세스에서 인증 서버 내 회원 서비스에서 처리할 역할의 경계는 1번 ~ 3번까지이고, 4번과 5번은 회원 API 서버의 포인트 서비스와 관심 OTT 서비스에서 처리하기로 결정했다.
회원 API 서버의 관심 OTT 서비스에서 관심 OTT 플랫폼을 등록하기 위해선 먼저 회원 정보가 저장되어야 한다.
따라서, 인증 서버(Producer)에서 회원 저장이 완료되면, 회원 API 서버(Consumer)로 회원 아이디, 관심 OTT 아이디 리스트, 추천인 아이디를 담은 메세지를 발행될 것이다.
Produer 구현
먼저 인증 서버 내 회원 서비스에 구현한 MemberServiceImpl 로직을 확인해보면 기존 회원 API 서버 내 회원 서비스에 구현되어 있는 중복 회원 가입 여부 및 회원 저장 프로세스가 그대로 구현된 것을 확인할 수 있다.
그리고, 회원 저장이 제대로 완료된 경우 메세지를 발행하는 로직을 구현했다. 관심 OTT 플랫폼 등록과 추천인 포인트 적립 로직은 옵션이기 때문에 둘 중 하나라도 입력이 들어온 경우만 메세지를 발행하도록 구현했다.
// MemberServiceImpl -> 인증 서버 내 회원 서비스
@Override
public MemberDTO saveMember(Member member, List<Integer> ottId, String recommendMemberId) {
// 1. 회원이 존재하는지 판단
Optional.ofNullable(memberRepository.findByMemberId(member.getMemberId())).ifPresent(
exception -> { throw new DuplicateMemberIdException("이미 존재하는 회원입니다.", NbbangException.DUPLICATE_MEMBER_ID); }
);
// 2. 회원 정보 저장
Member savedMember = Optional.of(memberRepository.save(member))
.orElseThrow(() -> new NoCreateMemberException("회원정보 저장에 실패했습니다.", NbbangException.NO_CREATE_MEMBER));
// 3. 회원 정보 저장 시 카프카 메세지 전달
if((!ottId.isEmpty() && !savedMember.getMemberId().isEmpty()) || !recommendMemberId.isEmpty()) {
try {
memberProducer.sendRecommendIdAndOttId(MemberProducer.KafkaSendRequest.create(savedMember.getMemberId(), recommendMemberId, ottId));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
// 4. 저장 완료된 경우 저장된 회원 리턴
return MemberDTO.create(savedMember);
}
그러면 실제 회원 API 서버로 메세지를 발행하는 Producer 코드를 확인해보자.
위의 3번 로직에서 Request할 데이터를 Producer내 DTO 클래스의 정적 팩토리 메소드로 만들고 해당 데이터를 직렬화한 뒤 회원 API 서버로 전송하는 로직이다.
// MemberProducer -> 인증 서버 내 회원 서비스
@Component
@Slf4j
@RequiredArgsConstructor
public class MemberProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void sendRecommendIdAndOttId(KafkaSendRequest kafkaSendRequest) throws JsonProcessingException {
log.info("[MemberProducer] Auth Service -> Member Service");
String sendMessage = objectMapper.writeValueAsString(kafkaSendRequest); // Request String 직렬화
log.info("[MemberProducer] sendMessage : " + sendMessage);
try {
kafkaTemplate.send("new-member-register", sendMessage); // 토픽 : new-member-register
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 회원 서비스로 보낼 Request DTO
@Getter
@NoArgsConstructor
static class KafkaSendRequest {
private String memberId; // 가입된 회원 아이디
private String recommendId; // 추천인 회원 아이디
private List<Integer> ottId; // 관심 OTT 아이디 리스트
@Builder
public KafkaSendRequest(String memberId, String recommendId, List<Integer> ottId) {
this.memberId = memberId;
this.recommendId = recommendId;
this.ottId = ottId;
}
public static KafkaSendRequest create(String memberId, String recommendId, List<Integer> ottId) {
return KafkaSendRequest.builder()
.memberId(memberId)
.recommendId(recommendId)
.ottId(ottId)
.build();
}
}
}
Consumer 구현
기존 로직에서는 회원 가입, 관심 OTT 플랫폼 등록, 추천인 포인트 적립을 모두 MemberServiceImpl(회원 서비스) 내에서 처리했지만, 수정된 로직에서는 각각 PointServiceImpl(포인트 관련 서비스)과 MemberOttServiceImpl(관심 OTT 서비스)에서 각각 처리하는 것이 가능해졌다.
추천인 포인트 적립 및 관심 OTT 플랫폼 등록 로직을 처리하는 PointServiceImpl과 MemberOttServiceImpl을 먼저 확인해보면, 기존 회원 서비스에 있던 로직이 따로 빠진 것을 확인할 수 있다.
// PointServiceImpl -> 회원 API 서버 내 포인트 서비스
@Override
public PointDTO updatePoint(String recommendId) {
// 1, 회원 아이디를 이용해 추천인 회원 찾기
Member recommendMember = Optional.ofNullable(memberRepository.findByMemberId(recommendId))
.orElseThrow(() -> new NoSuchMemberException("회원이 존재하지 않습니다.", NbbangException.NOT_FOUND_MEMBER));
// 2. 추천인 회원 500 포인트 적립
recommendMember.updatePoint(recommendId, 500L, PointType.INCREASE);
// 3. 포인트 상세 이력 저장
Point savedPoint = Optional.ofNullable(pointRepository.save(PointDTO.toEntity(recommendMember)))
.orElseThrow(() -> new NoCreatedPointDetailsException("추천인 적립에 실패했습니다.", NbbangException.NO_CREATE_POINT_DETAILS));
return PointDTO.create(savedPoint);
}
// MemberOttServiceImpl -> 회원 API 서버 내 관심 OTT 서비스
@Override
@Transactional
public List<MemberOttDTO> saveMemberOtt(String memberId, List<Integer> ottId) {
// 1. 회원 찾기
Member findMember = Optional.ofNullable(memberRepository.findByMemberId(memberId))
.orElseThrow(() -> new NoSuchMemberException("회원이 존재하지 않습니다.", NbbangException.NOT_FOUND_MEMBER));
// 2. OTT 찾기
List<OttView> findOttViews = Optional.ofNullable(ottViewRepository.findAllByOttIdIn(ottId))
.orElseThrow(() -> new NoSuchOttException("존재하지 않는 OTT 플랫폼입니다.", NbbangException.NOT_FOUND_OTT));
// 3. 관심 OTT가 등록되어 있는 경우 관심 OTT 먼저 날리기
Optional.ofNullable(memberOttRepository.findAllByMember(findMember)).ifPresent(
deleteLogic -> memberOttRepository.deleteByMember(findMember)
);
// 4. 관심 OTT 등록
List<MemberOtt> savedMemberOtt = Optional.of(memberOttRepository.saveAll(MemberOttDTO.toEntityList(findMember, findOttViews)))
.orElseThrow(() -> new NoCreatedMemberOttException("관심 OTT 서비스가 등록되지 않았습니다.", NbbangException.NO_CREATE_MEMBER_OTT));
return MemberOttDTO.createList(savedMemberOtt);
}
회원 서비스 로직에 있던 로직들을 포인트 서비스와 관심 OTT 서비스에서 처리하도록 구분했다.
마지막으로 Consumer 코드를 구현해 메세지를 수신 했을 때 추천인 아이디 혹은 관심 OTT 아이디 리스트가 있다면 각각 해당 서비스 로직을 호출해 처리하면 된다.
// MemberConsumer -> 회원 API 서버 내 회원 서비스
@Component
@RequiredArgsConstructor
@Slf4j
public class MemberConsumer {
private final MemberOttService memberOttService; // 관심 OTT 서비스
private final PointService pointService; // 포인트 서비스
private final ObjectMapper objectMapper;
@Transactional
@KafkaListener(topics = "new-member-register") // Producer가 설정한 토픽을 구독
public void saveRecommenderPointAndMemberOtt(String receivedMessage) throws JsonProcessingException {
log.info("[MemberConsumer] ReceivedMessage : " + receivedMessage);
// 받은 메세지를 역직렬화하여 JSON 형태로 가져오기
KafkaReceiveRequest receivedData = objectMapper.readValue(receivedMessage, KafkaReceiveRequest.class);
// 추천인 아이디가 있다면 추천인 적립 로직 처리
if(!receivedData.getRecommendId().isEmpty()) {
pointService.updatePoint(receivedData.getRecommendId());
}
// 관심 OTT 아이디 리스트가 있다면 관심 OTT 플랫폼 등록 로직 처리
if(!receivedData.getMemberId().isEmpty() && !receivedData.getOttId().isEmpty()) {
memberOttService.saveMemberOtt(receivedData.getMemberId(), receivedData.getOttId());
}
}
@Getter
@NoArgsConstructor
static class KafkaReceiveRequest {
private String memberId; // 회원 아이디
private String recommendId; // 추천인 아이디
private List<Integer> ottId; // 관심 OTT 아이디 리스트
@Builder
public KafkaReceiveRequest(String memberId, String recommendId, List<Integer> ottId) {
this.memberId = memberId;
this.recommendId = recommendId;
this.ottId = ottId;
}
}
}
이제 인증 서비 내 회원 서비스에서 회원 가입로직을 진행 시 메세지가 잘 전달되고 회원 API 서버에서는 잘 처리하는지를 확인해보면 아래와 같이 잘 처리됨을 확인 할 수 있다.
정리
간단하게 Kafka를 이용해 서버 간 데이터를 처리해봤다.
아직 Kafka에 대한 지식이 부족하기 때문에 조금 더 공부하고 다양한 코드를 접해보는 것이 중요할 것 같다.
사실 Kafka가 분산 서버에서 대용량 데이터를 비동기 방식으로 처리하는 것을 장점으로 갖는 메세징 시스템이기 때문에 이런 방식으로 적용하는 것이 알맞지 않을 수 있다.
ActiveMQ나 RabbitMQ 같은 큐잉 시스템도 알아보고 서비스에 알맞는 시스템을 선택하는 것도 중요할 것 같다.
'Backend > Kafka' 카테고리의 다른 글
Kafka를 이용해 Producer/Consumer 맛보기! - Configuration (0) | 2022.05.06 |
---|