04.04 TIL (SSE)

2023. 4. 7. 22:58개발일지

SseEmitter은 어떻게 작동하는지 이해하기 힘들었기에 정리해놔야 겠다는 생각이 들었...

public SseEmitter subscribe(Long memberId, String lastEventId) {
    // emitter 에 고유값을 주기위해
    String emitterId = makeTimeIncludeUd(memberId);

    Long timeout = 10L * 1000L * 60L; // 10분
    // 생성된 emitterId를 기반으로 emitter 를 저장
    SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(timeout));

    // 전달이 완료되거나 emitter의 시간이 만료된 후 레포에서 삭제
    emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
    emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));


    // 에러를 방지하기 위해 더미 데이터를 전달
    String eventId = makeTimeIncludeUd(memberId);

    // 이벤트를 구분하기 위해 이벤트 ID 를 시간을 통해 구분해줌
    sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");

    if (hasLostData(lastEventId)) {
        sendLostData(lastEventId, memberId, emitterId, emitter);
    }

    return emitter;
}

SseEmitter는 Spring Framework에서 제공하는 SSE를 구현한 클래스이다.

클라이언트가 SseEmitter 객체를 생성하고 이를 서버에 등록하면, 서버는 이벤트 스트림을 생성하고 SseEmitter 객체를 통해 지속적으로 클라이언트에게 이벤트를 전송하기위해 사용한다.

 

timeout은 10분으로 설정해놨는데 추후에 알아보고 어떻게 설정하는것이 좋은지 적용시켜볼 생각이다.

 

 

아래는 sendNotification매서드를 이용해 클라이언트에게 보내주는 부분이다

private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
    try {
        emitter.send(SseEmitter.event()
                .id(eventId)
                .name("message")
                .data(data));
    } catch (IOException exception) {
        emitterRepository.deleteById(emitterId);
    }
}

연결할때는 data값을 String 인 "EventStream Created. [userId=" + memberId + "]" 을 사용했다.
이후 메세지를 보낼때는 data값에 dto를 이용해 보내준다.

 

프로젝트상 SSE가 연결이 되고나면 리뷰를 작성시에 작성자에게 알림을 보내주도록 구현해놨다. 

리뷰를 작성하는 메서드에 

notificationService.send(Member receiver, String content, String url) 을 추가해 줬다.

 

NotificationService 에 있는 send 메서드이다.

@Async
public void send(Member receiver, String content, String url){
    Notification notification = notificationRepository.save(createNotification(receiver, content, url));

    Long receiverId = receiver.getId();
    String eventId = makeTimeIncludeUd(receiverId);
    Map<String, SseEmitter> emitterMap = emitterRepository.findAllEmitterStartWithByMemberId(String.valueOf(receiverId));
    emitterMap.forEach(
            (key, emitter) -> {
                emitterRepository.saveEventCache(key, emitter);
                sendNotification(emitter, eventId, key, NotificationDto.create(notification));
            }
    );
}

비동기식으로 처리하기위해 @Async 어노테이션을 사용했다.

 

조회하고 삭제하는 메서드도 구현했지만 기존의 CRUD와 동일해서 skip...

 

아래는 유실된 데이터가있는지 확인하고 있다면 알림을 전송해주는 메서드 이다.

if (hasLostData(lastEventId)) {
    sendLostData(lastEventId, memberId, emitterId, emitter);
}
// lastEventId 이후에 발생한 알림을 전송
private void sendLostData(String lastEventId, Long memberId, String emitterId, SseEmitter emitter) {
    Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
    eventCaches.entrySet().stream()
            .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
            .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}

// lastEventId가 비어있는지 확인
private boolean hasLostData(String lastEventId) {
    return !lastEventId.isEmpty();
}

오늘은 SSE 구현중 service에 있는 메서드를 알아봤다.


사실 SSE같은 기술을 처음 쓰면 메서드 보단 환경설정 이나 개념에 대한 이해가 더 어렵다

어떻게 프론트로 넘겨주는거지? 라는 생각이 SSE를 이해하는데 어려웠던 부분이 었는데

간단했다. SseEmitter 클래스가 알아서 해주는거였다... 프론트도 저거 쓰면 통신이된다. 신기...

 

 

SSE는 Socket과 비교가 많이되는데 왜 알림에서는 SSE를 많이쓰는지 알아볼 필요가있다.

- Socket은 양방향이니 단방향일 경우엔 SSE를 쓰는게 맞다

- 채팅을 SSE를 이용하기도 하지만 일반적이지 않다.

- Socket은 이진, 문자열 데이터를 지원하고 SSE는 텍스트 데이터만 지원하기 때문에 상황에 맞게 사용해야 한다.

'개발일지' 카테고리의 다른 글

04.07 (Refactoring)  (0) 2023.04.07
04.06 TIL (Nginx)  (0) 2023.04.07
04.03 TIL  (0) 2023.04.03
03.27 Redis 사용하는 이유  (0) 2023.03.27
03.25 TIL  (0) 2023.03.25