제가 진행하는 프로젝트에사 실시간 알림 기능을 계획하고 구현을 하게되었습니다. 이때, 실시간 알림 기능은 클라이언트가 서버에게 요청을 보내는 기존 기능과는 달리, 서버가 클라이언트에게 데이터를 보내면 클라이언트가 이를 인지하고 알림을 띄워줘야하는 동작을 해야했습니다.
하지만 HTTP 프로토콜의 주요 특징은 비연결성 입니다. 따라서 서버가 클라이언트에 데이터 전송시 인지해야할 경우, 서버가 전송하고 싶어도 해당 클라이언트와 지속적으로 연결이 되어있지 않기 때문에 보낼 수 없는 상황이 발생하게 되는 것입니다.
이를 해결하는 방식으로 Polling, Long Polling, WebSocket, SSE 총 네 가지가 존재합니다. 그래서 SSE에 대해 설명하기 전 SSE를 제외한 3가지를 간단하게 설명하고 넘어가고자 합니다.
🏷️ 서버의 event를 클라이언트로 보내는 4가지 방법
✅ Polling
클라이언트가 평범한 요청를 서버로 계속 보내서 이벤트 내용을 전달받는 방식입니다. 가장 쉬운방법이지만 클라이언트가 계속적으로 요청를 보내기 때문에 클라이언가 많아지면 서버의 부담이 급증하게 됩니다. Request Connection을 맺고 끊는것 자체가 부담이 많은 방식이고, 클라이언트에서 실시간정도의 빠른 응답을 기대하기도 어렵습니다. 단점으로는 Polling은 Http 오버헤드가 발생하는 것입니다.
Http Overhead
정보의 신뢰성 판단을 위한, 보내지는 헤더 같은 정보 때문에 오히려 데이터량이나 처리시간이 증가하는걸 말합니다.
✅ Long Polling
Polling에서 추가로 서버 측에서 접속을 열어두는 시간을 길게하는 방식입니다. 즉, 클라이언트에서 서버로 http request를 보내면, 서버에서 해당 클라이언트로 전달할 이벤트를 기다리다가 이벤트가 생기면 응답 메시지를 전달하면서 연결이 종료하게 됩니다. 그 후에 클라이언트에서는 곧바로 다시 요청를 날려서 서버의 다음 이벤트를 기다리게 되는 방식입니다.
Polling 방식보다는 서버의 부담이 줄겠지만 클라이언트로 보내는 이벤트들의 시간간격이 좁다면 Polling과 별 차이가 없습니다. 단점으로는 다수의 클라이언트에게 동시에 이벤트가 발생될 경우에는 곧바로 다수의 클라이언트가 서버로 접속을 시도하면서 서버의 부담이 급증하게 됩니다.
✅ WebSocket
양방향 채널을 이용해 채팅방 처럼 양방향 통신이 가능합니다. 기존 Http요청 응답 방식은 요청한 그 클라이언트에만 응답이 가능했는데, ws 프로토콜을 통해 웹소켓 포트에 접속해 있는 모든 클라이언트에게 이벤트 방식으로 응답하게 됩니다.
최초 접속이 일반 요청를 통해 Handshaking과정을 통해 이루어 지기 떄문에, 기존의 80, 443 포트로 접속을 하므로 추가로 방화벽을 열지 않고도 양방향 통신이 가능하고, Http 규격인 CORS적용이나 인증등의 과정을 기존과 동일하게 가저갈 수 있는것이 장점입니다. 단, WebSocket 프로토콜을 처리하기 위해 전이중 연결과 새로운 웹소켓 서버가 필요합다.
SSE(Server Sent Event)
HTML5 표준안이며 어느정도 웹소켓의 역할을 하면서 더 가볍습니다. 양방향이 아닌 서버에서 클라이언로 단방향으로 데이터를 받는 방식입니다. 별도의 프로토콜을 사용하지 않고 HTTP 프로토콜만으로 사용할 수 있기 때문에 구현이 용이합니다.
단점으로는 접속에 문제가 있으면 자동으로 재연결 시도하지만, 클라이언트가 close해도 서버에서 감지하기 어렵습니다.
✔️ SSE 선택의 이유
구현하고자하는 실시간 알림 기능은 서버에서 클라이언트로 데이터를 전송해야하며, 클라이언트는 서버로 데이터를 전송할 필요가 없습니다. 또한 실시간으로 데이터를 주고 받아야함니다.
그러므로 Polling과 Long Polling처럼 일정 시간 마다 받거나 WebSocket처럼 양방향이 필요없으므로 SSE를 선택하게 되었습니다.
🏷️ SSE 통신 과정
1. 클라이언트에서 SSE Subscribe 요청
클라이언트측에서 우선 서버의 이벤트 구독을 위한 요청을 보내야 합니다. 이벤트의 미디어 타입은 text/event-stream이 표준으로 정해져있습니다.
2. 서버에서 Subscription에 대한 응답
Response의 미디어 타입은 text/event-stream입니다. 이때 Transfer-Encoding 헤더의 값을 Chunked로 설정하는데, 왜냐하면 서버는 동적으로 생성된 컨텐츠를 스트리밍 하기 때문에 본문의 크기를 미리 알 수 없기 때문입니다.
3. 서버에서 이벤트 전달
클라이언트에서 구독를 하면, 서버는 해당 클라이언트에게 비동기적으로 데이터를 전송할 수 있습니다. 이때, 데이터는 utf-8로 인코딩된 텍스트 데이터만 가능합니다. 서로 다른 이벤트는 \n\n 로 구분되며, 각각의 이벤트는 한 개 이상의 name:value로 구성됩니다. (이벤트 안의 내용은 \n으로 구분됩니다.)
🏷️ Spring에서 SSE
✅ Entity 작성
@Getter
@Setter
@Entity
public class Alarm {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
// 알람 받을 사람
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "user_id")
private UserAccount userAccount;
@Enumerated(EnumType.STRING)
private AlarmType alarmType; // 알람 타임 enum으로 알람 타입 지정
@Column(name = "from_user_id")
private String fromUserId; // 보내는 사람
@Column(name = "target_id")
private Long targetId; // 알람 타입에 맞춰서 해당하는 data의 id
@Column(name = "data")
private String data; // 추가 적으로 전송할 데이터를 담는 필드 (ex. front에 띄울때 필요한 data)
@Column(name = "created_at")
private LocalDateTime createdAt;
// 생략
}
✅ EmitterRepository 작성
저는 알람을 유저 별로 관리를 하고 싶기 때문에 Repository를 만들었습니다.
@Slf4j
@Repository
public class EmitterRepository {
// 유저 별 emitter 관리를 위한 레포지토리
private Map<String, SseEmitter> emitterMap = new HashMap<>();
// 유저 id 별 emitter 저장
public SseEmitter save(String userId, SseEmitter sseEmitter) {
final String key = getKey(userId);
emitterMap.put(key, sseEmitter);
log.info("Set sseEmitter {}", userId);
return sseEmitter;
}
// 유저 id로 emitter 찾기
public Optional<SseEmitter> get(String userId) {
final String key = getKey(userId);
SseEmitter emitter = emitterMap.get(key);
if (emitter == null) {
log.warn("No SseEmitter found for user: {}", userId);
}
return Optional.ofNullable(emitter);
}
// emitter 삭제
public void delete(String userId){
emitterMap.remove(getKey(userId));
}
// key 생성
public String getKey(String userId) {
return "Emitter:UID:" + userId;
}
}
SseEmitter
SseEmitter란 Spring framework에서 sse기술을 적용해 관리하기 위하여 지원해주는 객체로써 해당 객체를 사용함으로써 개발자는 손쉽게 Sse기술을 적용할 수 있습니다.
해당 객체가 상속받은 ResponseBodyEmitter를 보면 하나 이상의 객체가 응답에 기록되는 비동기 요청을 처리하기 위한 컨트롤러 반환 타입이라고 명시가 되어있습니다.
이런 객체를 상속받아 구현된 객체인 만큼 비동기를 지원하며, Spring framework 4.2부터 사용가능합니다.
✅ Service 작성
@Slf4j
@RequiredArgsConstructor
@Service
public class AlarmService {
private final EmitterRepository emitterRepository;
private final ServiceUtils serviceUtils;
private final static Long DEFAULT_TIME_OUT = 60L * 1000 * 60;
private final static String ALARM_NAME = "alarm";
// 알람 sse connect
public SseEmitter connectAlarm(String userId) {
// 유저 검사
serviceUtils.getUserAccountOrException(userId);
// sse 셋팅 및 연결
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIME_OUT);
emitterRepository.save(userId, sseEmitter);
// 완료된 경우
sseEmitter.onCompletion(() -> emitterRepository.delete(userId));
// 타임 아웃된 경우
sseEmitter.onTimeout(() -> emitterRepository.delete(userId));
try {
// SseEmitter에 ALARM_NAME으로 name을 지정하여 보냅니다.
sseEmitter.send(SseEmitter.event().id("").name(ALARM_NAME).data("connect completed"));
} catch (IOException e) {
// 오류시 실행될 코드 입력
}
return sseEmitter;
}
// 알람에 저장 되었을 때, 저장됨을 알려주는 메소드
public void alarmSend(Long alarmId, String userId) {
// Repository에 값이 있을 경우 값을 가져옴
emitterRepository.get(userId).ifPresentOrElse(sseEmitter -> {
try {
// 반환 값이 없을 경우 name을 정하여 전송
sseEmitter.send(
SseEmitter.event().id(alarmId.toString()).name(ALARM_NAME).data("new alarm")
);
} catch (IOException e) {
// 오류 발생시 Repository에서 값 제거
emitterRepository.delete(userId);
// 추가적 오류시 실행 코드 입력
}
}, () -> log.info("유저 {}에 대한 emitter를 찾지 못했습니다.", userId));
}
}
✅ Controller 작성
@RequiredArgsConstructor
@RequestMapping("/api/main/alarm")
@RestController
public class AlarmController {
private final AlarmService alarmService;
// 알람 sse 구독
@GetMapping("/sub")
public SseEmitter subscribeAlarm(
Authentication authentication
) {
return alarmService.connectAlarm(authentication.getName());
}
// 알람 삭제
@DeleteMapping
public Response<Void> deleteAlarm(
Authentication authentication,
@RequestParam Long targetId, // 쿼리 파라미터로 받음
@RequestParam AlarmType alarmType
) {
alarmService.deleteAlarm(authentication.getName(), targetId,alarmType);
return Response.success();
}
}
✅ 작성 된걸 토대로 사용
// 알람에 저장
Alarm alarm = alarmRepository.save(
Alarm.of(
toUserId,
AlarmType.NEW_COMMENT_ON_POST,
fromUserId,
targetId,
data
)
)
// SseEmitter에 전송
alarmService.alarmSend(alarm.getId(), userId));
✅ JWT 셋팅
@Slf4j
@RequiredArgsConstructor
public class JwtTokenFilter extends OncePerRequestFilter {
private final String key;
private final UserAccountService userAccountService;
private final static List<String> TOKEN_IN_PARAM_URLS = List.of("/api/main/alarm/sub");
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
final String token;
try {
if (TOKEN_IN_PARAM_URLS.contains(request.getRequestURI())) {
// SSE같이 쿼리 파라미터에 토큰이 있는 경우
log.info("요청 {}의 쿼리 파리미터를 체크", request.getRequestURI());
token = request.getQueryString().split("=")[1].trim();
} else {
// get header
final String header = request.getHeader(HttpHeaders.AUTHORIZATION);
// header가 null이거나 토큰 값이 아닌 경우
if (header == null || !header.startsWith("Bearer ")) {
log.error("헤더를 얻는 과정에서 에러 발생. 헤더가 null이거나 맞지 않습니다. URL: {}",
request.getRequestURL());
filterChain.doFilter(request, response);
return;
}
// Bearer 제외 후 문자열로 가져옴
token = header.split(" ")[1].trim();
}
// 토큰이 만료되었는지 확인
if (JwtTokenUtils.isExpired(token, key)) {
log.error("키가 만료되었습니다.");
filterChain.doFilter(request, response);
return;
}
String userId = JwtTokenUtils.getUserId(token, key);
UserAccountDto userAccountDto = UserAccountDto.fromEntity(userAccountService.loadUserByUserId(userId));
// 인증 객체 생성 및 값 입력
UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(
userAccountDto,
null,
userAccountDto.getAuthorities()
);
// 추가적인 사용자 세부 정보 추가 (IP 등을 추가 할 수 있음)
authentication.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
// 사용자 정보를 securityContextHolder에 추가
SecurityContextHolder.getContext().setAuthentication(authentication);
} catch (RuntimeException e) {
log.error("검증 과정에서 에러 발생 : {}", e.toString());
filterChain.doFilter(request, response);
return;
}
filterChain.doFilter(request, response);
}
}
🏷️ Front에서 SSE
// 토큰 가져옴
const token = authStore.token || localStorage.getItem('token');
// 토큰이 없는 경우
if (!token) {
console.error("JWT token이 없습니다.");
return;
}
// EventSource를 통해 주소입력
const eventSource = new EventSource(`/api/main/alarm/sub?token=${token}`);
alarmEvent.value = eventSource;
// open을 통해 연결
eventSource.addEventListener('open', () => {
console.log('연결 완료');
});
// name이 alarm일때 데이터를 받음
eventSource.addEventListener('alarm', (event) => {
console.log('받은 알람 Event: ', event.data);
getAlarmList(); // 알람 데이터 갱신
navAlarmStore.getTopNavAlarmList(); // 네비의 알람 데이터 갱신
});
// error가 생길 경우 EventSource 닫음
eventSource.addEventListener('error', (event) => {
console.log("EventSource error state: ", event.target.readyState);
if (event.target.readyState === EventSource.CLOSED) {
console.log('EventSource Closed');
}
eventSource.close();
})
📖 Reference
'코딩 공부 > web & Java' 카테고리의 다른 글
[Spring / Redis] Spring에서 Redis 사용 (0) | 2024.10.10 |
---|---|
[Spring / WebSocket] WebSocket와 STOMP (0) | 2024.10.05 |
[Spring / Redis] Redis LocalDateTime 역직렬화 오류 (2) | 2024.10.03 |
[Spring / Vue] Spring과 Vue를 연동시 주소로 입력하면 오류나는 문제 (1) | 2024.10.03 |
[Java / TDD] 테스트 케이스 작성시 발생한 영속성 관련 오류 (0) | 2024.08.28 |