SpringBoot
SSE (Server-Sent Event) 구현
똑똑한망치
2024. 4. 10. 20:18
728x90
반응형
📝 이론
Client가 서버와 한 번 연결을 맺고 나면, Server에서 이벤트가 발생할 때마다 데이터를 전송받는 단방향 통신 방법이다.
- 장점
- HTTP 프로토콜만으로 사용할 수 있어 구현이 용이하다.
- 접속에 문제가 있는 경우 자동으로 연결을 재시도한다.
- 단점
- 클라이언트가 접속을 Close 해도 서버에서 감지하기 어렵다.
❓ 동작 과정 ❓
1️⃣ Client 측 - SSE Subscribe 요청
- 클라이언트가 서버의 이벤트를 구독하기 위한 요청 전송
- 이벤트의 mediaType은 text/event-stream이 표준 스펙으로 정해져 있음
2️⃣ Server 측 - Subscription에 대한 응답
- Response의 mediaType은 text/event-stream
- 서버는 동적으로 생성된 컨텐츠를 스트리밍하기 때문에, 본문의 크기를 미리 알 수 없으므로 Transfer-Encoding 헤더 값을 chunked로 설정해야 한다.
3️⃣ Server 측 - 이벤트 생성 및 전송
- 자신이 구독하고 있는 클라이언트에게 비동기적으로 데이터를 전송할 수 있다.
- 데이터는 UTF-8로 인코딩된 텍스트 데이터만 가능
- 각각의 이벤트는 한 개 이상의 name : value 로 구성된다.
✅ Spring에서 제공하는 SSE 서포팅 기술 -> SseEmitter❗
🔷코드 구현🔷
1) SSE 연결 상태를 볼 수 있는 Controller
NotificationController.java
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>(); // 1. 모든 Emitters를 저장하는 ConcurrentHashMap
// 메시지 알림
@GetMapping("/api/notification/subscribe")
public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails) {
Long userId = userDetails.getUser().getId();
SseEmitter sseEmitter = notificationService.subscribe(userId);
return sseEmitter;
}
}
SSE를 통한 알림 기능을 받을 사용자들을 등록하는 과정
꼭 별도의 Controller 를 생성할 필요는 없다.
프론트 쪽에서 사용자가 '로그인'을 하면, 해당 사용자를 sseEmitter에 등록되도록 하면 된다.
2) subscribe 기능과 notify할 기능 구현된 Service
NotificationService.java
@Service
@RequiredArgsConstructor
public class NotificationService {
private final PostRepository postRepository;
private final UserRepository userRepository;
// 메시지 알림
public SseEmitter subscribe(Long userId) {
// 1. 현재 클라이언트를 위한 sseEmitter 객체 생성
SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
// 2. 연결
try {
sseEmitter.send(SseEmitter.event().name("connect"));
} catch (IOException e) {
e.printStackTrace();
}
// 3. 저장
NotificationController.sseEmitters.put(userId, sseEmitter);
// 4. 연결 종료 처리
sseEmitter.onCompletion(() -> NotificationController.sseEmitters.remove(userId)); // sseEmitter 연결이 완료될 경우
sseEmitter.onTimeout(() -> NotificationController.sseEmitters.remove(userId)); // sseEmitter 연결에 타임아웃이 발생할 경우
sseEmitter.onError((e) -> NotificationController.sseEmitters.remove(userId)); // sseEmitter 연결에 오류가 발생할 경우
return sseEmitter;
}
// 채팅 수신 알림 - receiver 에게
public void notifyMessage(String receiver) {
// 5. 수신자 정보 조회
User user = userRepository.findByNickname(receiver);
// 6. 수신자 정보로부터 id 값 추출
Long userId = user.getId();
// 7. Map 에서 userId 로 사용자 검색
if (NotificationController.sseEmitters.containsKey(userId)) {
SseEmitter sseEmitterReceiver = NotificationController.sseEmitters.get(userId);
// 8. 알림 메시지 전송 및 해체
try {
sseEmitterReceiver.send(SseEmitter.event().name("addMessage").data("메시지가 왔습니다."));
} catch (Exception e) {
NotificationController.sseEmitters.remove(userId);
}
}
}
// 댓글 알림 - 게시글 작성자 에게
public void notifyComment(Long postId) {
Post post = postRepository.findById(postId).orElseThrow(
() -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
);
Long userId = post.getUser().getId();
if (NotificationController.sseEmitters.containsKey(userId)) {
SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
try {
sseEmitter.send(SseEmitter.event().name("addComment").data("댓글이 달렸습니다."));
} catch (Exception e) {
NotificationController.sseEmitters.remove(userId);
}
}
}
}
3) 댓글 작성 시 알림이 가도록 하고 싶다면 CommentController에서 notify하는 코드 한 줄만 추가해주면 된다.
CommentController.java
@RestController
@RequestMapping("/api/post/{category}/{postId}")
@RequiredArgsConstructor
public class CommentController {
private final CommentService commentService;
private final NotificationService notificationService;
@PostMapping
public ResponseEntity<CommentResponseDto> createComment(@PathVariable Long category, @PathVariable Long postId, @RequestBody CommentRequestDto requestDto, @AuthenticationPrincipal UserDetailsImpl userDetails) {
CommentResponseDto response = commentService.createComment(category, postId, requestDto, userDetails.getUser());
notificationService.notifyComment(postId); // 댓글 알림 - 게시글 작성자 에게
return new ResponseEntity<>(response, HttpStatus.OK);
}
...
}
4) 알림 메시지만 전송하는 것이 아니라, 다른 정보를 포함하여 알림을 보내고 싶을 때
NotificationController.java
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
// 메시지 알림
@GetMapping("/api/notification/subscribe")
public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails) {
Long userId = userDetails.getUser().getId();
SseEmitter sseEmitter = notificationService.subscribe(userId);
return sseEmitter;
}
// 알림 삭제
@DeleteMapping("/api/notification/delete/{id}")
public MsgResponseDto deleteNotification(@PathVariable Long id) throws IOException {
return notificationService.deleteNotification(id);
}
}
- 메시지 알림(SSE 연결, 구독)와 알림 삭제 API만 생성
- 알림 삭제 기능 구현을 위해 SSE를 통한 실시간 알림 전송 후 DB에 저장
- 알림 저장을 위한 Notification Entity와 repository 생성
- 알림 삭제 기능 구현을 위해 SSE를 통한 실시간 알림 전송 후 DB에 저장
Notification.java
@Entity
@Setter
@Getter
@Table(name = "notification")
@NoArgsConstructor
public class Notification {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String sender;
private LocalDateTime createdAt;
private String contents; // 채팅 메시지 내용 또는 댓글 내용
private String roomId;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "post_id")
private Post post;
}
NotificationRepository.java
@Repository
public interface NotificationRepository extends JpaRepository<Notification, Long> {
Optional<Notification> findById(Long id);
}
NotificationService.java
@Service
@RequiredArgsConstructor
public class NotificationService {
private final PostRepository postRepository;
private final UserRepository userRepository;
private final NotificationRepository notificationRepository;
private final CommentRepository commentRepository;
private final MessageRepository messageRepository;
private final MessageRoomRepository messageRoomRepository;
private static Map<Long, Integer> notificationCounts = new HashMap<>(); // 알림 개수 저장
// 메시지 알림
public SseEmitter subscribe(Long userId) {
// 현재 클라이언트를 위한 sseEmitter 생성
SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
try {
// 연결
sseEmitter.send(SseEmitter.event().name("connect"));
} catch (IOException e) {
e.printStackTrace();
}
// user 의 pk 값을 key 값으로 해서 sseEmitter 를 저장
NotificationController.sseEmitters.put(userId, sseEmitter);
sseEmitter.onCompletion(() -> NotificationController.sseEmitters.remove(userId));
sseEmitter.onTimeout(() -> NotificationController.sseEmitters.remove(userId));
sseEmitter.onError((e) -> NotificationController.sseEmitters.remove(userId));
return sseEmitter;
}
// 메시지 알림 - receiver 에게
public void notifyMessage(String roomId, String receiver, String sender) {
MessageRoom messageRoom = messageRoomRepository.findByRoomId(roomId);
Post post = postRepository.findById(messageRoom.getPost().getId()).orElseThrow(
() -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
);
User user = userRepository.findByNickname(receiver);
User userSender = userRepository.findByNickname(sender);
Message receiveMessage = messageRepository.findFirstBySenderOrderByCreatedAtDesc(userSender.getNickname()).orElseThrow(
() -> new IllegalArgumentException("메시지를 찾을 수 없습니다.")
);
Long userId = user.getId();
if (NotificationController.sseEmitters.containsKey(userId)) {
SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
try {
Map<String, String> eventData = new HashMap<>();
eventData.put("message", "메시지가 왔습니다.");
eventData.put("sender", receiveMessage.getSender()); // 메시지 보낸자
eventData.put("createdAt", receiveMessage.getCreatedAt().toString()); // 메시지를 보낸 시간
eventData.put("contents", receiveMessage.getMessage()); // 메시지 내용
sseEmitter.send(SseEmitter.event().name("addMessage").data(eventData));
// DB 저장
Notification notification = new Notification();
notification.setSender(receiveMessage.getSender());
notification.setCreatedAt(receiveMessage.getCreatedAt());
notification.setContents(receiveMessage.getMessage());
notification.setRoomId(messageRoom.getRoomId());
notification.setPost(post); // post 필드 설정
notificationRepository.save(notification);
// 알림 개수 증가
notificationCounts.put(userId, notificationCounts.getOrDefault(userId, 0) + 1);
// 현재 알림 개수 전송
sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));
} catch (Exception e) {
NotificationController.sseEmitters.remove(userId);
}
}
}
// 댓글 알림 - 게시글 작성자 에게
public void notifyComment(Long postId) {
Post post = postRepository.findById(postId).orElseThrow(
() -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
);
Comment receiveComment = commentRepository.findFirstByPostIdOrderByCreatedAtDesc(post.getId()).orElseThrow(
() -> new IllegalArgumentException("댓글을 찾을 수 없습니다.")
);
Long userId = post.getUser().getId();
if (NotificationController.sseEmitters.containsKey(userId)) {
SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
try {
Map<String, String> eventData = new HashMap<>();
eventData.put("message", "댓글이 달렸습니다.");
eventData.put("sender", receiveComment.getUser().getNickname()); // 댓글 작성자
eventData.put("createdAt", receiveComment.getCreatedAt().toString()); // 댓글이 달린 시간
eventData.put("contents", receiveComment.getComment()); // 댓글 내용
sseEmitter.send(SseEmitter.event().name("addComment").data(eventData));
// DB 저장
Notification notification = new Notification();
notification.setSender(receiveComment.getUser().getNickname());
notification.setCreatedAt(receiveComment.getCreatedAt());
notification.setContents(receiveComment.getComment());
notification.setPost(post); // post 필드 설정
notificationRepository.save(notification);
// 알림 개수 증가
notificationCounts.put(userId, notificationCounts.getOrDefault(userId, 0) + 1);
// 현재 알림 개수 전송
sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));
} catch (IOException e) {
NotificationController.sseEmitters.remove(userId);
}
}
}
// 알림 삭제
public MsgResponseDto deleteNotification(Long id) throws IOException {
Notification notification = notificationRepository.findById(id).orElseThrow(
() -> new IllegalArgumentException("알림을 찾을 수 없습니다.")
);
Long userId = notification.getPost().getUser().getId();
notificationRepository.delete(notification);
// 알림 개수 감소
if (notificationCounts.containsKey(userId)) {
int currentCount = notificationCounts.get(userId);
if (currentCount > 0) {
notificationCounts.put(userId, currentCount - 1);
}
}
// 현재 알림 개수 전송
SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));
return new MsgResponseDto("알림이 삭제되었습니다.", HttpStatus.OK.value());
}
}
- 알림 메시지 커스터마이징
- Map에 원하는 정보만 put 하여 다수의 데이터 저장
반응형