이론적인 부분은 이곳에서 확인 가능하다. https://dean83.tistory.com/411
단일서버일 경우 WebSocket에 대한 정보를 Map 이나 ConcurrentMap을 통해 저장하지만, 서버가 다수일 경우 Redis를 이용한다.
또한, 별도로 DB 에 데이터를 저장해야 되는 부분이 반드시 생기므로, Entity, Service, Repository를 구현해야 한다.
중요한 것은, WebServerHandler 구현체에서 바로 Repository를 주입받아 접근하지 않고, Service를 주입받아 활용한다.
여기서는 DB 에 저장 / 읽기 부분은 제외하고 정리하며, 대용량 및 빠른 경우를 상정하여 ProtoBuf를 이용하였다.
- 실제로 ProtoBuf를 이용하는 경우는 대용량, 초고속일 경우를 제외하고는 protobuf 버전 관리, 클라이언트 개발 난이도, 디버깅 등 여러 이유로 잘 쓰지 않는게 좋다
WebSocket
WebSocket 은 양방향 실시간 통신으로서, 지속적인 연결이 필요할 경우 사용한다.규모가 커질 경우 외부 브로커를 별도로 두어 같이 운용하는경우가 대부분이다.STOMP는 개발을 빨리 해야 할 때, 메
dean83.tistory.com
Build.gradle 추가
implementation 'org.springframework.boot:spring-boot-starter-websocket'
데이터 규격 정의 예 (Protobuf 기준)
syntax = "proto3";
option java_package = "com.example.ws.proto";
option java_outer_classname = "ChatProto";
message ChatMessage {
enum Type {
CHAT = 0;
DIRECT = 1;
HEARTBEAT = 2;
HEARTBEAT_ACK = 3;
}
Type type = 1;
int64 senderUserId = 2;
int64 targetUserId = 3;
string content = 4;
int64 timestamp = 5;
}
- 이 스키마 내용을 chat.proto 파일에 저장하고 빌드 하면 ChatProto.java 파일이 생성 된다.
- 파일위치는 src -> main 에 적당한 패키지를 생성하여 저장한다
- 빌드는 gradle build를 말한다.
- 해당 파일 안에 ChatMessage 클래스가 존재한다.
- 실제 ProtoBuf를 구성할 때에는 Envelop 으로 감싸고 그 안에 실제 메세지들을 별개로 구성하는게 좋다. 아래는 그 예이다.
- 여기서는 이 방식을 따르지 않고 위의 예를 따랐다.
syntax = "proto3";
message Envelope {
string version = 1;
oneof body {
ChatMessage chat = 2;
PingMessage ping = 3;
JoinMessage join = 4;
}
}
혹은,
syntax = "proto3";
message Envelope {
string version = 1;
anyof body {
ChatMessage chat = 2;
PingMessage ping = 3;
JoinMessage join = 4;
}
}
Protobuf Build.gradle 설정
plugins {
id 'com.google.protobuf' version '0.9.4'
}
dependencies {
implementation 'com.google.protobuf:protobuf-java:3.25.3'
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.3"
}
generateProtoTasks {
all().each { task ->
task.builtins {
java {}
}
}
}
}
HandShakeInterceptor (JWT 인증) 예
- WebSocket을 연결 하기 전, 클라이언트 인증을 수행하고, 해당 유저 정보를 attributes에 저장하여 추후 메세지 수신시 가져올 수 있게 한다.
- 아래 클라이언트 토큰 인증방식에 따라 코드가 변경된다. (현재 예는 앱 - 서버, 서버 - 서버 간 예)
- 특히 STOMP 를 이용하여 Channel Interceptor 가 인증을 담당할 경우, beforeHandshake에서는 true를 바로 리턴한다.
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Map<String, Object> attributes) {
String token = extractToken(request);
UserPrincipal user = authenticate(token);
attributes.put("userId", user.getUserId());
attributes.put("role", user.getRole());
return true;
}
@Override
public void afterHandshake(
ServerHttpRequest request,
ServerHttpResponse response,
WebSocketHandler wsHandler,
Exception exception) {
}
}
클라이언트 토큰 인증 방식
- 클라이언트가 무엇이냐, 어떻게 구현되었느냐에 따라 차이를 보인다.
- 모바일, 서버 - 서버 등 브라우저가 아닌 환경에서는 위에 처럼 일반 HTTP 통신에서 토큰 검증하듯 진행하면 된다.
- 브라우저가 클라이언트인 경우 저 방식이 동작하지 않고, 다른 3방식 중 하나를 프론트엔드와 협의 후 정해야 한다.
- 이유 : 브라우저에서 websocket 통신히 헤더를 마음대로 제어하지 못함.
- 첫번째 방식 : Sec-WebSocket-Protocol 에 토큰을 넣는 방식
- 원래는 WebSocket의 사용 프로토콜을 명시하는 곳인데, 여기에 토큰을 넣어도 무방 하다고 한다.
- request.getHeaders().getFirst("Sec-WebSocket-Protocol") 을 통해 접근 후 적절히 파싱 필요
- 다른 정보도 포함되어 있으므로 토큰을 검출하여 파싱 해야함.
- DefaultHandshakeHandler 를 재정의 하는 클래스를 구현해야 함 (필수)
- 구체적인 방식은 추후 찾아보는걸로..
- 두번째 방식 : STOMP를 이용할 경우 Channel Interceptor를 이용하여 인증
- WebSocket 연결 미리 허용 후 처리
- 인증 후StompHeaderAccessor 에 인증정보 저장
- 세번째 방식 : 쿼리 파라메터로 토큰을 같이 넘겨 주기
- 토큰이 외부로 노출될 수 있어 비추천.
- 이 경우 아래와 같이 파싱하여 가져올 수 있다.
//STOMP 를 이용할 경우
@Component
@RequiredArgsConstructor
public class AuthChannelInterceptor implements ChannelInterceptor {
private final JwtProvider jwtProvider;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(
message,
StompHeaderAccessor.class
);
if (accessor == null) {
return message;
}
// CONNECT 프레임에서만 인증
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String authHeader =
accessor.getFirstNativeHeader("Authorization");
if (authHeader == null || !authHeader.startsWith("Bearer ")) {
throw new IllegalArgumentException("Missing Authorization");
}
String token = authHeader.substring(7);
UserPrincipal user = jwtProvider.authenticate(token);
// ★ 여기서 Principal 설정
accessor.setUser(
new UsernamePasswordAuthenticationToken(
user,
null,
user.getAuthorities()
)
);
}
return message;
}
}
//파라메터로 토큰을 넘겨주었을때 파싱 방법 (비추천)
private String extractToken(ServerHttpRequest request) {
URI uri = request.getURI();
MultiValueMap<String, String> params =
UriComponentsBuilder.fromUri(uri)
.build()
.getQueryParams();
return params.getFirst("token");
}
WebSocket Config 설정
- 구현한 HandShakeInterceptor, WebSocketHandler를 등록하고, Origin 설정을 한다.
- 만일 STOMP를 쓰거나 구형브라우저, 혹은 공공기관 등에서 WebSocket이 안될경우, setAllowedOrigins("*") 다음에 .WithSockJS()를 쓰자.
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
private final AuthHandshakeInterceptor handshakeInterceptor;
private final ChatWebSocketHandler chatWebSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatWebSocketHandler, "/ws/chat")
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins("*");
}
}
WebSocketHandler 구현 예
- 채팅기반은 보통 text를 활용하고, 실시간, 고 가용성을 위해서는 binary를 많이 이용한다.
- WebSocketHandler 는 Bean 으로서, 여러 요청 처리시 동시에 수행될 수 있으므로 동시성 문제가 생길 수 있다.
- 주기적으로 hearbeat을 요청하여 응답이 없는 경우 해당 세션을 삭제한다.
- 특히 채팅 메세지가 많은 등 대규모일 경우, 클래스 변수 -> byte[] 로 serialize 하여 구현하는게 맞다.
- 다만 이 경우, 클라이언트 측에서도 개발 난이도가 같이 증가 한다.
- ProtoBuf 를 이용한다. (구글 바이너리 직렬화)
@Component
@Slf4j
public class ChatWebSocketHandler extends BinaryWebSocketHandler {
// 세션 인덱스
private final ConcurrentMap<String, WebSocketSession> sessions =
new ConcurrentHashMap<>();
// 유저 → 세션들
private final ConcurrentMap<Long, Set<WebSocketSession>> userSessions =
new ConcurrentHashMap<>();
// 세션별 send 락
private final ConcurrentMap<String, Object> sessionLocks =
new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> lastHeartbeat =
new ConcurrentHashMap<>();
/* ==========================
연결 수립
========================== */
@Override
public void afterConnectionEstablished(WebSocketSession session) {
Long userId = (Long) session.getAttributes().get("userId");
sessions.put(session.getId(), session);
sessionLocks.put(session.getId(), new Object());
lastHeartbeat.put(session.getId(), System.currentTimeMillis());
userSessions
.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet())
.add(session);
log.info("WS connected. session={}, user={}", session.getId(), userId);
}
/* ==========================
메시지 수신 (Binary)
========================== */
@Override
protected void handleBinaryMessage(
WebSocketSession session,
BinaryMessage message) throws Exception {
ChatMessage msg = ChatMessage.parseFrom(message.getPayload().array());
Long senderId = (Long) session.getAttributes().get("userId");
switch (msg.getType()) {
case HEARTBEAT_ACK -> {
lastHeartbeat.put(session.getId(), System.currentTimeMillis());
}
case CHAT -> broadcast(msg, senderId);
case DIRECT -> sendToUser(msg.getTargetUserId(), msg);
default -> log.warn("Unknown type");
}
}
/* ==========================
메시지 전송 로직
========================== */
private void broadcast(ChatMessage msg, Long senderId) {
byte[] bytes = msg.toByteArray();
BinaryMessage message = new BinaryMessage(bytes);
sessions.values().forEach(session -> safeSend(session, message));
}
private void sendToUser(Long userId, ChatMessage msg) {
Set<WebSocketSession> targets = userSessions.get(userId);
if (targets == null) return;
BinaryMessage message = new BinaryMessage(msg.toByteArray());
targets.forEach(session -> safeSend(session, message));
}
/* ==========================
thread-safe send
========================== */
private void safeSend(WebSocketSession session, BinaryMessage message) {
Object lock = sessionLocks.get(session.getId());
if (lock == null) return;
synchronized (lock) {
try {
if (session.isOpen()) {
session.sendMessage(message);
}
} catch (IOException e) {
log.error("Send failed", e);
}
}
}
/* ==========================
오류 처리
========================== */
@Override
public void handleTransportError(
WebSocketSession session,
Throwable exception) {
log.error("WS error session={}", session.getId(), exception);
closeSession(session);
}
/* ==========================
연결 종료
========================== */
@Override
public void afterConnectionClosed(
WebSocketSession session,
CloseStatus status) {
closeSession(session);
log.info("WS closed session={}, status={}", session.getId(), status);
}
/* ==========================
공통 정리 로직
========================== */
private void closeSession(WebSocketSession session) {
sessions.remove(session.getId());
sessionLocks.remove(session.getId());
Long userId = (Long) session.getAttributes().get("userId");
if (userId != null) {
Set<WebSocketSession> set = userSessions.get(userId);
if (set != null) {
set.remove(session);
if (set.isEmpty()) {
userSessions.remove(userId);
}
}
}
try {
if (session.isOpen()) {
session.close();
}
} catch (IOException ignored) {
}
}
@Scheduled(fixedDelay = 15_000)
public void heartbeatAndCleanup() {
long now = System.currentTimeMillis();
long timeout = 45_000;
ChatProto.ChatMessage heartbeat =
ChatProto.ChatMessage.newBuilder()
.setType(ChatProto.ChatMessage.Type.HEARTBEAT)
.setTimestamp(now)
.build();
BinaryMessage message =
new BinaryMessage(heartbeat.toByteArray());
sessions.values().forEach(session -> {
Long last = lastHeartbeat.get(session.getId());
if (last == null || now - last > timeout) {
log.warn("Zombie session detected: {}", session.getId());
closeSession(session);
return;
}
safeSend(session, message);
});
}
private void closeSession(WebSocketSession session) {
sessions.remove(session.getId());
sessionLocks.remove(session.getId());
lastHeartbeat.remove(session.getId());
Long userId = (Long) session.getAttributes().get("userId");
if (userId != null) {
Set<WebSocketSession> set = userSessions.get(userId);
if (set != null) {
set.remove(session);
if (set.isEmpty()) {
userSessions.remove(userId);
}
}
}
try {
if (session.isOpen()) {
session.close();
}
} catch (IOException ignored) {}
}
}
- WebSocketSession 을 이용하여 send를 하거나, getAttributes().get 을 통해 인증된 사용자 정보를 가져온다.
'Backend > SpringBoot' 카테고리의 다른 글
| Security 에서 Role 적용시 헷갈리는 부분 정리 (0) | 2026.01.28 |
|---|---|
| SSE 예제 (0) | 2026.01.14 |
| Outbox 패턴 모니터링 설정 (Prometheus+Grafana 를 위한) (1) | 2026.01.12 |
| Kafka-Outbox 전략 구현 예 (0) | 2026.01.12 |
| Kafka (+Kafka UI) 설정 및 연동 (오류 핸들링 포함) (0) | 2026.01.12 |