Spring SSE로 실시간 알림 구현하기
SSE는 Server Sent Events 라는 뜻으로 말그대로 서버에서 이벤트를 보내준다 라는 의미이다.
저번시간에 정ㅇㅇ군의 발표에서 봤듯이
통신에는 여러 방식이 있다.
여기서 SSE는 스트리밍과 가장 유사하다고 할 수 있다.
여기서 내가 유사하다고 한 것은 사람마다 SSE가 스트리밍의 한 종류이다 라고 말하는 사람도 있고
SSE는 스트리밍의 진화 형태이다. 라고 말하는 사람도 있다.
SSE is in fact a form of HTTP streaming. It is just an HTTP response with MIME type of "text/event-stream" and it sends plain text messages terminated with double newlines. (SSE는 사실 HTTP 스트리밍의 한 형태입니다. 단순히 "text/event-stream" MIME 타입을 가진 HTTP 응답이며, 이 응답은 두 줄 바꿈으로 끝나는 일반 텍스트 메시지를 전송합니다.)
IMHO, HTTP2 Server sent events has rich features than HTTP Streaming. (제 생각에는, HTTP2의 Server-Sent Events(SSE)가 HTTP 스트리밍보다 더 풍부한 기능을 가지고 있다고 봅니다.)
결국 거의 다 뇌피셜인 것 같다.
SSE?
그래서 SSE가 뭐냐?
SSE는 스트리밍의 형태와 비슷하게 서버가 클라이언트에게 데이터를 실시간으로 전송해주는 기술이다.
SSE는 단방향 통신으로 클라이언트의 요청이 딱히 없더라도 서버에서 새로운 데이터를 계속해서 보내줄 수 있는 것이다.
장단점
SSE에도 당연히 장단점이 존재한다.
장점
- •
HTTP를 사용하기 때문에 구현이 쉽다.
- •
연결이 끊어진 경우 자동으로 재연결을 시도한다.
- •
서버에서 실시간으로 데이터를 전송해줄 수 있다.
단점
- •
GET 메서드만 지원한다.
- •
클라이언트가 연결을 끊어도 서버에서는 알 수 있는 방법이 없다.
- •
SSE를 사용하기 위해서는 연결을 유지해야하기 때문에 클라이언트가 많아질수록 서버의 부담이 커진다.
왜 굳이 SSE를 쓸까?
우리는 그럼 다른 HTTP 통신, 소켓을 버리고 왜 SSE를 쓸까?
Polling
위의 사진과 같이 Polling은 요청이 와야지 응답을 보내준다.
따라서 클라이언트에서 요청을 보내주지 않는다면 새로운 데이터를 줄 수 없는 것이다.
그래서 실시간이 중요한 알림에서는 딱히 좋은 선택이 아닌 것이다.
또한, 계속된 요청, 응답으로 트래픽이 낭비될 수도 있다.
Long Polling
Long Polling은 그나마 Polling보다는 나아졌다고 할 수 있다.
Polling에 비해 거의 실시간 느낌이 나게 알림을 전송할 수 있다.
하지만 이것도 계속된 양방향 통신으로 서버에 부담이 될 수 있다.
Web Socket
웹소켓은 다들 대충은 알거라고 생각하고 사진을 첨부하지 않았다. (절대로 ㅈㅇㅇ이 안만들어놔서 안올리는거 아님)
웹소켓은 위의 다른 선택지보다는 더 좋은 선택지가 될 수 있다.
웹소켓은 하나의 연결을 연결이 끝날 때까지 유지하고, 다른 방식에 비해 더 적은 자원을 소모해서 서버의 부담을 줄여줄 수 있다.
하지만 구현이 좀 복잡할 수 있고, 부담이 적어지는 것이지 부담이 없다고 할 수 없다.
또한, 알림의 경우 클라이언트에서는 딱히 데이터를 보내줄 필요가 없기 때문에 굳이 양방향을 유지하면서 웹소켓을 쓸 이유가 없다는 생각이다.
정리
SSE는 위의 다른 기술들과 달리 단방향 통신으로 서버에서 클라이언트로 실시간으로 데이터를 보내준다.
따라서 SSE는 아래와 같은 기능들에 유용하게 사용될 수 있다.
- •
SNS 실시간 피드
- •
알림
- •
실시간 뉴스
- •
실시간 경기 점수 상황
- •
프로그램 설치 로딩 바
- •
등등
동작 방식
SSE는 아주 심플하게 동작한다.
위 그림과 비슷하게 먼저 클라이언트에서 서버에 요청을 보낸다.
클라이언트 : 야, 지금부터 데이터 보내줘
그럼 서버에서 알겠다는 응답을 보내준다
서버 : ㅇㅋ
그럼 이제부터 서버는 특정 이벤트가 발생할 때마다 클라이언트에게 데이터를 전송하게 된다.
클라이언트는 서버에서 보낸 데이터에 대해 응답을 하지는 않고, 해당 데이터에 따른 작업을 수행한다.
만약 이 연결이 끊어지게 된다면 클라이언트는 서버에게 재연결을 요청한다.
이는 HTTP를 통해 이루어지고, 웹소켓에 비해 덜 복잡하다.
사용법
그럼 이 SSE를 어떻게 사용을 할까?
클라이언트는 서버로 먼저 요청을 보낸다고 했다.
요청을 보낼 때, 헤더에 ‘Accept : text/event-stream’, ‘Connection : keep-alive’
이라는 헤더를 달고 요청을 보낸다.
그러면 서버에서는 ‘Content-Type : text/event-stream’
이라는 헤더로 응답을 보내준다.
이렇게 한번 연결이 이루어지면 서버에서 데이터를 보내더라도 HTTP헤더가 실릴 필요가 없다.
구현
그럼 지금부터 간단한 구현 방법을 알아보자.
Controller
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(@AuthenticationPrincipal SecurityUserDetails userDetails,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return notificationService.subscribe(userDetails.getId(), lastEventId);
}
여기서 produces = MediaType.TEXT_EVENT_STREAM_VALUE
로 인해 응답의 헤더에
‘Content-Type : text/event-stream’
이 붙게 된다.
Service
public SseEmitter subscribe(Long userId, String lastEventId) {
String emitterId = userId + "_" + System.currentTimeMillis();
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
sendClient(emitter, emitterId, "EventStream created. Id : "+emitterId);
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByUserId(String.valueOf(userId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> {
sendClient(emitter, entry.getKey() , entry.getValue());
});
}
return emitter;
}
여기서 emitterId를 만들 때 userId + "_" + System.currentTimeMillis();
이렇게 userId와 현재 시간을 합쳐서 만든다.
이는 사용자가 여러 브라우저에서 접속할 수 있기 때문에 각 브라우저에 대해 연결 하기 위해서이다.
만약 현재 시간을 넣지 않는다면 사용자가 새로운 브라우저에서 연결할 때마다 그 전에 열었던 브라우저에서는 연결이 끊어지게 될 것이다.
sendClient(emitter, emitterId, "EventStream created. Id : "+emitterId);
연결을 하고 클라이언트에게 연결했다는 알림을 보내주는 코드이다.
하지만 해당 기능은 원래 굳이 필요없는 기능이다.
그럼 왜 이 코드가 들어가 있을까?
만약 서버와 클라이언트가 연결을 하고 timeout으로 종료될 때까지 아무런 데이터가 오가지 않는다면 서버에서는 503 (Service Unavailable)가 터지게 된다.
이를 예방하기 위해 무조건 1번의 데이터가 전송되도록 하는 것이다.
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByUserId(String.valueOf(userId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> {
sendClient(emitter, entry.getKey() , entry.getValue());
});
}
위 코드는 만약 클라이언트가 연결을 할 때, lastEventId를 같이 보내준다면
해당 이벤트 이후의 이벤트는 받지 못했기 때문에 해당 데이터들을 새로 보내주는 것이다.
public void send(User user, NotificationType notificationType, String content, String url) {
Notification notification = notificationRepository.save(createNotification(user, notificationType, content, url));
String userId = String.valueOf(user.getId());
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByUserId(userId);
sseEmitters.forEach(
(key, emitter) -> {
String eventId = userId+"_"+System.currentTimeMillis();
emitterRepository.saveEventCache(eventId, NotificationRes.fromEntity(notification));
sendClient(emitter, eventId, NotificationRes.fromEntity(notification));
}
);
}
private void sendClient(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.data(data));
} catch (IOException e) {
emitterRepository.deleteById(emitterId);
}
}
해당 코드는 서버에서 클라이언트로 데이터를 전송하는 코드이다.
emitterRepository에는 두가지 저장소가 존재한다.
emitters는 현재 연결 중인 emitter에 대한 정보,
eventCache에는 알림에 대한 정보가 저장되어 있다.
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByUserId(userId);
sseEmitters.forEach(
(key, emitter) -> {
String eventId = userId+"_"+System.currentTimeMillis();
emitterRepository.saveEventCache(eventId, NotificationRes.fromEntity(notification));
sendClient(emitter, eventId, NotificationRes.fromEntity(notification));
}
);
여기서 emitterRepository.findAllEmitterStartWithByUserId(userId);
를 통해 현재 사용자와 연결된 모든 emitter를 가져온다.
다음으로, 가져온 모든 emitter에 대해 알림을 전송해주는 과정이다.
sseEmitters.forEach(
(key, emitter) -> {
String eventId = userId+"_"+System.currentTimeMillis();
emitterRepository.saveEventCache(eventId, NotificationRes.fromEntity(notification));
sendClient(emitter, eventId, NotificationRes.fromEntity(notification));
}
);
근데 지금 글을 작성하면서 코드를 다시 보니 아래 과정은 굳이 forEach 문 안에서 여러번 반복할 필요가 없는 것 같다.
String eventId = userId+"_"+System.currentTimeMillis();
emitterRepository.saveEventCache(eventId, NotificationRes.fromEntity(notification));
해당 로직은 사용자에게 갈 알림을 eventCache에 저장하는 과정인데,
원래 대부분의 사람들의 코드를 보면 아래와 같이 작성되어 있다.
sseEmitters.forEach(
(emitterId, emitter) -> {
emitterRepository.saveEventCache(emitterId, NotificationRes.fromEntity(notification));
sendClient(emitter, emitterId, NotificationRes.fromEntity(notification));
}
);
즉, 각 이벤트를 저장할 때, key 값을 emitterId로 하는 것이다.
그럼 문제점이 생기는데, 사용자는 lastEventId를 이용해서 그 이후의 event를 모두 수신할 수 있어야한다.
근데 위의 코드는 emitterId라는 같은 key값에 대해 새로운 이벤트를 계속 덮어쓰기 때문에
제일 최신의 event만 조회할 수 있는 것이다.
그래서 내가 수정한 코드가 위의 코드인데,
String eventId = userId + "_" + System.currentTimeMillis();
NotificationRes notificationRes = NotificationRes.fromEntity(notification);
emitterRepository.saveEventCache(eventId, notificationRes);
sseEmitters.forEach(
(key, emitter) -> {
sendClient(emitter, eventId, notificationRes);
}
);
이렇게 수정해주면 될 것 같다.
그러면 클라이언트에서도 알림을 실시간으로 전송해줄 수 있고, lastEventId가 오는 경우 해당 이벤트 이후의 모든 이벤트를 다시 받을 수 있게된다.
내가 구현하면서 한 뻘짓
LastEventID
나는 클라이언트에서 서버로 연결을 요청할 때 LastEventID를 넣어주면 그 이후의 이벤트를 모두 서버에서 보내주도록 로직을 분명히 짜두었는데 포스트맨으로 테스트를 하니 계속 제일 최신의 데이터만 오는 것이었다.
이걸 해결하기 위해 한 3일 정도는 이것만 붙잡고 있었던 것 같다.
근데 결국 해결을 못했고, 혹시 포스트맨 문제인가? 싶어서 직접 크롬에서 해당 api로 연결을 시도해봤다.
그랬더니 이게 뭐람?
바로 밀렸던 알림들이 사사삭 오는 것이 아닌가?
그냥 포스트맨의 문제였다.
이 사건을 계기로 포스트맨을 너무 믿으면 안되겠다라는 생각이 굳게 자리잡았다.
재연결
내가 읽었던 모든 글에서 SSE는 연결이 끊기면 자동으로 재연결을 시도한다고 했다.
근데 내가 아무리 포스트맨, 크롬으로 테스트를 해봐도 연결이 끊기면 그냥 끝나버렸다.
다시 재연결을 시도조차 하지 않는 것이다.
그래서 나는 이 로직을 내가 따로 추가를 해줘야하는건가? 싶어서 계속 찾아보고 코드를 뚫어져라 봤는데 못 찾았다.
이걸로 또 몇시간을 날려먹었다.
그런데 알고보니 이 작업은 백엔드에서 해주는 것이 아니고
프론트에서 SSE를 구현할 때 EventSource 라는 기능을 사용하는데
EventSource에서 자동으로 재연결을 해주는 것이었다.
ㅋ…