Home 채팅 시스템 구현 (WebSocket)
Post
Cancel

채팅 시스템 구현 (WebSocket)

강아지 미용 중개에 있어 미용사가 견적서를 보내면 사용자는 채팅을 통해 흥정을 할 수 있도록 만들었다.

채팅을 통해 가격에 대한 설명을 받고, 가격이 변동되면 채팅창 내에서 견적서를 즉시 수정해 보내주면 채팅방에서 변경된 견적서 내용을 바탕으로 결제가 가능한 구조이다.

이 기능을 위한 채팅 시스템을 구현해보자.

채팅 시스템 고려 사항

흔히 채팅 시스템을 구현할 때 WebSocket을 사용한다.

왜 HTTP를 선택하지 않고 WebSocket을 이용해 구현하는지부터 알아보자.

HTTP vs WebSocket

  • HTTP 방식
    • 요청/응답 모델에 기반한다.
    • 클라이언트가 서버에 요청을 보내면 서버가 해당 요청에 대해 응답을 반환하는 구조이다. (단방향 통신)
    • 소켓 통신을 기반으로 하고 TCP/IP 헤더들이 붙어 메시지가 송수신된다.
    • JSON/Image 등등 다양한 형식의 파일들을 전송할 수 있다.
    • 기본적으로 요청마다 새 연결을 해주어야 한다.
  • WebSocket
    • HTTP와 마찬가지로 통신 프로토콜이다. HTML5 표준의 일부이다.
    • HTTP 핸드셰이크로 연결을 시작하고, 지속적인 연결을 유지하며 실시간 데이터 교환이 가능하도록 한다.
      • 클라이언트가 서버에 WebSocket 핸드셰이크 요청을 보낸다.
      • 서버가 요청을 승인하면 WebSocket 연결이 열린다.
      • 연결이 열리면 클라이언트와 서버가 양방향으로 데이터를 주고받을 수 있다.
    • 매번 메시지를 주고받을 때 새롭게 연결을 맺을 필요가 없는 것이다.
    • 이름은 소켓이지만 HTTP와 동일하게 애플리케이션 계층에서 동작한다.
    • 평문 메시지 전송 방식이어서 SSL/TLS 보안 계층으로 암호화되어야 데이터 탈취를 방지할 수 있다.

개념은 간단하게 이 정도로 확인하고 왜 HTTP 통신보다 WebSocket 방식을 채택하는지에 대해 알아보자.

HTTP에서의 실시간 통신

  • Polling
  • Long Polling
  • Streaming

위 세 가지 방식으로 실시간 통신을 구현할 수 있다.

Polling

브라우저가 일정 주기마다 서버에 HTTP 요청을 보내 데이터를 확인하는 방식이다.

서버는 항상 새로운 요청에 응답하고, 새로운 데이터가 없으면 빈 응답을 반환한다.

  • 클라이언트가 정해진 시간 간격 (ex - 1초)마다 서버에 요청을 보낸다.
  • 서버는 요청을 처리하고 데이터를 응답한다.
  • 새로운 요청을 반복적으로 수행한다.

이러한 방식은 새로운 데이터가 없어도 요청/응답을 지속적으로 발생시켜 불필요하게 리소스가 낭비된다는 단점이 크다.

그리고 데이터의 변경이 발생해도 요청 주기 안에 있지 않다면 실시간성이 보장되지 않는다는 단점이 있다. 물론 요청 주기를 매우 짧게 한다면 실시간성이 보장되겠지만, 그에 따른 부하는 더 커진다.

날씨 정보, 주식 가격 등의 주기적인 데이터 갱신에 사용될 수 있다. 실시간성이 크게 중요하지 않은 경우이다.

Long Polling

Polling 방식의 서버 부하라는 단점을 줄이면서 실시간성을 높이기 위한 방식이다.

HTTP 요청이 들어올 때 서버는 Polling 방식과 달리 요청에 대한 응답을 보낸 후 즉시 연결을 끊지 않고 일정 시간 대기한다.

클라이언트가 서버에 요청을 보냈을 때 서버가 새로운 데이터가 있을 때까지 응답을 지연시키고 새로운 데이터가 도착했을 때 즉시 응답한다. 클라이언트는 응답을 받고 다시 요청을 보내는 방식이다.

  • 클라이언트가 서버에 요청을 보낸다.
  • 서버는 새로운 데이터가 생길 때까지 응답을 보류한다.
  • 데이터가 생기면 서버는 응답을 보내고 클라이언트는 응답을 받은 즉시 다시 요청하는 방식이다.

이 방식의 단점은 많은 클라이언트가 동시에 연결되었을 때 서버의 연결 수가 증가해 부하가 발생할 수 있다는 점이다.

한 명이 채팅을 입력하면 데이터가 변경되는 것이고 서버는 변경된 데이터를 연결된 모든 클라이언트에 응답할 것이다. 그리고 그 클라이언트들은 응답을 받고 다시 요청을 동시에 서버에 보내게 된다. 순간적으로 트래픽이 몰리는 경우가 발생할 수 있는 것이다.

간단한 채팅이나 알림 시스템에 적용해볼 수 있다.

Streaming

서버가 클라이언트의 요청에 응답을 완료하지 않고 연결을 지속적으로 유지하며 데이터를 실시간으로 푸시하는 방식이다. 서버는 무한정 혹은 일정 시간동안 요청을 대기시키고, chuncked 메시지를 이용하여 연결을 계속 유지한다.

  • 클라이언트가 서버에 요청을 보낸다.
  • 서버는 응답을 종료하지 않고 데이터를 지속적으로 클라이언트에게 전송한다.
  • 연결이 끊어질 때까지 데이터를 지속적으로 스트리밍한다.

Polling 방식들에 비교하면 HTTP Stateless 특성에 의한 재연결에 대한 부분의 부하는 없다. 실시간으로 데이터를 전송하는 부분에 있어서도 장점이 있다.

그러나 연결을 유지하는 비용이 들고, 브라우저에 따라 지원하지 않는 경우도 있다. 그리고 클라이언트 측에서 서버에 데이터를 보내기 힘들다는 단점 또한 존재한다.

WebSocket을 활용한 실시간 통신

WebSocket은 위에 설명된 HTTP의 Polling, Long Polling, Streaming 방식의 단점을 보완한다.

기본적으로 클라이언트와 서버 간 양방향 통신을 지원하고 연결을 유지하기 때문에 실시간성을 보장하는데 매우 유리하다.

  • 클라이언트가 HTTP 요청을 전송해 WebSocket으로 프로토콜을 전환할 것을 요청한다.
    • Upgrade 헤더를 포함한 요청을 서버로 보낸다. (Upgrade: websocket)
  • 서버가 연결을 승인한다.
    • 101 Switching Protocols 응답을 반환하여 승인한다.
  • 이 후의 HTTP 연결은 WebSocket으로 업그레이드되고, 클라이언트와 서버는 지속적으로 양방향 통신을 할 수 있게 된다.
  • 이후 클라이언트 또는 서버가 필요에 따라 연결을 닫을 수 있다.

WebSocket의 동작 방식은 위와 같고, 어떤 장점을 가지는지 보자.

  • HTTP는 어떤 응답을 받기 위해 항상 요청이 우선되어야 한다. 하지만 웹소켓은 연결이 유지되고 있는 상태이기 때문에 그럴 필요가 없다.
  • HTTP에 비해 웹소켓이 보내야하는 메시지, 데이터의 양이 훨씬 적다.
    • HTTP 요청에는 Request URL, 상태 코드, 헤더/바디 등등 데이터의 양이 많다. 실시간성을 요구하는 서비스에는 부담이 갈 수 있다.
  • WebSocket은 텍스트, 바이너리 데이터 등 다양한 형식의 데이터를 전송할 수 있다.
    • 이를 통해 실시간 채팅, 스트리밍, IoT 애플리케이션 등 다양한 요구사항에 대응할 수 있다.
  • WebSocket은 높은 동시 연결 수를 효율적으로 처리할 수 있는 구조로 설계되어 있다.
    • 실시간 애플리케이션에서 중요한 스케일링 문제를 해결할 수 있다.

물론 단점도 존재한다. HTTP보다 구현이 복잡하고 연결 상태를 관리하기 위한 추가적인 로직이 필요하다. 그리고 일부 네트워크 환경에서는 WebSocket이 차단될 가능성도 있다.

즉, 정리해보면 연결을 한 번만 설정해 효율적으로 네트워크를 사용하며 이에 따라 서버의 부하도 적다. 양방향 통신을 지원하기 때문에 실시간성도 뛰어나 채팅 시스템에 어울리는 것이다. 또한 높은 연결 수를 효율적으로 처리해 확장성이 높다는 장점 또한 가진다.

실시간 채팅, 실시간 알림, 멀티 플레이어 게임, 라이브 스트리밍 서비스, Google Docs와 같은 실시간 협업 도구 등의 구현에 활용된다.

메시지 읽음 상태 관리

메시지는 보통 읽음 상태를 관리하게 된다. 채팅방에 아직 읽지 않은 채팅 수, 채팅 읽음 처리 등등에 사용될 수 있다. 하지만 나의 기존 설계에는 RDB 테이블에는 읽음 상태에 대한 관리를 하는 필드가 없었다.

그럼 어떻게 관리할 수 있을까?

  • Redis에 마지막 읽은 메시지 ID를 채팅방 ID, UserID와 함께 저장해 그 메시지 ID보다 작은 ID값은 모두 읽음 상태로 처리한다.
  • RDB의 Message 테이블에 읽음 상태에 대해 관리하는 필드를 추가한다.

RDB에 추가하는 방식을 먼저 고민해보았는데, 메시지별로 쉽게 읽음 상태를 관리할 수 있다는 점에서 장점을 가지지만 읽음 상태를 update 해야할 때 한 번에 많은 메시지 데이터들을 업데이트해야 한다는 단점이 있다. 성능 상 문제가 있다.

Redis를 사용한 방식은 일반적으로는 문제가 없지만 데이터가 휘발될 위험이 있고, 메시지별로 세세한 읽음 상태 관리가 어렵다는 단점이 있다.

따라서 우선 RDB에 추가하는 방식은 성능에 문제가 있기 때문에 채택할 수 없었고, Redis + RDB 방식을 채택하려고 한다.

평소에는 Redis에 마지막 읽은 메시지 ID값을 저장하는 방식으로 읽지 않은 메시지 수나 읽음 상태를 관리하고 데이터의 영속성을 위해 RDB에 상태를 저장하는 것은 배치 + 스케줄러를 통해 한 번에 상태를 업데이트하는 것을 목표로 한다.

실시간성이 중요한 채팅 시스템의 성능을 높이고, 데이터의 영속성을 모두 챙길 수 있는 방법이다. 그리고 추후 채팅별 읽음 처리 등의 기능 확장이 필요할 때 해당 필드가 쓰일 수 있다는 점도 장점이 될 수 있다.

채팅 시스템 구현

1
implementation 'org.springframework.boot:spring-boot-starter-websocket'

우선 WebSocket 사용을 위한 의존성을 추가해준다.

WebSocketConfig

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {

    private final WebSocketHandler chatSocketHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatSocketHandler, "/ws/chat").setAllowedOrigins("*");
    }
}
  • addHandler
    • 특정 WebSocket 요청을 처리할 핸들러를 등록한다.
    • chatSocketHandler를 /ws/chat 경로로 들어오는 요청에 연결한다.
      • /ws/chat -> ws://localhost:8080/ws/chat으로 WebSocket 연결을 요청할 수 있다.
  • setAllowedOrigins
    • CORS 설정으로 모든 도메인에서 해당 WebSocket 엔드포인트에 접근할 수 있도록 허용한다.

ChatSocketHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatSocketHandler extends TextWebSocketHandler {

    private final ObjectMapper objectMapper; // JSON 파싱
    private final ChatMessageService chatMessageService;
    private final Map<Long, Set<WebSocketSession>> chatRoomSessions = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Long roomId = Long.parseLong(Objects.requireNonNull(session.getUri().getQuery()));
        String sessionId = session.getId();

        chatRoomSessions.putIfAbsent(roomId, ConcurrentHashMap.newKeySet());
        chatRoomSessions.get(roomId).add(session);

        log.info("WebSocket 연결 roomId: " + roomId + " sessionId: " + sessionId);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        ChatMessageDto chatMessageDto = objectMapper.readValue(payload, ChatMessageDto.class);

        chatMessageService.saveMessageRedis(chatMessageDto);

        broadcastToRoom(chatMessageDto.getRoomId(), chatMessageDto);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("WebSocket 에러: " + exception.getMessage());
        session.close();
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String sessionId = session.getId();
        chatRoomSessions.forEach((roomId, sessions) -> sessions.remove(session));

        log.info("WebSocket 연결 종료 sessionId: " + sessionId);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false; // 부분적으로 분할된 WebSocket 메시지를 처리하지 않는다.
    }

    private void broadcastToRoom(Long roomId, ChatMessageDto message) throws Exception {
        Set<WebSocketSession> roomSessionsSet = chatRoomSessions.getOrDefault(roomId, Collections.emptySet());

        for (WebSocketSession session : roomSessionsSet) {
            if (session.isOpen()) {
                session.sendMessage(new TextMessage(objectMapper.writeValueAsString(message)));
            }
        }
    }
}
  • chatRoomSessions = ConcurrentHashMap
    • 다중 스레드 환경에서 동기화를 보장해준다.
    • WebSocket은 다수의 클라이언트가 동시에 연결될 수 있는데, 그렇다면 여러 스레드에서 안전하게 세션을 추가/삭제할 수 있어야 한다. 따라서 ConcurrentHashMap을 사용한다.
  • afterConnectionEstablished()
    • 클라이언트가 WebSocket 연결을 시작하면 호출된다.
      • ws://localhost:8080/ws/chat?roomId=1 과 같은 방식으로 요청
      • 위 URI에서 roodId를 추출.
    • 연결된 세션을 채팅방 ID (roomId)를 기준으로 매핑한다.
  • handleTextMessage()
    • 클라이언트로부터 텍스트 메시지를 수신하면 호출되는 메서드이다.
    • 메시지를 처리하고, 채팅방의 다른 사용자에게 브로드캐스트한다.
    • JSON 데이터를 ChatMessageDto로 변환 후 Redis에 저장하고 브로드캐스트 한다.
  • broacastToRoom()
    • 특정 채팅방에 연결된 클라이언트 세션들에 메시지를 전송한다.
    • 세션이 열려있는지 체크 후 메시지를 JSON으로 변환하여 전송해준다.
  • handleTransportError()
    • WebSocket 통신 중 오류가 발생했을 때 호출된다.
    • 오류 발생 시 로그를 남기고 세션을 닫는다.
  • afterConnectionClosed()
    • 클라이언트가 WebSocket 연결을 종료하면 호출된다.
    • 종료된 세션을 세션 목록에서 제거한다.
    • 사용자가 채팅방을 나간다는 요청을 할 때 WebSocket 연결이 끊어지도록 구현해야 한다.
      • React 서버에서 채팅방 나가기 API를 호출 후 socket 연결을 끊어주면 핸들러에서 이 메서드 호출되도록 구현.
    • 브라우저가 종료되면 WebSocket 연결이 자동으로 끊어진다.

우선 이러한 구조로 WebSocket 연결에 대한 처리를 할 수 있다.

WebSocket 핸들러를 통해 WebSocket 연결, 그리고 메시지 전달에 관련한 이벤트들을 처리하고, 나머지 부분들은 API를 구현하여 완성하면 된다.

사실 이 핸들러에서 더 구현해야 하는 부분이 있다.

Access 토큰이 쿠키에 담겨있어 handleTextMessage() 메서드에서 그 토큰을 활용해 senderId나 senderRole 정보를 메시지에 담는 부분이 필요하다.

하지만 그 부분은 채팅 시스템 구현마다 다를 것이기 때문에 생략한다.

API 구현

채팅방 생성, 채팅방 나가기, 채팅 조회 등의 API를 구현해보자.

우선 구현을 하다보니 기존에 Redis에 저장했던 메시지 데이터들을 RDB에 저장하기로 했던 부분들을 MongoDB에 저장하는 방안으로 전환하기로 했다.

고민을 하게 된 이유는 단순히 비정형의 데이터를 저장하는 목적이 전부라면 굳이 MongoDB를 선택해야 하는가라는 질문을 받았다.

RDB의 JSON 타입 저장 방식을 채택하면 MongoDB 서버를 따로 띄울 필요가 없다는 장점이 있다. 비용 측면에서 유리하다. 그리고 MongoDB와 비교했을 때 저장이나 조회 성능에도 당장은 큰 차이가 없는 것으로 알게 되었다.

하지만 MongoDB를 선택하게 된 이유는 아래와 같다.

  • 기존 RDB의 JSON 타입 저장 기능 활용
    • 트랜잭션과 정교한 쿼리를 그대로 사용 가능하다.
    • 구조화된 데이터로 관리가 가능하고 별도의 기술 도입이 필요 없다.
    • 단, JSON 필드에 대한 쿼리의 성능이 다른 테이블 필드보다 낮을 수 있다. MySQL의 경우 인덱싱에도 제약이 있다고 한다.
    • RDB는 수평적 확장보다는 수직 확장에 유리하다. 읽기/쓰기 요구량이 증가했을 때 한계가 있다.
  • MongoDB 저장
    • JSON/BSON 데이터를 자연스럽게 저장, 검색, 업데이트하며, 관계형 모델보다 비정형 데이터 처리에 최적화되어 있다.
    • 샤딩을 통해 대규모 데이터를 처리 가능하고 클러스터를 쉽게 확장할 수 있다.
    • JSON 데이터를 저장하고 인덱싱하여 빠른 읽기/쓰기 성능을 제공한다.
    • 단, SQL 쿼리를 지원하지 않아 학습이 필요하다.
    • 트랜잭션을 지원하지 않는다. 단일 문서에 대해 ACID 트랜잭션은 지원하지만 복잡한 트랜잭션은 제한된다.

두 방법 모두 장단점이 있다.

하지만 내가 MongoDB를 채택한 이유는 우선 채팅방이 매우 많아진다는 것을 가정하고 채팅 시스템을 고도화할 계획이었어서 확장과 성능에 유리한 MongoDB를 채택한 이유가 가장 크다.

그리고 MongoDB를 제대로 사용해본 경험이 없기 때문에 MongoDB 사용을 경험해보고자 함도 있었다.

채팅방 생성

우리 시스템에서 채팅방을 생성할 때 중요한 부분이 있었다.

강아지 미용사가 견적서를 보내면서 채팅방이 생기기 시작한다. 즉, 채팅방이 만들어질 때 미용사가 견적서와 함께 일종의 시작 문구를 채팅으로 보내야 했다.

그러기 위해서는 채팅방을 생성하면서 바로 채팅을 Redis에 저장하는 로직이 필요했다.

추후 채팅들을 조회할 때 Redis나 MongoDB에서 조회할 것이기 때문이었다.

나머지 채팅방을 생성하는 부분은 구현하려는 채팅 시스템마다 다를 것이고 다른 채팅과 관련없는 API와 크게 다를게 없어 Redis로 저장하는 부분만 설명한다.

1
2
3
4
5
public void saveMessageRedis(ChatMessageDto message) {
    String key = SAVE_MESSAGE_ROOM_ID_KEY + message.getRoomId();
    redisTemplate.opsForList().rightPush(key, message);
    redisTemplate.expire(key, Duration.ofDays(1));
}
1
2
3
4
chatMessageService.saveMessageRedis(estimateMessage);  
chatMessageService.saveMessageRedis(wantSendImageMessage);  
chatMessageService.saveMessageRedis(wantSendDescriptionMessage);  
chatMessageService.saveMessageRedis(firstMessage);
  • Redis List 자료구조에 메시지를 Push
  • 견적서, 원하는 이미지, 인사 메시지 등의 미용사가 원하는 초기 메시지 설정값이 존재하는데 그 값들을 레디스에 저장하며 채팅방을 생성한다.
  • 크게 눈여겨 볼 로직은 없지만 현재는 Redis 메시지 저장 구조가 List인 점을 기억해두자.

채팅방 조회

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private ChatRoomListDto convertToChatRoomListDto(ChatRoom chatRoom, Role userRole) {
    String lastMessage = chatMessageService.getLastMessage(chatRoom.getId());
    int unreadCount = 0;

    if (userRole.equals(Role.ROLE_USER)) {
        unreadCount = chatMessageService.getUnreadCount(chatRoom.getId(), chatRoom.getCustomer().getId());
    } else if (userRole.equals(Role.ROLE_SALON)) {
        unreadCount = chatMessageService.getUnreadCount(chatRoom.getId(), chatRoom.getGroomer().getId());
    }

    GroomerProfile groomerProfile = groomerProfileRepository.findByUserId(chatRoom.getGroomer().getId())
            .orElseThrow(
                    () -> new IllegalArgumentException("미용사 프로필이 존재하지 않습니다. Id: " + chatRoom.getGroomer().getId()));

    return ChatRoomListDto.builder()
            .roomId(chatRoom.getId())
            .groomerProfile(ChatGroomerProfileDto.create(groomerProfile))
            .customer(ChatCustomerDto.create(chatRoom))
            .lastMessage(lastMessage)
            .unreadCount(unreadCount)
            .totalAmount(chatRoom.getEstimate().getTotalAmount())
            .build();
}
  • 채팅방 조회는 말 그대로 채팅방 목록을 조회한다.
  • 카카오톡과 같은 채팅방을 생각해보면 된다.
  • 채팅방에는 채팅방 제목, 마지막 메시지 내용, 읽지 않은 메시지 수가 보여진다.
  • 여기서 읽지 않은 메시지 수를 어떻게 조회할 지 고민하면 된다.
  • 마지막 메시지 내용은 Redis에서 제일 끝의 메시지를 가져와주면 되는 간단한 로직이면 된다.

읽지 않은 메시지 수를 구하려면 어디까지 읽었는지를 알아야 한다.

어디까지 읽었는지에 대한 부분에 대해 나는 아래 두 경우 업데이트 해주기로 했다.

  • 내가 채팅을 보냈을 때
    • 내가 채팅을 보내는 경우는 마지막 읽은 메시지가 당연히 나의 채팅이다.
  • 채팅방에 입장했을 때
    • 채팅방에 입장했을 때 메시지는 안읽은 부분부터 가장 최근 메시지까지 전부 조회한다.
    • 이전 메시지를 조회하는 것은 스크롤을 통해 조금씩 가져오지만 최신 메시지는 한 번에 조회하도록 구현했다.
    • 그 이유는 보통 안읽은 메시지가 많은 채팅방을 들어갔다가 바로 나오면 안읽은 메시지가 초기화되기 때문이다.
    • 따라서 이 경우 가장 최신 메시지까지 읽은 것으로 업데이트 된다.
1
2
3
4
5
6
7
8
9
public void updateLastReadMessage(Long roomId, Long userId) {
    String lastReadKey = LAST_READ_KEY + roomId + ":" + userId;
    String messageKey = SAVE_MESSAGE_ROOM_ID_KEY + roomId;

    Long listSize = redisTemplate.opsForList().size(messageKey);
    if (listSize != null) {
        redisTemplate.opsForValue().set(lastReadKey, listSize.intValue() - 1);
    }
}

이런 방식으로 마지막 읽은 메시지 키 값에 일종의 Integer 메시지 Index 값을 저장해두고 활용하는 방식을 채택했다.

채팅방 입장

채팅방에 입장하면 읽지 않은 메시지부터 제일 최근 메시지까지 조회하거나, 읽지 않은 메시지가 없다면 제일 최신 메시지부터 N개의 메시지를 조회하는 방식을 택했다.

이 전의 메시지를 조회하기 위해서는 스크롤을 위로 올리면 N개만큼 이전 메시지들을 조회하는 방식이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public List<Object> getMessagesForUnreadOrRecent(Long roomId, Integer lastReadIndex, Long totalMessageCount) {
    String messageKey = SAVE_MESSAGE_ROOM_ID_KEY + roomId;

    if (lastReadIndex != null && totalMessageCount != null) {
        if (lastReadIndex >= totalMessageCount - 1) {
            int startIndex = Math.max(lastReadIndex - MESSAGE_GET_LIMIT + 1, 0);
            return redisTemplate.opsForList().range(messageKey, startIndex, lastReadIndex);
        } else {
            return redisTemplate.opsForList().range(messageKey, lastReadIndex + 1, -1);
        }
    } else {
        return redisTemplate.opsForList().range(messageKey, -MESSAGE_GET_LIMIT, -1);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Object> getPreviousMessages(Long roomId, Long userId) {
    String messageKey = SAVE_MESSAGE_ROOM_ID_KEY + roomId;
    String firstLoadedKey = FIRST_LOADED_KEY + roomId + ":" + userId;

    Integer firstLoadedIndex = (Integer) redisTemplate.opsForValue().get(firstLoadedKey);

    if (firstLoadedIndex == null || firstLoadedIndex <= 0) {
        return List.of();
    }

    int startIndex = Math.max(firstLoadedIndex - MESSAGE_GET_LIMIT, 0);
    int endIndex = firstLoadedIndex - 1;

    List<Object> messages = redisTemplate.opsForList().range(messageKey, startIndex, endIndex);

    redisTemplate.opsForValue().set(firstLoadedKey, startIndex);

    return messages;
}

마찬가지로 Redis의 Key값들을 이용해 List를 조회한다.

이전 메시지를 가져오는 기준은 FirstLoadedKey 값을 세팅해두고 그 인덱스보다 작은 N개의 메시지를 조회하는 방식이다. 당연히 이전 메시지를 한 번 조회할 때 FirstLoadedKey도 N개만큼 줄어들 것이다.

MongoDB 저장/조회

채팅방 나가기 등의 로직이 있지만 크게 다룰 부분이 없기에 MongoDB에 대한 부분을 설명하려고 한다. 둘 다 나갔을 때 채팅방과 관련된 Redis 데이터들을 삭제해주면 된다.

이제 Redis에 있던 내용을 MongoDB에 저장해야 한다.

MongoDB 저장

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Getter
@Document(collection = "chat_messages")
public class ChatMessageMongo {

    @Id
    private String id;

    private Long roomId;
    private Long senderId;
    private String senderRole;
    private String messageText;
    private String imageUrl;
    private LocalDateTime sendAt;

    @Builder
    public ChatMessageMongo(String id, Long roomId, Long senderId, String senderRole, String messageText,
                            String imageUrl,
                            LocalDateTime sendAt) {
        this.id = id;
        this.roomId = roomId;
        this.senderId = senderId;
        this.senderRole = senderRole;
        this.messageText = messageText;
        this.imageUrl = imageUrl;
        this.sendAt = sendAt;
    }

    public static ChatMessageMongo createMessage(ChatMessageDto message) {
        return ChatMessageMongo.builder()
                .id(message.getMessageId())
                .roomId(message.getRoomId())
                .senderId(message.getSenderId())
                .senderRole(message.getSenderRole())
                .messageText(message.getMessageText())
                .imageUrl(message.getImageUrl())
                .sendAt(message.getSendAt())
                .build();
    }
}
  • 우선 MongoDB에 저장하기 위한 Entity를 만들어줘야 한다.
  • 눈여겨볼 부분은 Id가 Long이나 Integer가 아닌 String이라는 점이다.

저장 로직은 사실 문제 없었다.

단순히 스케줄러를 통해 레디스에 있던 메시지들을 ChatMessageDto로 변환하고, 그 Dto를 위의 Entity로 변환해 단순히 저장만해주면 됐다.

MongoRepository를 사용해야 하고, 그런 부분은 조금 달랐지만 나머지는 크게 다르지 않았다.

하지만 문제는 조회에 있다.

MongoDB에서 메시지 조회

우리는 Redis에 List 형식으로 Value값에 Integer의 일종의 메시지 Index를 저장했다.

그런데 Redis에 있는 메시지들을 MongoDB에 저장하면 Redis의 Index가 초기화된다.

물론 MongoDB의 데이터들은 아래와 같이 조회할 수 있다.

1
2
3
4
5
6
public List<ChatMessageDto> getUnreadMessages(Long roomId, String lastReadMessageId) {
    return chatMessageRepository.findByRoomIdAndIdGreaterThanOrderBySendAtAsc(roomId, lastReadMessageId)
            .stream()
            .map(ChatMessageDto::mongoMessageToDto)
            .toList();
}

MongoRepository도 JPA처럼 메서드 명으로 쿼리 생성을 알아서 지원해주는 기능이 있다.

따라서 SendAt을 기준으로 읽지 않은 메시지들을 조회하는 등의 조회를 하면 됐다.

하지만 문제는 MongoDB에 옛날 메시지들을 저장해둔 이후이다.

채팅을 또 다시 입력했을 때 Redis에 저장되는 메시지들을 먼저 조회하고 Redis의 메시지들을 조회했을 때 값이 없으면 그때부터 MongoDB의 메시지들을 조회해야 한다.

그런데 lastReadKey나 firstLoadedKey 등의 값은 Integer이고, 그 Integer 값은 Redis에 기존에 저장했던 메시지의 Index가 기준이다.

근데 MongoDB에 저장한 이후로는 Redis에 저장한 메시지들의 Index가 초기화되어 일종의 데이터 불일치가 발생한다.

MongoDB의 ID값을 활용해보고자 UUID 생성을 시간순서로 구분짓게 해줄 수 있는 라이브러리도 사용해봤지만 의미없었다.

ID, Index 구조를 모두 바꾸기에는 너무 광범위해서 이 구조를 유지하면서 문제를 해결해보고자 했다.

하지만 MongoDB에 저장하고 Redis에 새로운 메시지가 추가되는 순간 기존 로직으로는 도저히 해결이 안됐다. 이 부분에서 구현에 시간을 많이 쏟은 것 같다.

메시지 조회를 위한 구조 개선

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Getter
@Document(collection = "chat_messages")
public class ChatMessageMongo {

    @Id
    private String id;

    private Long sequence;
    private Long roomId;
    private Long senderId;
    private String senderRole;
    private String messageText;
    private String imageUrl;
    private LocalDateTime sendAt;
	//...
}

sequence라는 Long 타입의 값을 추가했다.

Id나 Index 값을 대체하는 용도이다.

이제 읽지 않은 메시지, Redis 메시지 조회, MongoDB 메시지 조회 모두 이 Sequence를 기준으로 조회를 하면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Component
@RequiredArgsConstructor
public class ChatRedisUtil {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ChatRedisConfig redisConfig;

    public void saveMessage(ChatMessageDto message) {
        String key = redisConfig.getSaveMessageKey(message.getRoomId());

        redisTemplate.opsForZSet().add(key, message, message.getSequence());
        redisTemplate.expire(key, TTL);
    }

    public Long getNextSequence(Long roomId) {
        String sequenceKey = redisConfig.getSequenceKey(roomId);
        return redisTemplate.opsForValue().increment(sequenceKey);
    }

    public Long getCurrentSequence(Long roomId) {
        String sequenceKey = redisConfig.getSequenceKey(roomId);
        Object sequence = redisTemplate.opsForValue().get(sequenceKey);

        if (sequence == null) {
            return 0L;
        }

        return Long.parseLong(sequence.toString());
    }

    public List<Object> getMessagesBySequence(Long roomId, Long startSequence, Long endSequence) {
        String key = redisConfig.getSaveMessageKey(roomId);
        return Objects.requireNonNull(redisTemplate.opsForZSet()
                        .rangeByScore(key, startSequence, endSequence))
                .stream()
                .toList();
    }

    public Long getLastReadSequence(Long roomId, Long userId) {
        String key = redisConfig.getLastReadKey(roomId, userId);
        Object sequence = redisTemplate.opsForValue().get(key);
        return sequence != null ? Long.parseLong(sequence.toString()) : 0L;
    }

    public Long getFirstLoadedSequence(Long roomId, Long userId) {
        String key = redisConfig.getFirstLoadedKey(roomId, userId);
        Object sequence = redisTemplate.opsForValue().get(key);
        return sequence != null ? Long.parseLong(sequence.toString()) : 0L;
    }

    public void updateFirstLoadedSequence(Long roomId, Long userId, Long sequence) {
        String key = redisConfig.getFirstLoadedKey(roomId, userId);
        redisTemplate.opsForValue().set(key, sequence);
    }

    public List<Object> getAllMessagesFromRoom(String roomKey) {
        return Objects.requireNonNull(redisTemplate.opsForZSet()
                        .rangeByScore(roomKey, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY))
                .stream()
                .toList();
    }

    public void deleteRoomData(Long roomId) {
        String key = redisConfig.getSaveMessageKey(roomId);
        redisTemplate.delete(key);
    }

    public void updateCurrentSequence(Long roomId, Long sequence) {
        String sequenceKey = redisConfig.getSequenceKey(roomId);
        redisTemplate.opsForValue().set(sequenceKey, sequence);
    }

    public void updateLastReadSequence(ChatMessageDto message) {
        String lastReadKey = redisConfig.getLastReadKey(message.getRoomId(), message.getSenderId());

        redisTemplate.opsForValue().set(lastReadKey, message.getSequence());
    }
}

일부 Redis 관련 메서드들이다.

  • 이제 메시지를 저장할 때 List가 아닌 ZSet에 저장한다.
    • Score는 Sequence 기준이다.
  • 모든 키 값의 업데이트, 조회에 Sequence가 기준이 된다.
  • 다음 시퀀스를 가져오기 위한 부분은 increment()를 사용한다.

MongoDB 시퀀스 사용

1
2
3
4
5
6
7
public List<ChatMessageDto> getMessagesFromSequence(Long roomId, Long lastReadSequence, int limit) {
    return chatMessageRepository.findByRoomIdAndSequenceGreaterThanOrderBySequenceAsc(roomId, lastReadSequence)
            .stream()
            .map(ChatMessageDto::mongoMessageToDto)
            .limit(limit)
            .toList();
}
  • 마찬가지로 이제 ChatMessageDto, ChatMessageMongo에 전부 Sequence 값이 존재하기 때문에 그 값을 기준으로 메시지를 조회할 수 있다.
  • MongoDB의 String Id값과 Redis의 Integer Index 간의 차이에서 발생한 조회 문제가 비교적 간단히 해결되었다.

다만 조금 어려웠던 부분이 있었다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<ChatMessageDto> getMessagesBeforeSequence(Long roomId, Long firstLoadedSequence, int limit) {
    int startSequence = calculateStartSequence(firstLoadedSequence, limit);

    Query query = new Query()
            .addCriteria(
                    Criteria.where("roomId").is(roomId).and("sequence").gte(startSequence).lt(firstLoadedSequence))
            .with(Sort.by(Sort.Direction.ASC, "sequence"))
            .limit(limit);

    List<ChatMessageMongo> messages = mongoTemplate.find(query, ChatMessageMongo.class);

    if (messages.isEmpty()) {
        return List.of();
    }

    return messages.stream()
            .map(ChatMessageDto::mongoMessageToDto)
            .toList();
}
  • 이전 메시지를 가져오는 로직이다.
  • 단순히 마지막 읽은 시퀀스를 제공하고 그 시퀀스를 기준으로 조회를 하면 쉽겠다 싶었다.
  • 하지만 이 부분에 페이지네이션을 적용하면 시퀀스를 기준으로 했을 때 페이지가 제대로 계산이 되지 않았다.
    • 그렇다고 페이지를 고정된 값으로 넣으면 시퀀스를 기준으로 그 페이지에 해당하는 부분만 무한정 조회할 뿐이었다.
  • 페이지네이션으로 이 방법 저 방법 시도해보다가 도저히 해결이 되지 않아 mongoTemplate을 사용하게 되었다.
  • 조회를 할 메시지 전체 시퀀스 범위를 제공하는 것이다.
  • skip과 limit을 사용하는 방법도 있다.
    • 하지만 이 방법은 MongoDB의 동작 방식에서 문제가 생길 여지가 있어 채택하지 않았다.
    • skip 값 만큼의 문서를 건너뛰기 위해 모든 문서를 순차적으로 스캔한다.
    • 데이터가 많을 수록 skip 값이 커지고 그만큼 스캔하는 데이터도 많아 조회 속도가 느려질 수 있다는 문제가 있다.

결과적으로 수많은 시행착오를 거치고 나서야 Redis + MongoDB 저장/조회 로직을 모두 작성할 수 있었다.

성능 테스트 (WebSocket)

우리 서비스에 필요한 채팅에 관련된 기능을 모두 구현했다.

STOMP 프로토콜로의 전환을 도입할 계획이었는데, 그 전에 WebSocket 기반 구현과 성능 차이가 있을까 싶어서 제이미터에서 WebSocket Sampler를 설치 후 테스트 해보았다.

V1

  • users 100
  • seconds 10
  • roop 50

매우 적은 요청임에도 많은 시간이 걸려 의아했다. CPU 사용량, 쓰레드, 메모리 관련 모니터링 지표에서는 아무런 문제가 없어서 더 의아했다.

V2

동시에 500명의 사용자가 연결해 메시지를 전송한다.

30초 동안 500명의 사용자가 순차적으로 연결된다. 초당 16.67명 연결

각 사용자가 메시지 전송을 50번 반복한다.

  • users 500
  • seconds 30
  • roop 50

수를 늘려도 크게 차이가 없었다.

V3

  • users 10,000
  • seconds 60
  • roop 50

CPU 사용량에도 차이가 있었고 Error도 나타났다.

V4

기존의 설정은 WebSocket Sampler에서 Streaming Connetion이 꺼져있었다.

연결을 지속적으로 유지하지 않아 기본적으로 WebSocket 샘플러가 테스트가 끝난 후 자동으로 연결을 끊기 때문에 20000ms의 시간이 기본적으로 발생한 것이라고 추측했다.

그러나 실상은 아래와 같다.

WebSocket Sampler에서 Response Timeout 값을 20000으로 설정했었는데 이는 JMeter가 서버 응답을 대기하는 시간이다.

WebSocket은 요청-응답의 패턴이 아니기때문에 Timeout값이 불필요한 대기시간으로 설정된 것이다. 이를 0으로 바꾸고 테스트 해봤다.

  • users: 200
  • seconds: 10
  • roop: 50

아주 쉽게 수많은 요청을 처리하는 것을 볼 수 있다. 이게 실제 연결 및 채팅 보내기에 대한 테스트 응답 시간이다.

이제 부하를 늘려보자.

V5

users: 3000 seconds: 10 roop: 50

웹 소켓 초기 연결 시 문제가 발생했다. (Error 2.34%)

초당 1323개의 메시지를 처리했고 평균 지연시간은 15ms이다.

CPU 사용량에는 문제가 없었지만 웹 소켓에 초기 수많은 동시 연결 시 문제가 잠깐 발생했다.

아마 세션 관리를 ConcurrentHashMap을 직접 사용하면서 생기는 문제일 수 있을 것 같다는 생각을 했다.

우선 이 정도로 테스트를 마치고 STOMP 도입 후 성능 변화를 체크해볼 예정이다.

정리

우선 WebSocket을 통해 기본적으로 채팅 시스템을 어떻게 구현하는지 알아보았다.

WebSocket에 대한 구현보다 Redis, MongoDB를 사용하는 부분에 있어 더 고생을 한 것 같다.

WebSocket에 대한 부분은 자료도 많고 구현해야 할 내용이 어느정도 정해져있어 크게 어렵지 않았지만 Redis, MongoDB에서 조회하는 부분이 어려웠다.

MongoDB는 조회 자체가 익숙하지 않은 방식으로 이루어져서 더 어려웠고, Redis와의 Index 타입 차이에서 오는 조회 어려움도 컸다.

해결법은 비교적 간단하긴 했지만 좋은 경험이었다고 생각한다.

다음 포스트에서는 WebSocket을 사용했던 것을 STOMP 프로토콜로 전환하는 것을 정리한다.

This post is licensed under CC BY 4.0 by the author.

소셜로그인 구현 (구글, 네이버)

채팅 시스템 프로토콜 전환 (STOMP) + 겪었던 문제