Home Event 처리 방식으로 친밀도 랭킹 효율적으로 계산하기 (+Event 정리)
Post
Cancel

Event 처리 방식으로 친밀도 랭킹 효율적으로 계산하기 (+Event 정리)

이전 포스트에 이어서 캐시 기능 구현 이후에 고민했던 점과 그 부분을 어떻게 해결했는지 정리해본다.

캐싱 구현 이후 고려했던 부분

기존에 캐싱을 통한 조회를 구현하여 친밀도 랭킹 확인을 조금 더 효율적으로 할 수 있었다.

그런데 현재 상태로는 만족할 수 없었다. 기능이 불완전했다.

왜냐하면, 만약 새로운 유저의 프로필이 등록되었을 경우나 기존 유저의 프로필이 변경되었을 경우 재계산하는 로직이 구현되지 않았기 때문이다.

다른 프로필에 변동이 있을 경우 당연하게도 캐시를 이용하면 안되고, 그대로 조회해서도 안된다. 더해 DB에 있는 기존에 계산된 데이터도 update 되어야 한다.

우선 프로필 변동 사항을 어떻게 파악해야 할까라는 고민이 있었다.

프로필의 변동을 DB 값을 직접 조회해 하나하나 비교하게 되면 매우 비효율적이다. 따라서 이 방법은 채택할 수 없었다.

이벤트를 이용한 처리 방식에 대한 고민

방법을 찾다보니 기존에 사용해보지 않았던 이벤트 발행 및 처리를 통한 방법이 있었다.

그러나 단순히 프로필 등록/수정 요청을 승인했을 때 이벤트를 발행하고 이벤트 리스너를 통해 바로 모든 프로필에 대해 재계산 로직을 실행시켜 값을 저장하면 (유저의 수) * (유저의 수 - 1) 만큼의 계산이 되고 insert/update 쿼리가 발생하기 때문에 매 이벤트 발행 시 바로 처리하는 방식은 채택할 수 없었다.

물론 이 방식을 채택했을 때 미리 계산을 해둔다는 점은 좋을 수 있으나, 친밀도 랭킹 메뉴에 접근하지 않는 유저까지 미리 계산해놓기 때문에 결국 사용하지 않게 되는 데이터까지 미리 계산하게 되는 비효율이 발생한다는 생각이 들었다.

그래서 더 효율적인 방법을 계속해서 고민하게 되었다.

Spring Event

최종적으로 채택한 방법을 설명하기 전, Spring의 Event에 대해 정리한다.

Spring에서의 이벤트는 이벤트 발행자, 이벤트, 이벤트 리스너로 구성된다.

  • 이벤트 발행자(Publisher): 이벤트를 발생시키는 객체로 ApplicationEventPublisher 인터페이스를 통해 이벤트를 발행할 수 있다.
  • 이벤트 클래스: 이벤트는 일반적으로 사용자 정의 클래스로 표현되고 ApplicationEvent를 확장하거나, POJO로 만들 수 있다.
    • Spring 4.2부터 POJO로 만들 수 있게 되었다.
    • ApplicationEvent는 생성자에서 Object source를 받아 부모 클래스에 전달한다. source는 이벤트가 발생한 객체나 컨텍스트를 나타낸다.
    • ApplicationEvent를 확장했을 때 이벤트의 소스 추적, 특정 타입의 이벤트 필터링 등등의 장점이 있으나 Spring에 의존적이게 된다는 단점이 존재한다.
  • 이벤트 리스너: 이벤트를 처리하는 객체로 @EventListener 애노테이션을 사용해 특정 이벤트가 발생했을 때 실행되는 메서드를 정의할 수 있다.

이제 위의 이벤트 구성 요소들을 활용해 어떻게 이벤트를 이용할 수 있는지 알아보자.

비동기 이벤트 처리

  • Spring의 이벤트 처리 방식은 우선 동기적이다. 이벤트가 발행되면 리스너가 이벤트를 처리할 때까지 발행자가 대기한다.
  • 하지만 @Async 애노테이션을 사용해 비동기적으로 이벤트를 처리할 수도 있다.
  • 비동기 처리를 위해 @EnableAsync 애노테이션을 main 메서드에 붙여 비동기 처리를 활성화 해주어야 한다.
1
2
3
4
5
6
7
8
9
@Component
public class UserEventListener {

    @EventListener
    @Async
    public void handleUserCreatedEvent(UserCreatedEvent event) {
        // 비동기적으로 처리할 로직
    }
}
1
2
3
4
5
6
7
@SpringBootApplication
@EnableAsync
public class MyApp {
    public static void main(String[] args) {
        SpringApplication.run(MyApp.class, args);
    }
}

다중 이벤트 처리

  • 하나의 이벤트를 여러 리스너에서 처리할 수 있다.
  • 여러 리스너가 각각 다른 작업을 처리하도록 할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class LoggingEventListener {

    @EventListener
    public void logUserCreatedEvent(UserCreatedEvent event) {
        System.out.println("로그: 사용자 생성됨 - " + event.getUser().getName());
    }
}

@Component
public class NotificationEventListener {

    @EventListener
    public void notifyUserCreatedEvent(UserCreatedEvent event) {
        System.out.println("알림: 새로운 사용자 - " + event.getUser().getName());
    }
}

트랜잭션 기반 이벤트 처리

  • 이벤트는 트랜잭션과 연결지어 처리할 수 있다.
  • @TransactionalEventListener를 사용하면 트랜잭션 커밋 후 이벤트가 처리되도록 할 수 있다.
1
2
3
4
5
6
7
8
@Component
public class TransactionalUserEventListener {

    @TransactionalEventListener
    public void handleUserCreatedAfterCommit(UserCreatedEvent event) {
        System.out.println("트랜잭션 커밋 후 사용자 생성 이벤트 처리");
    }
}

정리

이벤트를 이용하면 어떤 애플리케이션의 컴포넌트 간 상호작용을 비동기 또는 동기적으로 처리할 수 있다.

이벤트 발행자와 리스너 간의 명확한 분리를 통해 유지보수, 확장이 용이해진다.

이벤트를 정의하고, 리스너를 통해 처리하는 부분들은 구현한 코드들을 보면서 다시 확인해보자.

이벤트를 이용한 처리 방식 구현

여러 방법을 찾다가 최종적으로 이 방법을 채택하게 되었다.

우선 방법을 설명하면 프로필 등록/수정 요청을 승인했을 때 이벤트가 발생하는 부분은 동일하다.

다만 그 이벤트를 처리하는 이벤트 리스너에서 어떤 재계산 로직이 바로 실행된다거나 하지 않는다. 해당 이벤트를 바로 처리하는 방식은 이벤트 리스너에서는 발생한 이벤트에 대해 Redis에 저장하여 해당 데이터를 이용해 이벤트를 처리하는 로직을 작성하는 것이다.

다시 흐름을 정리해보겠다.

  • 관리자가 프로필 등록/요청 승인 시 이벤트를 발행한다.
    • 프로필이 새로 등록되거나 기존 프로필이 업데이트 되는 경우는 이 경우 뿐이다.
  • 이벤트 리스너를 통해 해당 이벤트를 처리한다.
    • 해당 이벤트가 발생되면 Redis에 이벤트를 저장한다.
    • 처리 여부를 함께 저장한다.
  • Redis에 저장할 때 UserId, ProfileId를 이용한다.
    • UserId가 Login된 UserId이고, 특정 ProfileId에 대한 처리여부가 false라면 그 때 재계산 로직을 수행하고, 처리 여부를 true로 바꾸어준다.
    • 단, Redis에 이벤트를 하나만 등록하면 유저별로 재계산하는 로직을 구현할 수 없다.
    • 따라서 이벤트 하나가 발생되면, 전체 UserId에 변경된 ProfileId를 매핑하여 저장한다.
    • 즉, 이벤트 하나 당 유저 수만큼의 이벤트 처리 기록이 Redis에 저장되는 것이다.

이제 구현된 코드를 직접 보면서 Event에 대해 이해해본다.

이벤트 객체

1
2
3
4
5
6
7
8
9
@Getter  
@Setter  
@AllArgsConstructor  
public class ProfileChangeEvent {  
  
	private Long profileId;  
	private Long userId;  
	private String eventType; //프로필 등록인지 수정인지.
}

단순 POJO로 구현했다.

프로필을 등록/수정 요청한 유저의 아이디와 해당 프로필 아이디 정보가 담긴다.

이벤트 리스너

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component  
@RequiredArgsConstructor  
public class ProfileChangeListener {  
  
	private final RedisTemplate<String, Object> redisTemplate;  
	private final UserRepository userRepository;  
	  
	@EventListener  
	public void handleProfileChangeEvent(ProfileChangeEvent event) {  
		Long profileId = event.getProfileId();  
		  
		List<User> allUsers = userRepository.findAll();  
		  
		for (User user : allUsers) {  
			String key = "profileEvent";  
			String field = "userId:" + user.getId() + ":profileId:" + profileId;  
			redisTemplate.opsForHash().put(key, field, "false");  
		}  
	}  
}

위에서 구현한 ProfileChangeEvent가 발행되면, Redis에 각 유저에 대해 해당 프로필 변경 이벤트를 처리했는지에 대한 여부들을 저장하게 된다.

Redis의 해시 자료구조를 활용해 저장했다.

Redis에서 사용할 해시 키를 정의하고, “profileEvent”라는 키를 가진 해시 구조에 여러 field-value 쌍을 저장하게 된다.

즉, “userId:1:profileId:1”이 field로 그와 매칭된 value에는 “false” 값을 저장하게 된다. 아래 실제로 저장된 값을 보고 이해해보자.

7) "profileEvent"

위와 같이 저장이 되어있고, hgetall profileEvent 명령어를 통해 들여다보면 아래와 같이 데이터가 저장된다.

 1) "userId:1:profileId:1"
 2) "true"
 3) "userId:2:profileId:1"
 4) "false"
 5) "userId:3:profileId:1"
 6) "false"
 7) "userId:4:profileId:1"
 8) "true"
 9) "userId:5:profileId:1"
10) "false"
11) "userId:6:profileId:1"
12) "false"
13) "userId:7:profileId:1"
14) "false"

//...

profileId 1번이 변경되었을 때 해당 변경에 대하여 이벤트가 발생되고, 존재하는 유저 전체에게 해당 프로필 변경 이벤트 처리 여부에 대한 내용이 저장된 모습이다.

“true” 이면 해당 이벤트를 처리해 재계산을 마친 경우이고, “false” 일 경우 재계산이 필요한 경우이다.

참고로 ture, false가 String으로 저장되어 있는데, 처음에 Boolean으로 구현했을 때 Redis에 기본적으로 설정된 Boolean 값을 String으로 저장하려는 과정에서 에러가 나타나 명시적으로 문자열로 변환해주었다.

이벤트 발행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//ProfileService의 메서드

private final ApplicationEventPublisher eventPublisher;

public void updateProfileRequest(Long requestId, String requestStatus) {  
 
	ProfileRequest request = profileRequestRepository.findById(requestId)  
			.orElseThrow(() -> new IllegalArgumentException("Invalid requestId: " + requestId));  
	    
	RequestStatus status;  
	try {  
		status = RequestStatus.valueOf(requestStatus);  
	} catch (IllegalArgumentException e) {  
		throw new IllegalArgumentException("Invalid requestStatus: " + requestStatus);  
	}  
	  
	  
	request.changeRequestStatus(status); 
	 
	profileRequestRepository.save(request);  
	  
	if (status == RequestStatus.APPROVED) {  
		Profile profile = request.getProfile();  
		eventPublisher.publishEvent(new ProfileChangeEvent(profile.getId(), request.getUser().getId(),  
		request.getRequestType().name()));  
	}  
}

ApplicationEventPublisher 인터페이스를 사용해 우리가 구현한 ProfileChangeEvent를 발행한다.

위 코드에서는 프로필 등록/수정 요청을 승인했을 경우에 이벤트가 발행되어야 하기 때문에 그 부분에 이벤트 발행 로직을 넣었다.

이벤트를 발행할 때 우리가 구현한 이벤트 객체에 맞게 알맞은 데이터를 삽입해주면 된다.

이벤트 처리 여부의 활용

이제 Redis에 저장된 이벤트 처리 여부를 어떻게 활용하는지 확인해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service  
@RequiredArgsConstructor  
@Slf4j  
public class FamiliarityEventService {  
  
	private final RedisTemplate<String, Object> redisTemplate;  
	  
	public List<Long> getUnprocessedProfileIds(Long userId) {  
		Set<Long> unprocessedProfileIds = new HashSet<>();   
		  
		Map<Object, Object> existProfileEvents = redisTemplate.opsForHash().entries("profileEvent");  
		  
		for (Map.Entry<Object, Object> entry : existProfileEvents.entrySet()) {  
			String key = (String) entry.getKey();  
		  
			if (key.startsWith("userId:" + userId + ":") && "false".equals(entry.getValue())) {  
				Long unprocessedProfileId = Long.parseLong(key.split(":profileId:")[1]);  
				unprocessedProfileIds.add(unprocessedProfileId);  
			}  
		}  
	  
		return new ArrayList<>(unprocessedProfileIds);  
	}  
	  
	public void updateClearEvent(Long userId, Long profileId) {  
		String key = "profileEvent";  
		String field = "userId:" + userId + ":profileId:" + profileId;  
		redisTemplate.opsForHash().put(key, field, "true");  
	}  
	  
	public boolean isEventProcessed(Long loginUserId, Long targetProfileId) {  
		String key = "profileEvent";  
		String field = "userId:" + loginUserId + ":profileId:" + targetProfileId;  
		  
		Object processed = redisTemplate.opsForHash().get(key, field);  
		return processed != null && processed.equals("true");  
	}  
}

이 서비스 클래스에서는 Redis에 저장된 이벤트 처리 여부에 대한 데이터를 처리하는 메서드들이 담겨있다.

로그인 된 유저의 이벤트 처리 기록을 가져오기 위한 메서드, 이벤트를 처리했다고 true로 바꾸어 주기 위한 메서드, 이벤트 처리 여부를 확인하는 메서드가 정의되어 있다.

이 부분들이 이벤트 처리 여부의 기록을 남기고 그 내용을 바탕으로 처리하는 방법의 핵심이다.

이 메서드들을 어떻게 사용했는지 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//FamiliarityService 메서드

public Page<FamiliarityRankingDto> calculateFamiliarityRanking(Long loginUserId, Pageable pageable) {  
  
	User loginUser = userRepository.findByIdWithProfileAndHobbies(loginUserId)  
			.orElseThrow(() -> new IllegalArgumentException());  
	  
	List<Long> unprocessedProfileIds = familiarityEventService.getUnprocessedProfileIds(loginUserId);  
	  
	Long loginUserProfileId = loginUser.getProfile().getId();  
	
	if (unprocessedProfileIds.contains(loginUserProfileId) &&
		 !familiarityEventService.isEventProcessed(loginUserId, loginUserProfileId)) 
	{  
		familiarityRankService.invalidateCachesForLoginUser(loginUserId);  
		recalculateForAllUsers(loginUserId);  

		familiarityEventService.updateClearEvent(loginUserId, loginUserProfileId);  
	}  
	  
	if (!unprocessedProfileIds.isEmpty()) {  
		for (Long profileId : unprocessedProfileIds) {  
			Profile targetProfile = profileRepository.findById(profileId)  
			.orElseThrow(() -> new IllegalArgumentException());  
			  
			familiarityRankService.recalculateFamiliarityTargetUser(loginUser, targetProfile);  
			familiarityRankService.invalidateCachesTargetUser(loginUserId, targetProfile.getId());  
			familiarityEventService.updateClearEvent(loginUserId, targetProfile.getId());  
		}  
	}  
	  
	// 기존 승인된 프로필들과 계산하여 랭킹을 조회하는 로직
	  
	return new PageImpl<>(pagedRankingList, pageable, total);  
}

두 개의 로직이 존재한다.

  • 이벤트 처리 기록의 ProfileId에 로그인 된 유저의 프로필 아이디가 존재하는 경우
    • 이 경우의 의미는 본인의 프로필이 update 되었다는 뜻이다.
    • 따라서 모든 다른 프로필과의 재계산 과정이 필요하다.
    • 그래서 로그인 된 유저와 관련된 모든 캐시를 무효화하고 재계산 후 DB의 점수 값들을 update 한다.
    • 이 후 Redis에 저장되어있는 이벤트 처리 기록의 처리 여부를 true로 바꾸어준다.
  • 이벤트 처리 기록의 ProfileId가 존재하는 경우
    • 이 경우는 어떤 유저의 Profile이 새로 생성되거나 업데이트 되었을 때 이벤트가 발생했고, 그 이벤트를 처리하지 않았다는 의미이다.
    • 따라서 그 유저에 대해 재계산이 필요하다는 의미이고, 해당 프로필과의 캐시만 무효화한 후 재계산을 해주었다.

결과

우선 DB에서 조회하는 방식과 이벤트 처리 방식의 성능 차이는 직접 확인하지는 못했다. DB에서 조회해서 프로필의 변동을 체크하는 방식은 단순하게 생각해도 너무 비효율적이라 구현해보지 않았기 때문이다.

결과적으로는 이벤트를 발행하고, 그 이벤트를 처리하는 방식을 Id값을 통해 Redis에 각 유저별 해당 이벤트 처리 여부를 하나하나 저장해서 재계산 하지 않았을 때 이벤트를 유지하는 방식으로 구현하여 프로필의 변동이 있는 User를 대상으로만 재계산하는 로직을 구현할 수 있었다.

직접적인 성능 비교는 할 수 없지만, 이벤트 방식을 통해 프로필이 업데이트 된 유저만 캐시를 무효화하고 재계산을 하고 나머지는 기존의 캐시를 이용할 수 있도록 하여 전체 유저 정보를 캐시로 조회했을 때와 큰 차이가 없도록 할 수 있었다.

남은 문제

사실 이 구현 방법에도 문제가 있다. Redis에 이벤트 처리에 대한 부분을 관리하고 있는데, 이 데이터를 유지하는 시간이 지나 삭제가 되면 그 사이에 재계산 로직을 수행하지 않은 유저는 다시 계산을 하지 않고 기존 데이터를 이용하게 된다.

물론 데이터를 유지하는 시간을 매우 길게 하는 방법으로 해결할 수 있을 것이다. 하지만 이 또한 정말 많은 유저가 이용한다고 생각했을 때 비효율적인 방법이라는 생각이 들었다.

이벤트 처리 기록이 정말 많은 수가 쌓였을 때 성능에 얼마나 문제가 있는 지에 대한 부분을 파악하는 것이 중요할 것 같다.

이 부분은 테스트가 필요할 것 같고, 만약 성능에 문제가 있을 경우 해결할 수 있는 방법은 아래 정도가 있을 것 같다.

  • Redis의 maxmemory-policy를 설정해 오래된 데이터를 삭제하는 방식
    • 물론 이 방법도 궁극적인 해결 방법은 아닌 것 같다. 결국에는 데이터가 삭제되었을 경우 삭제 전 접근하지 못했을 때 똑같이 계산을 제대로 하지 못할 것이라고 생각된다.
  • kafka 사용
    • 카프카는 이벤트를 영구적으로 저장하고, 여러 유저가 동일한 이벤트를 소비할 수 있도록 지원한다.
    • 데이터가 메모리가 아닌 디스크에 기록되어 지속성을 보장한다. 물론 이 부분에 의해 처리 속도는 비교적 더 느릴 수 있다.
    • 카프카는 이벤트나 메시지를 로그 형태로 저장하고 offset을 기준으로 데이터가 유지된다.
      • 이벤트가 발행한 순서대로 처리되며, 주기적으로 오래된 데이터를 삭제하도록 설정할 수 있다.

결과적으로 현재로서는 프로젝트 자체가 크지 않고, 사용자도 많지 않기 때문에 Redis를 사용해도 문제가 없다.

하지만 현재 궁극적으로는 문제가 있는 것이 맞고 그 대안이 존재함을 알았기 때문에 추후 수정해봐야겠다는 생각이 들었다.

카프카를 사용해본 적이 없고, 아직 잘 알지 못하는 만큼 카프카에 대해 공부해보고 해당 프로젝트에 적용해 연습을 해 볼 생각이다.

This post is licensed under CC BY 4.0 by the author.

캐시 방식으로 친밀도 랭킹 효율적으로 확인하기 (+Redis 정리)

트랜잭션 분리를 통한 개선