SpringBoot

SSE (Server-Sent Event) 구현

똑똑한망치 2024. 4. 10. 20:18
728x90
반응형

📝 이론

 

 

Client가 서버와 한 번 연결을 맺고 나면, Server에서 이벤트가 발생할 때마다 데이터를 전송받는 단방향 통신 방법이다. 

 

  • 장점
    • HTTP 프로토콜만으로 사용할 수 있어 구현이 용이하다.
    • 접속에 문제가 있는 경우 자동으로 연결을 재시도한다.
  • 단점
    • 클라이언트가 접속을 Close 해도 서버에서 감지하기 어렵다.

 

 

❓ 동작 과정 ❓

 

1️⃣ Client 측 - SSE Subscribe 요청

  • 클라이언트가 서버의 이벤트를 구독하기 위한 요청 전송
  • 이벤트의 mediaType은 text/event-stream이 표준 스펙으로 정해져 있음

 

2️⃣ Server 측 - Subscription에 대한 응답

  • Response의 mediaType은 text/event-stream
  • 서버는 동적으로 생성된 컨텐츠를 스트리밍하기 때문에, 본문의 크기를 미리 알 수 없으므로 Transfer-Encoding 헤더 값을 chunked로 설정해야 한다.

 

3️⃣ Server 측 - 이벤트 생성 및 전송

  • 자신이 구독하고 있는 클라이언트에게 비동기적으로 데이터를 전송할 수 있다.
    • 데이터는 UTF-8로 인코딩된 텍스트 데이터만 가능
    • 각각의 이벤트는 한 개 이상의 name : value 로 구성된다.

 

✅ Spring에서 제공하는 SSE 서포팅 기술 ->  SseEmitter❗

 

 

 

 

🔷코드 구현🔷

1) SSE 연결 상태를 볼 수 있는 Controller

 

NotificationController.java

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;
    public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();		// 1. 모든 Emitters를 저장하는 ConcurrentHashMap

    // 메시지 알림
    @GetMapping("/api/notification/subscribe")
    public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails) {
        Long userId = userDetails.getUser().getId();
        SseEmitter sseEmitter = notificationService.subscribe(userId);

        return sseEmitter;
    }
}

 

SSE를 통한 알림 기능을 받을 사용자들을 등록하는 과정

 

꼭 별도의 Controller 를 생성할 필요는 없다.

프론트 쪽에서 사용자가 '로그인'을 하면, 해당 사용자를 sseEmitter에 등록되도록 하면 된다.

 

 

2) subscribe 기능과 notify할 기능 구현된 Service 

 

NotificationService.java

@Service
@RequiredArgsConstructor
public class NotificationService {
    private final PostRepository postRepository;
    private final UserRepository userRepository;

    // 메시지 알림
    public SseEmitter subscribe(Long userId) {
    
        // 1. 현재 클라이언트를 위한 sseEmitter 객체 생성
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        
        // 2. 연결
        try {
            sseEmitter.send(SseEmitter.event().name("connect"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 3. 저장
        NotificationController.sseEmitters.put(userId, sseEmitter);

		// 4. 연결 종료 처리
        sseEmitter.onCompletion(() -> NotificationController.sseEmitters.remove(userId));	// sseEmitter 연결이 완료될 경우
        sseEmitter.onTimeout(() -> NotificationController.sseEmitters.remove(userId));		// sseEmitter 연결에 타임아웃이 발생할 경우
        sseEmitter.onError((e) -> NotificationController.sseEmitters.remove(userId));		// sseEmitter 연결에 오류가 발생할 경우

        return sseEmitter;
    }

    // 채팅 수신 알림 - receiver 에게
    public void notifyMessage(String receiver) {
    	// 5. 수신자 정보 조회
        User user = userRepository.findByNickname(receiver);

		// 6. 수신자 정보로부터 id 값 추출
        Long userId = user.getId();

		// 7. Map 에서 userId 로 사용자 검색
        if (NotificationController.sseEmitters.containsKey(userId)) {		
            SseEmitter sseEmitterReceiver = NotificationController.sseEmitters.get(userId);
            // 8. 알림 메시지 전송 및 해체
            try {
                sseEmitterReceiver.send(SseEmitter.event().name("addMessage").data("메시지가 왔습니다."));
            } catch (Exception e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }
    
    // 댓글 알림 - 게시글 작성자 에게
    public void notifyComment(Long postId) {
        Post post = postRepository.findById(postId).orElseThrow(
                () -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
        );

        Long userId = post.getUser().getId();

        if (NotificationController.sseEmitters.containsKey(userId)) {
            SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
            try {
                sseEmitter.send(SseEmitter.event().name("addComment").data("댓글이 달렸습니다."));
            } catch (Exception e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }
}

 

 

 

3) 댓글 작성 시 알림이 가도록 하고 싶다면 CommentController에서 notify하는 코드 한 줄만 추가해주면 된다.

 

CommentController.java

@RestController
@RequestMapping("/api/post/{category}/{postId}")
@RequiredArgsConstructor
public class CommentController {

    private final CommentService commentService;
    private final NotificationService notificationService;

    @PostMapping
    public ResponseEntity<CommentResponseDto> createComment(@PathVariable Long category, @PathVariable Long postId, @RequestBody CommentRequestDto requestDto, @AuthenticationPrincipal UserDetailsImpl userDetails) {
        CommentResponseDto response = commentService.createComment(category, postId, requestDto, userDetails.getUser());
        
        notificationService.notifyComment(postId);		// 댓글 알림 - 게시글 작성자 에게
        
        return new ResponseEntity<>(response, HttpStatus.OK);
    }
    
    ...
    
}

 

 

 

4) 알림 메시지만 전송하는 것이 아니라, 다른 정보를 포함하여 알림을 보내고 싶을 때

 

NotificationController.java

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;
    public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

    // 메시지 알림
    @GetMapping("/api/notification/subscribe")
    public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails) {
        Long userId = userDetails.getUser().getId();
        SseEmitter sseEmitter = notificationService.subscribe(userId);

        return sseEmitter;
    }

    // 알림 삭제
    @DeleteMapping("/api/notification/delete/{id}")
    public MsgResponseDto deleteNotification(@PathVariable Long id) throws IOException {
        return notificationService.deleteNotification(id);
    }
}

 

  • 메시지 알림(SSE 연결, 구독)와 알림 삭제 API만 생성
    • 알림 삭제 기능 구현을 위해 SSE를 통한 실시간 알림 전송 후 DB에 저장
      • 알림 저장을 위한 Notification Entity와 repository 생성

 

Notification.java

@Entity
@Setter
@Getter
@Table(name = "notification")
@NoArgsConstructor
public class Notification {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String sender;

    private LocalDateTime createdAt;

    private String contents;        // 채팅 메시지 내용 또는 댓글 내용

    private String roomId;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "post_id")
    private Post post;
}

 

 

 

NotificationRepository.java

@Repository
public interface NotificationRepository extends JpaRepository<Notification, Long> {
    Optional<Notification> findById(Long id);
}

 

 

NotificationService.java

@Service
@RequiredArgsConstructor
public class NotificationService {
    private final PostRepository postRepository;
    private final UserRepository userRepository;
    private final NotificationRepository notificationRepository;
    private final CommentRepository commentRepository;
    private final MessageRepository messageRepository;
    private final MessageRoomRepository messageRoomRepository;
    private static Map<Long, Integer> notificationCounts = new HashMap<>();     // 알림 개수 저장

    // 메시지 알림
    public SseEmitter subscribe(Long userId) {
        // 현재 클라이언트를 위한 sseEmitter 생성
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        try {
            // 연결
            sseEmitter.send(SseEmitter.event().name("connect"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // user 의 pk 값을 key 값으로 해서 sseEmitter 를 저장
        NotificationController.sseEmitters.put(userId, sseEmitter);

        sseEmitter.onCompletion(() -> NotificationController.sseEmitters.remove(userId));
        sseEmitter.onTimeout(() -> NotificationController.sseEmitters.remove(userId));
        sseEmitter.onError((e) -> NotificationController.sseEmitters.remove(userId));

        return sseEmitter;
    }

    // 메시지 알림 - receiver 에게
    public void notifyMessage(String roomId, String receiver, String sender) {
        MessageRoom messageRoom = messageRoomRepository.findByRoomId(roomId);

        Post post = postRepository.findById(messageRoom.getPost().getId()).orElseThrow(
                () -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
        );

        User user = userRepository.findByNickname(receiver);

        User userSender = userRepository.findByNickname(sender);

        Message receiveMessage = messageRepository.findFirstBySenderOrderByCreatedAtDesc(userSender.getNickname()).orElseThrow(
                () -> new IllegalArgumentException("메시지를 찾을 수 없습니다.")
        );

        Long userId = user.getId();

        if (NotificationController.sseEmitters.containsKey(userId)) {
            SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
            try {
                Map<String, String> eventData = new HashMap<>();
                eventData.put("message", "메시지가 왔습니다.");
                eventData.put("sender", receiveMessage.getSender());                    // 메시지 보낸자
                eventData.put("createdAt", receiveMessage.getCreatedAt().toString());   // 메시지를 보낸 시간
                eventData.put("contents", receiveMessage.getMessage());                 // 메시지 내용

                sseEmitter.send(SseEmitter.event().name("addMessage").data(eventData));

                // DB 저장
                Notification notification = new Notification();
                notification.setSender(receiveMessage.getSender());
                notification.setCreatedAt(receiveMessage.getCreatedAt());
                notification.setContents(receiveMessage.getMessage());
                notification.setRoomId(messageRoom.getRoomId());
                notification.setPost(post);         // post 필드 설정
                notificationRepository.save(notification);

                // 알림 개수 증가
                notificationCounts.put(userId, notificationCounts.getOrDefault(userId, 0) + 1);

                // 현재 알림 개수 전송
                sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));

            } catch (Exception e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }
    
    // 댓글 알림 - 게시글 작성자 에게
    public void notifyComment(Long postId) {
        Post post = postRepository.findById(postId).orElseThrow(
                () -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
        );

        Comment receiveComment = commentRepository.findFirstByPostIdOrderByCreatedAtDesc(post.getId()).orElseThrow(
                () -> new IllegalArgumentException("댓글을 찾을 수 없습니다.")
        );

        Long userId = post.getUser().getId();

        if (NotificationController.sseEmitters.containsKey(userId)) {
            SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
            try {
                Map<String, String> eventData = new HashMap<>();
                eventData.put("message", "댓글이 달렸습니다.");
                eventData.put("sender", receiveComment.getUser().getNickname());        // 댓글 작성자
                eventData.put("createdAt", receiveComment.getCreatedAt().toString());   // 댓글이 달린 시간
                eventData.put("contents", receiveComment.getComment());                 // 댓글 내용

                sseEmitter.send(SseEmitter.event().name("addComment").data(eventData));

                // DB 저장
                Notification notification = new Notification();
                notification.setSender(receiveComment.getUser().getNickname());
                notification.setCreatedAt(receiveComment.getCreatedAt());
                notification.setContents(receiveComment.getComment());
                notification.setPost(post);         // post 필드 설정
                notificationRepository.save(notification);

                // 알림 개수 증가
                notificationCounts.put(userId, notificationCounts.getOrDefault(userId, 0) + 1);

                // 현재 알림 개수 전송
                sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));

            } catch (IOException e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }

    // 알림 삭제
    public MsgResponseDto deleteNotification(Long id) throws IOException {
        Notification notification = notificationRepository.findById(id).orElseThrow(
                () -> new IllegalArgumentException("알림을 찾을 수 없습니다.")
        );

        Long userId = notification.getPost().getUser().getId();

        notificationRepository.delete(notification);

        // 알림 개수 감소
        if (notificationCounts.containsKey(userId)) {
            int currentCount = notificationCounts.get(userId);
            if (currentCount > 0) {
                notificationCounts.put(userId, currentCount - 1);
            }
        }
        
        // 현재 알림 개수 전송
        SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
        sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));

        return new MsgResponseDto("알림이 삭제되었습니다.", HttpStatus.OK.value());
    }
}

 

  • 알림 메시지 커스터마이징
    • Map에 원하는 정보만 put 하여 다수의 데이터 저장

 

반응형