STOMP
기존에 작성했던 채팅 로직을 그대로 프로토콜만 변경해보고자 한다.
그 전에 STOMP에 대해 알아보자. 기존에 구현했던 WebSocket을 사용했을 때와 어떤 차이가 있는지 보자.
- WebSocket
- WebSocketHandler를 통해 수동 처리중
- 특정 사용자가 보낸 메시지를 특정 방이나 사용자에게 전달하기 위해 직접 로직을 구현했다.
- 클라이언트와 서버 간 연결 관리, 메시지 브로드캐스트 등 모든 기능을 직접 구현한다.
- 세션관리
- ConcurrentHashMap을 사용해 세션을 roomId와 매핑해 관리했다.
- 연결된 사용자의 세션을 직접 추적하며 브로드캐스트나 개인 메시지를 전송했다.
- 메시지 전송, 연결 종료 시 로직을 수동으로 추가했다.
이런 단점들을 보완한 STOMP 프로토콜로의 전환을 해보자.
- STOMP
- 우선 내장 브로커를 사용한다. 추후 카프카, Redis Pub/Sub 등을 활용해 메시지 브로커를 활용할 수 있다.
- 구독 및 메시지 전달을 간단히 구현한다.
- 서버는 메시지 브로커를 통해 구독한 클라이언트에 메시지를 자동으로 브로드캐스트 한다.
- 개발자는 구독/메시지 전송 처리에만 집중한다.
- 세션 관리 자동화
- WebSocket 세션을 내부적으로 관리해준다.
- 특정 사용자가 특정 채널을 구독하면 메시지가 자동으로 전달된다.
핵심은 클라이언트가 특정 주제를 구독하고, 그 채널로 브로커에서 발행된 메시지를 수신하는 구조라는 점이다.
메시지 브로커를 쉽게 도입할 수 있는데 이를 통해 확장이 쉬워진다. 메시지를 브로커에 보내기만 하면 수신자는 이를 나중에 처리해줄 수 있다. 거기에 고성능과 높은 처리량을 보장해주고, 메시지 재전송이나 장애 복구도 가능하기 때문에 이점을 가진다.
더해 세션 또한 내부적으로 효율적인 자료구조, 알고리즘을 사용해주기 때문에 직접 관리했을 때 보다 더 나은 성능이 있을 것이라고 기대한다.
STOMP 도입
일단 기존에 구현했던 ChatSocketHandler, WebSocketConfig 클래스들은 필요하지 않다.
그를 대체할 클래스들이 있다.
StompConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompConfig implements WebSocketMessageBrokerConfigurer {
private final JwtAuthInterceptor jwtAuthenticationInterceptor;
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//내장 메시지 브로커 사용
registry.enableSimpleBroker("/sub"); //클라이언트가 구독할 경로
registry.setApplicationDestinationPrefixes("/pub"); // 클라이언트가 메시지를 보낼 경로
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/chat")
.setAllowedOrigins("http://localhost:8080", "http://localhost:5173")
.addInterceptors(new WebSocketHandshakeInterceptor())
.withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(jwtAuthenticationInterceptor);
}
}
- 메시지 브로커를 설정하고 엔드포인트를 등록한다.
- 그리고 인터셉터를 등록할 수 있는 설정 클래스이다.
- configureMessageBroker()
- enableSimpleBroker(“/sub”)
- 클라이언트가 구독할 메시지 경로를 설정한다.
- /sub 경로로 시작하는 메시지는 SimpleBroker가 처리한다.
- 이 브로커는 기본적으로 메모리에 저장하고 나중에 다른 메시지 브로커로 전환이 가능하다.
- setApplicationDestinationPrefixes(“/pub”)
- 클라이언트가 메시지를 서버로 보낼 때 사용할 경로이다.
- /pub으로 시작하는 메시지는 애플리케이션 내 핸들러의 @MessageMapping으로 라우팅 된다.
- enableSimpleBroker(“/sub”)
ChatController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {
private final ChatService chatService;
@MessageMapping("/chat/send/{roomId}") //클라이언트가 이 경로로 메시지를 전송한다.
@SendTo("/sub/chat/{roomId}") // 메시지 브로커가 해당 roomId를 구독한 클라이언트로 메시지 브로드 캐스트
public ChatMessageDto sendMessage(@DestinationVariable Long roomId, @Payload ChatMessageDto message,
SimpMessageHeaderAccessor headerAccessor) {
log.info("message=" + message);
Authentication auth = (Authentication) headerAccessor.getUser();
CustomOAuth2User user = (CustomOAuth2User) auth.getPrincipal();
Long senderId = user.getUserId();
String senderRole = user.getRole();
return chatService.createAndSaveMessage(message, roomId, senderId, senderRole);
}
}
- 클라이언트가 특정 경로로 전송한 메시지를 처리한다.
- /chat/send/roomId
- 처리된 메시지를 특정 주제를 구독한 모든 클라이언트에 브로드캐스트 한다.
- /sub/chat/roomId
- 이제 WebSocketHandler에서 메시지를 보낼 때 처리했던 부분을 이 부분에서 똑같이 동작하도록 만들어주면 된다.
매개변수의 SimpMessageHeaderAccessor headerAccessor 이 부분이 사실 @AutenticationPrincipal을 이용해 user의 정보를 가져오려 했던 부분인데, 이후 설명하겠지만 WebSocket 연결에서 해당 정보를 가져오는데 문제들이 발생해 위와 같은 방법을 채택하게 되었다.
1
2
3
4
stompClient.subscribe(`/sub/chat/${roomId}`, (message) => {
const chatMessage = JSON.parse(message.body);
displayMessage(chatMessage);
});
- 클라이언트 측에서는 예를 들면 채팅방에 입장하면서 웹소켓과 연결하고, 위와 같이 구독할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
function sendMessage() {
const messageText = document.getElementById("message-input").value;
const message = {
roomId: roomId,
messageText: messageText,
sendAt: new Date().toISOString(),
};
stompClient.send(`/pub/chat/send/${roomId}`, {}, JSON.stringify(message));
document.getElementById("message-input").value = "";
}
- 클라이언트는 위와 같은 방식으로 메시지를 보낼 수 있다. 그렇게 되면 위에서 구독한 모두에게 메시지를 전달하게 된다.
JwtAuthInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Slf4j
@Component
@RequiredArgsConstructor
public class JwtAuthInterceptor implements ChannelInterceptor {
private final JwtUtil jwtUtil;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String cookieHeader = (String) Objects.requireNonNull(accessor.getSessionAttributes()).get("cookie");
String token = extractTokenFromCookie(cookieHeader);
if (token != null && !jwtUtil.isExpired(token)) {
Long userId = jwtUtil.getUserId(token);
String username = jwtUtil.getUsername(token);
String role = jwtUtil.getRole(token);
log.info("userId: {}, username: {}, role: {}", userId, username, role);
UserDto userDto = UserDto.builder()
.userId(userId)
.username(username)
.role(role)
.build();
CustomOAuth2User customOAuth2User = new CustomOAuth2User(userDto);
Authentication auth = new UsernamePasswordAuthenticationToken(customOAuth2User, null,
customOAuth2User.getAuthorities());
SecurityContextHolder.getContext().setAuthentication(auth);
accessor.setUser(auth);
} else {
throw new TokenExpiredException();
}
}
return message;
}
private String extractTokenFromCookie(String cookieHeader) {
return Arrays.stream(cookieHeader.split(";"))
.map(String::trim)
.filter(cookie -> cookie.startsWith("Authorization="))
.map(cookie -> cookie.substring("Authorization=".length()))
.findFirst()
.orElse(null);
}
}
- 이미 HTTP 요청에 쓰이는 JWTFilter가 있는데 이 클래스는 왜 필요한가 싶을 수 있다.
- 이 클래스는 STOMP 기반 WebSocket 통신에서 클라이언트가 서버에 연결할 때 JWT 인증을 처리하기 위함이다.
- STOMP 프로토콜에서는 HTTP 요청과 같이 JwtFilter가 동작하지 않는다.
- 따라서 WebSocket 통신에서도 JWT 인증을 사용하기 위해 이 클래스를 만들었다.
- 메시지가 채널로 전달되기 전 메시지를 검사하거나 수정할 수 있다.
그런데 우리 서비스에서 인증 토큰은 쿠키에 담겨있다.
그래서 웹 소켓 연결 시 쿠키를 포함하는 요청을 보내도록 클라이언트 코드를 작성했다.
그런데 이 인터셉터에서 쿠키를 가져올 수 없는 문제가 있었다.
이유는 아래와 같다.
- WebSocket 연결은 일반 HTTP 요청으로 시작되고 핸드셰이크 과정에서 연결이 업그레이드 된다.
- 이 때 클라이언트는 서버로 HTTP 요청 헤더와 쿠키를 보내는데 STOMP 프로토콜에서는 연결이 완료되고 HTTP 헤더나 쿠키 정보를 포함하지 않는다.
- 그런데 우리는 핸드셰이크 단계에서 정보를 받아 사용해야 한다.
- 따라서 HTTP 요청에서 가져온 정보를 attributes에 저장해 이후 WebSocket 세션에도 사용할 수 있도록 하는 구조가 필요하다.
위와 같은 이유때문에 Interceptor를 하나 더 구현해야 했다.
WebSocketHandshakeInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
String cookieHeader = request.getHeaders().getFirst("Cookie");
if (cookieHeader != null) {
attributes.put("cookie", cookieHeader);
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
}
- HTTP 요청에서 Cookie 값을 가져와 STOMP 세션의 attributes에 저장한다.
- 따라서 JwtAuthInterceptor에서 이 속성을 참조할 수 있게 된다.
- 위의 코드에서 볼 수 있다시피 StompConfig에서 이 인터셉터를 추가하면 된다.
JwtAuthInterceptor 장애
위와 같이 구현했을 때 문제가 발생하는 경우가 있다.
토큰을 추출해 인증 객체를 저장하는데 그 과정에서 아래와 같은 오류를 만날 수 있다.
1
2
3
o.s.w.s.m.StompSubProtocolHandler : Error publishing SessionConnectedEvent ...
java.lang.NullPointerException: Cannot invoke "Object.hashCode()" because "key" is null at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) ~[na:na] at
정리된 글이 별로 없어서 문제를 쉽게 찾지 못했다.
문제는 인증 객체에서 getName()이 null을 반환해 STOMP 세션을 관리하는 ConcurrentHashMap의 Key값이 유효하지 못했던 것이 문제이다.
- Spring에서는 STOMP 연결 요청 시 각 클라이언트의 세션 정보를 저장한다.
- 이 때 세션과 관련된 사용자 정보를 ConcurrentHashMap을 사용해 관리한다.
- 그런데 이 때 키 값을 simpUser 또는 Principal을 키로 사용하게 된다.
- 이 때 getName() 메서드나 hashCode()를 통해 고유한 해시값을 생성해 사용하게 된다.
- 그런데 나는 토큰에 username 정보만 담겨있고 name 정보는 담겨있지 않아 getName() 메서드가 null을 반환했기 때문에 계속 저 문제가 발생했다.
이 에러를 고치기 힘들었던 이유는 우선 웹 소켓 연결이 문제 없이 잘 되었고 메시지를 보낼 때 컨트롤러에 도달하지도 못하고 에러 로그도 저것뿐이었다는 것이다.
수 많은 시행착오 끝에 CustomOAuth2User 객체의 getName() 메서드를 username을 반환하는 것으로 수정했더니 문제는 해결됐다.
AutenticationPrincipal 문제
위의 인터셉터에서 인증 객체를 저장했기 때문에 애노테이션을 사용해 문제없이 사용자 정보를 가져올 수 있을 것이라고 생각했다.
@AuthenticationPrincipal을 사용하고자 했다.
그런데 아래와 같은 에러가 발생한다.
1
2
3
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of com.dangdangsalon.domain.auth.dto.CustomOAuth2User (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 2]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:256) ~[spring-messaging-6.1.14.jar:6.1.14]
CustomOAuth2User에 대한 직렬화 문제이다.
그런데 계속 값을 바꾸다보니 RoomID에 대한 부분까지 저 부분에서 에러가 발생했다. RoomID는 분명 ChatMessageDto에 있는 값인데 왜 그런 에러가 발생하는지 의아했다.
알아본 결과로는 아래와 같다.
- Spring의 메시지 컨버터는 메시지를 처리할 때 기본적으로 Jackson을 사용한다.
- 그런데 Jackson은 CustomOAuth2User와 같은 객체를 적절하게 직렬화/역직렬화 할 수 없다.
- Spring 메시지 컨버터는
@DestinationVariable
과@Payload
를 함께 사용할 때, 각각을 분리해서 처리하지 않고 전체 메시지를 단일 JSON으로 처리하려고 한다. - 이로 인해 메시지 본문의 모든 필드를
CustomOAuth2User
로 역직렬화하려 했지만,roomId
와 같은 필드는CustomOAuth2User
에 없는 필드이므로 Jackson에서 오류가 발생할 수 있다.
이러한 이유로 이 부분에서도 수 많은 시행착오를 거친 후 SimpMessageHeaderAccessor
를 사용하여 사용자 정보와 기타 메타데이터를 헤더에서 직접 가져와 문제를 해결하게 되었다.
메시지 본문(@Payload
)과 사용자 인증 정보(headerAccessor
)를 별도로 처리하여 충돌을 방지할 수 있었다.
성능 테스트 (STOMP)
여러 시행착오가 있어 시간은 꽤 많이 걸렸지만 STOMP 프로토콜로의 전환은 이게 다이다. 나머지 Redis나 MongoDB를 활용하는 로직은 HTTP 요청이기 때문에 그대로 사용 가능하다.
기존에 WebSocket 방법으로 사용했을 때 보다 구현이 간단하다.
거기에 Interceptor를 활용하며 더 많은 기능들을 활용할 수 있다.
이제 여기에 메시지 브로커로 카프카를 도입해보려고 한다. 이 부분은 다음 포스트에서 다루고 일단 STOMP 프로토콜만 도입했을 때 성능에 차이가 있는지 확인해보려고 한다.
https://velog.io/@mw310/Spring-Boot-Jmeter%EB%A1%9C-STOMP-%EB%B6%80%ED%95%98-%ED%85%8C%EC%8A%A4%ED%8A%B8-%ED%95%98%EA%B8%B0
테스트 시나리오 설정은 위 블로그 글을 참고해 진행했다.
일반적으로 평균 100ms 이하, 혹은 최대 500ms 이하를 목표로 한다.
V1
- users: 100
- seconds: 0
- loop: 50
테스트 시나리오가 WebSocket과 조금 다르지만 비교에는 문제가 없을 것 같다. 요청이 구분되어있다는 점을 제외하고는 웹소켓에 연결하고 메시지를 보내는 요청까지는 동일하므로 문제 없을 것 같다.
일단 지금 세팅 값으로는 전혀 문제 없었다. 수치를 올려보자.
V2
- users: 500
- seconds: 10
- loop: 50
연결에 조금 시간이 걸리는 것 같다.
V3
- users: 1000
- seconds: 30
- loop: 50
성능이 썩 좋아지는 것을 느끼지 못한다.
V4
- users: 3000
- seconds: 30
- loop: 50
성능이 확실히 두드러진다. STOMP만을 도입한 경우 WebSocket으로 구현했을 경우보다 성능이 안좋다.
왜 성능이 나쁠까
- STOMP 프로토콜의 메시지 구조
- STOMP 프로토콜은 메시지의 헤더와 바디를 포함한 텍스트 기반 프로토콜이다.
- JSON 데이터만을 전송하는 WebSocket 방식보다 데이터 크기가 크다.
- 이 부분에서 오버헤드가 발생할 수 있다.
- 내장 브로커 사용
- 현재 Spring 내장 브로커를 사용하고 있다.
- 메시지를 메모리에 저장하고 메시지가 많아질수록 성능이 급격히 저하될 수 있다.
- Simple Broker는 메시지 처리에 비동기 메시징 큐 (Kafka 등)처럼 병렬 처리를 지원하지 않아 처리량이 낮아질 수 있다.
- 더해 관리 기능이 부족해 연결이 많아지면 과부하가 발생할 수 있다.
- 라우팅 및 구독 관리
- 각 클라이언트의 구독 상태를 관리하고 메시지 라우팅도 처리하기 때문에 부하가 더 발생할 수 있다.
따라서 성능 향상을 위해 아래와 같은 방법을 적용해볼 수 있다.
- 외부 메시지 브로커 사용
- 카프카, RabbitMQ, Redis 등 사용 가능
- 커넥션 풀 관리
- WebSocket 세션 수를 제한하고 클라이언트를 분산시킬 수 있다.
나는 카프카를 도입해보려고 한다.
정리
STOMP 프로토콜을 적용했을 때 오히려 성능이 떨어지는 것을 확인할 수 있었다.
우리 채팅 시스템이 비교적 간단한 것도 이유가 될 수 있고, 당장 STOMP의 큰 장점인 외부 메시지 브로커를 활용하지 않았다는 점이 문제가 됐을 수 있다.
이제 다음 포스트에서 Kafka를 도입해 STOMP와 결합해보고 성능을 다시 테스트해본다.