Home 쿠폰 이벤트 구현 (실시간 대기열 보여주기 - SSE)
Post
Cancel

쿠폰 이벤트 구현 (실시간 대기열 보여주기 - SSE)

이번 프로젝트에서 쿠폰 발급 이벤트를 구현하기로 했다. 결제 시스템이 존재했고, 해당 결제 시스템에 쿠폰을 적용하는 것을 구현하고 싶었기 때문이다.

이전 프로젝트에서 비슷한 기능을 구현했었기 때문에 원래 내 담당은 아니었지만 시간이 촉박해서 도와주게 되었다.

이번에는 배포까지 하기 때문에 운영환경에서의 부하도 고려해야 했고 그 때문에 이전 포스트에서 AWS 부하테스트를 진행했다.

이번 쿠폰 이벤트의 관건은 아래와 같다.

  • 강아지 미용 중개서비스이다 보니 사용자는 쿠폰을 발급받자마자 사용할 수 있으면 더 좋을 것이다.
    • 따라서 즉각적으로 쿠폰 발급이 이루어지고 RDB에 쿠폰이 저장되어야 한다.
    • 이전 프로젝트에서 대용량 트래픽을 받기 위해 Redis에 값을 저장해두고 트래픽이 몰리지 않는 시점에 스케줄러로 RDB에 저장했었던 것과 다르다.
  • 부하가 몰렸을 때 RDB에 저장하는 로직이 즉각적으로 수행되려면 트래픽을 순서대로 해소해줘야 한다. 이를 위해 대기열을 만들기로 결정했고, 유저는 실시간으로 대기열의 상태를 전달받는 방식을 채택했다.

즉, 쿠폰 발급 버튼을 눌렀을 때 쿠폰이 바로 발급되어야 하고 대학교 수강신청처럼 트래픽이 몰렸을 경우 ‘앞에 N명이 남았습니다.’ 같은 문구를 보여주는 실시간 대기열 상태를 보여주는 팝업을 띄워주는 것이 구현 목표이다.

구현 방안

우선 대기열을 위한 로직은 Redis를 활용하기로 했다.

Redis의 ZSet 자료구조를 활용해 대기열을 관리한다. 대기열에 이미 존재하는 사용자거나, 정해둔 대기열 사이즈를 초과할 경우, 쿠폰이 소진된 경우 대기열에 진입할 수 없도록 관리하면 된다.

이전에 비슷한 기능을 구현해본 경험이 있어 이 부분은 큰 문제가 되지 않았다.

다만 실시간 대기열을 구현하는 방식에 대해 고려할 부분들이 있었다.

HTTP Polling 방식, SSE (Server Sent Event), WebSocket 방식을 생각해볼 수 있었고 이 중에 선택해야 했다.

  • HTTP Polling
    • 일정 간격으로 클라이언트가 서버에 요청해 상태를 업데이트하는 방식이다.
    • 구현이 매우 간단하다. 하지만 트래픽이 많을 경우 서버 부하가 너무 심하다는게 문제였다.
      • 불필요한 요청이 많은 방식이기 때문에 어느정도의 트래픽을 견뎌야하는 이런 기능에는 활용하기 힘들 것 같다고 판단해 제외했다.
  • SSE (Server Sent Event)
    • 간단히 설명하면 서버가 클라이언트로 데이터를 푸시하는 방식이다.
    • 단방향 통신이며 HTTP 기반으로 동작한다.
  • WebSocket
    • 채팅 시스템에서 썼던 방식으로 양방향 통신이다.
    • 그러나 채팅 시스템에서 이미 WebSocket 연결에 대한 부하가 꽤 크다는 것을 인지하고 있었고, 마찬가지로 높은 트래픽을 받아야하는 상황에 이러한 부담이 문제가 될 것 같아 제외했다.

위와 같은 이유로 SSE를 사용하기로 결정했다.

우선 대기열에는 양방향 통신이 필요하지 않았고 SSE도 연결 유지 비용이 필요하지만 WebSocket 연결 비용보다 낮기 때문이다.

구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@Service
@RequiredArgsConstructor
@Slf4j
public class CouponIssueService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisPublisher redisPublisher;
    private final CouponRepository couponRepository;
    private final CouponEventRepository couponEventRepository;
    private final UserRepository userRepository;

    private static final int MAX_QUEUE_SIZE = 1000;

    public String joinQueue(Long userId, Long eventId) {
        if (!isEventStarted(eventId)) {
            return "이벤트가 아직 시작되지 않았습니다.";
        }

        String issuedUsersKey = "issued_users:" + eventId;
        String queueKey = "coupon_queue:" + eventId;
        String couponRemainingKey = "coupon_remaining:" + eventId;

        // 남은 쿠폰 확인
        Integer remainingCoupons = (Integer) redisTemplate.opsForValue().get(couponRemainingKey);
        if (remainingCoupons == null || remainingCoupons <= 0) {
            return "쿠폰이 모두 소진되었습니다.";
        }

        Long queueLength = redisTemplate.opsForZSet().zCard(queueKey);
        if (queueLength >= MAX_QUEUE_SIZE) {
            return "쿠폰이 소진되었습니다. 다음 기회에 도전해주세요";
        }

        // 중복 체크: 이미 발급된 사용자
        Boolean isAlreadyIssued = redisTemplate.opsForSet().isMember(issuedUsersKey, userId.toString());
        if (isAlreadyIssued != null && isAlreadyIssued) {
            return "이미 쿠폰을 발급받았습니다.";
        }

        // 중복 체크: 이미 대기열에 있는 사용자
        Boolean isAlreadyInQueue = redisTemplate.opsForZSet().rank(queueKey, userId.toString()) != null;
        if (isAlreadyInQueue) {
            return "이미 대기열에 참여한 사용자입니다.";
        }

        redisTemplate.opsForZSet().add(queueKey, userId.toString(), System.currentTimeMillis());

        if (redisPublisher.isEmitterRegistered(userId)) {
            publishQueueStatus(eventId);
        } else {
            log.info("SSE 연결되지 않음, 대기열 상태 전송 생략: userId={}, eventId={}", userId, eventId);
        }

        return "대기열에 참여했습니다.";
    }

    public void publishQueueStatus(Long eventId) {
        String queueKey = "coupon_queue:" + eventId;
        String couponRemainingKey = "coupon_remaining:" + eventId;

        Long totalQueueLength = redisTemplate.opsForZSet().zCard(queueKey);
        Integer remainingCoupons = (Integer) redisTemplate.opsForValue().get(couponRemainingKey);

        if (totalQueueLength == null || remainingCoupons == null) {
            log.warn("대기열 데이터가 없습니다: eventId={}", eventId);
            return;
        }

        Set<Object> allUsers = redisTemplate.opsForZSet().range(queueKey, 0, -1);
        if (allUsers == null || allUsers.isEmpty()) {
            log.warn("대기열에 사용자가 없습니다: eventId={}", eventId);
            return;
        }

        // 모든 사용자에게 상태를 전송
        for (Object userIdObj : allUsers) {
            Long userId = Long.valueOf(userIdObj.toString());
            Long rank = redisTemplate.opsForZSet().rank(queueKey, userId.toString());
            if (rank == null) continue;

            int aheadCount = rank.intValue(); // 사용자 앞에 있는 사람 수
            int behindCount = totalQueueLength.intValue() - aheadCount - 1;

            // 예상 대기 시간 (초당 10명 처리 가정)
            int processingTimePerUser = 2; // 한 사용자 처리에 걸리는 평균 시간 (초)
            int estimatedTime = aheadCount * processingTimePerUser;

            // 사용자별로 데이터 생성
            QueueStatusDto queueStatus = QueueStatusDto.builder()
                    .eventId(eventId)
                    .queueLength(totalQueueLength.intValue())
                    .remainingCoupons(remainingCoupons)
                    .aheadCount(aheadCount)
                    .behindCount(behindCount)
                    .estimatedTime(estimatedTime)
                    .build();

            // 사용자별로 데이터 전송
            redisPublisher.sendToEmitter(userId, queueStatus, "queue_status");
            log.info("queueStatus for userId {}: ahead={}, behind={}, estimatedTime={}s", userId, aheadCount, behindCount, estimatedTime);
        }
    }

    public SseEmitter subscribeQueueUpdates(Long userId, Long eventId) {
        /*
         클라이언트에서 /queue/updates로 요청을 보내면 SseEmitter 객체를 생성해 SSE 연결을 만든다.
         SseEmitter는 HTTP 연결을 끊지 않고 유지하며 데이터를 스트리밍으로 전달.
         redisPublisher.registerEmitter를 통해 Pub/Sub 메시지를 받아 SseEmitter로 전달한다.
         따라서 실시간으로 대기열 상태를 수신할 수 있다.
         */

        String queueKey = "coupon_queue:" + eventId;
        Boolean isAlreadyInQueue = redisTemplate.opsForZSet().rank(queueKey, userId.toString()) != null;

        if (!isAlreadyInQueue) {
            throw new IllegalStateException("대기열에 참여하지 않은 사용자는 SSE 연결을 할 수 없습니다.");
        }

        SseEmitter emitter = new SseEmitter(-1L);
        redisPublisher.registerEmitter(userId, emitter);

        publishQueueStatus(eventId);

        return emitter;
    }

    @Transactional
    public void processQueue(Long eventId) {
        String queueKey = "coupon_queue:" + eventId;
        String couponRemainingKey = "coupon_remaining:" + eventId;
        String issuedUsersKey = "issued_users:" + eventId;

        while (true) {
            Set<Object> firstUserSet = redisTemplate.opsForZSet().range(queueKey, 0, 0);
            if (firstUserSet == null || firstUserSet.isEmpty()) {
                break;
            }
            String firstUser = (String) firstUserSet.iterator().next();

            // 남은 쿠폰 확인
            Integer remainingCoupons = (Integer) redisTemplate.opsForValue().get(couponRemainingKey);
            if (remainingCoupons == null || remainingCoupons <= 0) {
                publishQueueStatus(eventId);
                publishCouponIssueResult(false, Long.valueOf(firstUser));
                break;
            }

            // 쿠폰 발급 처리
            redisTemplate.opsForValue().decrement(couponRemainingKey);
            redisTemplate.opsForSet().add(issuedUsersKey, firstUser);
            redisTemplate.opsForZSet().remove(queueKey, firstUser);

            Boolean result = saveCouponToDatabase(Long.valueOf(firstUser), eventId);

            publishCouponIssueResult(result, Long.valueOf(firstUser));

            publishQueueStatus(eventId);
        }
    }
    
    //...
}
  • joinQueue()
    • 사용자가 대기열에 참여하는 요청을 처리한다.
    • Redis를 이용해 대기열을 관리하고 중복 참여 및 쿠폰 소진 여부를 검증한다.
    • SSE 연결 여부를 확인하고 실시간 상태를 전송한다.
  • publishQueueStatus()
    • Redis에서 대기열 정보를 조회하고 현재 대기 상태를 모든 사용자에게 전송한다.
    • DTO를 만들어 해당 DTO 객체를 SSE로 전송한다.
  • subscribeQueueUpdates()
    • SSE 연결을 생성하여 대기열 상태 업데이트를 실시간으로 수신한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Component
@RequiredArgsConstructor
@Slf4j
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;
    private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
    
    public void registerEmitter(Long userId, SseEmitter emitter) {
        emitters.put(userId, emitter);

        emitter.onCompletion(() -> emitters.remove(userId));
        emitter.onTimeout(() -> emitters.remove(userId));
        emitter.onError(e -> emitters.remove(userId));
    }

    public void sendToEmitter(Long userId, QueueStatusDto queueStatus, String eventName) {
        SseEmitter emitter = emitters.get(userId);

        if (emitter == null) {
            log.warn("SSE Emitter를 찾을 수 없음: userId={}", userId);
            return;
        }

        try {
            emitter.send(SseEmitter.event().name(eventName).data(queueStatus));
            log.info("SSE 데이터 전송 성공: userId={}, eventName={}, data={}", userId, eventName, queueStatus);
        } catch (IOException e) {
            log.error("SSE 데이터 전송 중 오류 발생: userId={}, 오류={}", userId, e.getMessage(), e);
            emitters.remove(userId);
        }
    }

    public void sendToEmitterAndClose(Long userId, Object message, String eventName) {
        SseEmitter emitter = emitters.get(userId);
        if (emitter == null) {
            log.warn("SSE Emitter를 찾을 수 없음: userId={}", userId);
            return;
        }

        try {
            emitter.send(SseEmitter.event().name(eventName).data(message));
        } catch (IOException e) {
            log.error("SSE 전송 중 오류 발생: userId={}, 오류={}", userId, e.getMessage(), e);
        } finally {
            emitter.complete(); // 연결 종료
            emitters.remove(userId);
        }
    }

    public void broadcast(Object message, String eventName) {
        emitters.forEach((userId, emitter) -> {
            try {
                emitter.send(SseEmitter.event().name(eventName).data(message));
            } catch (IOException e) {
                emitters.remove(userId);
                log.error("SSE 브로드캐스트 중 오류 발생: userId={}, 오류={}", userId, e.getMessage(), e);
            }
        });
    }

    public boolean isEmitterRegistered(Long userId) {
        return emitters.containsKey(userId);
    }
}
  • registerEmitter()
    • SSE 연결을 생성하고, 사용자별로 SseEmiiter를 등록한다.
    • 연결을 종료하거나, 타임아웃, 오류 발생 시 ConcurrentHashMap에서 제거한다.

SseEmitter의 역할과 ConcurrentHashMap으로 SseEmitter를 관리하는 이유는 뭘까?

  • SseEmitter
    • SSE 연결을 통해 서버에서 클라이언트로 실시간 데이터를 푸시한다.
    • 클라이언트가 실시간 상태 업데이트를 받을 수 있도록 HTTP 연결을 유지한다.
    • 이러한 동작을 위해 각 사용자별로 고유한 SseEmitter 객체가 필요하다. 사용자별 실시간 데이터를 전송하는 데 사용되므로 연결이 유지되는 동안 객체를 관리해야 한다.
  • ConcurrentHashMap
    • 다중 사용자 요청을 처리하기 위해 스레드 안전한 자료 구조인 ConcurrentHashMap에 SseEmitter 객체를 저장해 관리한다.
    • 사용자별로 고유 키를 사용하여 SseEmitter를 관리하고 연결 종료 시 자동으로 제거한다.

SSE

클라이언트와 서버 간 지속적인 HTTP 연결을 유지하며 서버가 클라이언트에 데이터를 실시간으로 푸시하는 방식이다.

아래와 같이 사용할 수 있다.

1
2
3
4
5
@GetMapping(value = "/queue/updates", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  
public SseEmitter subscribeQueueUpdates(@AuthenticationPrincipal CustomOAuth2User user, @RequestParam Long eventId) {  
	Long userId = user.getUserId();  
    return couponIssueService.subscribeQueueUpdates(userId, eventId);  
}
  • 동작 방식
    • 초기 연결
      • 클라이언트는 HTTP 요청을 통해 SSE 엔드포인트에 연결을 시작한다.
      • Accept 헤더에 text/event-stream 값을 지정해 SSE 연결임을 명시할 수 있다.
    • 서버 응답
      • 서버는 Content-Type: text/event-stream 헤더로 응답을 보낸다.
      • 연결이 열리면 서버는 필요한 데이터를 event 스트림 형태로 클라이언트에 푸시한다.
1
2
event: queue_status 
data: {"aheadCount":10, "estimatedTime":20}

위와 같은 구조로 데이터를 전달하게 된다.

이렇게 통신하는 동안 클라이언트와 서버 간 HTTP 연결이 지속적으로 유지되어야 한다.

서버는 필요할 때만 데이터를 푸시하게 되고, 클라이언트는 이를 즉시 처리해 실시간 대기열 구현에 적합하다고 볼 수 있다.

클라이언트가 브라우저를 닫거나 명시적으로 연결을 종료할 때 SSE 연결이 닫히게 된다.

대기열 서버 분리

위와 같이 구현하고 멘토링을 받았을 때 이러한 대기열 로직은 서버를 분리하지 않으면 대기열을 두는 가장 큰 이유가 퇴색된다고 하셨다.

배포를 하고 있는 상황이기 때문에 EC2 인스턴스를 하나 더 두어 분리해보기로 결정했다.

최종적으로 이번 쿠폰 발급에 대한 아키텍처는 위와 같다.

모든 처리 작업이 Redis를 통해 이루어지기 때문에 Redis를 일종의 매개체로 서버를 쉽게 분리할 수 있었다.

Redis에 저장된 대기열 정보를 이용하는 스케줄러를 메인 서버에만 두면 대기열의 인원을 추출해 해당 유저들에게 쿠폰을 바로 발급해줄 수 있다.

기존에 메인 서버에서만 테스트를 진행했을 때 SSE 연결 수에 대한 부담이 컸다. 추가적으로 모니터링 등에 사용되는 리소스가 있었고 생각보다 대기열의 인원을 많이 둘 수 없던 상황이었다.

대기열만을 위한 서버를 분리하면서 대기열 서버와 메인 서버를 독립적으로 확장할 수 있는 구조가 구축되었다.

만약 대기열의 크기를 늘리고 싶다면 해당 대기열 인스턴스의 용량을 높이거나, 서버를 추가하면 되는 구조가 완성됐다.

또한 SSE 연결을 감당해야 하는 서버또한 분리되어 있기 때문에 같은 방식으로 대규모 SSE 연결에 안정적으로 대응할 수 있는 구조가 구축되었다.

부하테스트 (대기열 크기 정하기)

유레카 과정에서 AWS 비용을 지원해주긴하지만 부하테스트를 배포 환경에 진행했을 때 비용이 얼마 나올지에 대한 확신이 없어 t2.micro 인스턴스로 테스트를 진행했다.

당연히 인스턴스의 사양이 좋지 않기 때문에 대기열 사이즈를 크게 늘릴 수 없을 것이라고 생각하고 진행했다.

우선 5000명이 대기열에 한 번에 몰렸을 때의 테스트를 진행했다.

CPU 사용량은 안정적인 모습을 보였다.

  • 그러나 Java heap space OutOfMemory 에러가 발생한다.
    • Jmeter 에러에는 잡히지 않았지만 실패한 경우이다.

개선

메모리 사용량에서 부하를 감당할 수 없었기 때문에 여러 조치를 취해보았다.

t2.micro 인스턴스는 메모리 1GB를 가지고 있고 최종적으로는 아래 조치들이 이루어졌다.

  • JVM 옵션 설정
    • 초기 힙 메모리 2GB 설정
    • 최대 힙 메모리 2GB 설정
    • export JAVA_OPTS=”-Xms2g -Xmx2g”
  • Swap 메모리 2GB 설정

이렇게 하고 1000명을 테스트해보았다.

CPU 사용량도 매우 안정적인 모습을 보였고, OutOfMemory 에러도 발생하지 않았다.

이후 1500명을 테스트해보았다.

  • OutOfMemory 에러가 발생하지 않고 CPU 사용량도 안정적이었다.
  • 하지만 로그를 확인했을 때 중간중간 응답이 늦는 경우가 발생했다.
    • 메모리 사용량을 초과하면서 Swap 메모리를 사용하는 과정에 발생하는 지연으로 추측했다.

따라서 t2.micro 인스턴스 기준으로는 대기열에 최대 1000명을 받을 수 있다고 판단했고 대기열 사이즈를 1000으로 설정했다.

그리고 스케줄러로 대기열의 인원들을 처리하는 사이즈는 10으로, 선착순 쿠폰 100장을 발급하는 이벤트로 최종 결정하게 되었다.

정리

바로 RDB에 쿠폰을 발급하면서도 어느정도의 트래픽을 대비할 수 있는 구조를 완성할 수 있었다.

SSE를 통해 실시간 대기열 상태를 보여주며 사용자 경험에도 신경쓰는 구조를 구축했다.

대기열 서버를 분리하고, 실제 배포 환경에서의 테스트도 진행해 더 현실적인 가정을 할 수 있었던 것 같다.

시간상의 문제로 SSE 연결을 최대로 할 수 있는 개선을 많이 시도해보지 못해 아쉽다.

가장 단순한 방법은 대기열 서버를 분리해놓았기 때문에 인스턴스를 업그레이드하거나 서버를 더 만들어 분산해주면 되긴하지만 다른 방법들을 더 시도해볼 수 있었다면 좋았을 것 같다.

추후 더 생각해볼 예정이다.

This post is licensed under CC BY 4.0 by the author.

로그 추적기, Flyway, RestAssured, Jacoco 도입

댕댕살롱 최종 배포 구조도 (+ 아쉬운 점)