Backend/SpringBoot

순수 WebSocket (+Protobuf)

Dean83 2026. 1. 14. 10:47

이론적인 부분은 이곳에서 확인 가능하다. https://dean83.tistory.com/411

단일서버일 경우 WebSocket에 대한 정보를 Map 이나 ConcurrentMap을 통해 저장하지만, 서버가 다수일 경우 Redis를 이용한다.

또한, 별도로 DB 에 데이터를 저장해야 되는 부분이 반드시 생기므로, Entity, Service, Repository를 구현해야 한다. 

중요한 것은, WebServerHandler 구현체에서 바로 Repository를 주입받아 접근하지 않고, Service를 주입받아 활용한다.

 

여기서는 DB 에 저장 / 읽기 부분은 제외하고 정리하며, 대용량 및 빠른 경우를 상정하여 ProtoBuf를 이용하였다.

  • 실제로 ProtoBuf를 이용하는 경우는 대용량, 초고속일 경우를 제외하고는 protobuf 버전 관리, 클라이언트 개발 난이도, 디버깅 등 여러 이유로 잘 쓰지 않는게 좋다

 

 

WebSocket

WebSocket 은 양방향 실시간 통신으로서, 지속적인 연결이 필요할 경우 사용한다.규모가 커질 경우 외부 브로커를 별도로 두어 같이 운용하는경우가 대부분이다.STOMP는 개발을 빨리 해야 할 때, 메

dean83.tistory.com

 

Build.gradle 추가

implementation 'org.springframework.boot:spring-boot-starter-websocket'

 

데이터 규격 정의 예 (Protobuf 기준)

syntax = "proto3";

option java_package = "com.example.ws.proto";
option java_outer_classname = "ChatProto";

message ChatMessage {

  enum Type {
    CHAT = 0;
    DIRECT = 1;
    HEARTBEAT = 2;
    HEARTBEAT_ACK = 3;
  }

  Type type = 1;
  int64 senderUserId = 2;
  int64 targetUserId = 3;
  string content = 4;
  int64 timestamp = 5;
}
  • 이 스키마 내용을 chat.proto 파일에 저장하고 빌드 하면 ChatProto.java 파일이 생성 된다.
    • 파일위치는 src -> main 에 적당한 패키지를 생성하여 저장한다
    • 빌드는 gradle build를 말한다. 
    • 해당 파일 안에 ChatMessage 클래스가 존재한다.

 

  • 실제 ProtoBuf를 구성할 때에는 Envelop 으로 감싸고 그 안에 실제 메세지들을 별개로 구성하는게 좋다. 아래는 그 예이다.
  • 여기서는 이 방식을 따르지 않고 위의 예를 따랐다.
syntax = "proto3";

message Envelope {
  string version = 1;

  oneof body {
    ChatMessage chat = 2;
    PingMessage ping = 3;
    JoinMessage join = 4;
  }
}

혹은, 
syntax = "proto3";

message Envelope {
  string version = 1;

  anyof body {
    ChatMessage chat = 2;
    PingMessage ping = 3;
    JoinMessage join = 4;
  }
}



Protobuf Build.gradle 설정 

plugins {
    id 'com.google.protobuf' version '0.9.4'
}

dependencies {
    implementation 'com.google.protobuf:protobuf-java:3.25.3'
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.3"
    }
    generateProtoTasks {
        all().each { task ->
            task.builtins {
                java {}
            }
        }
    }
}

 

HandShakeInterceptor (JWT 인증) 예

  • WebSocket을 연결 하기 전, 클라이언트 인증을 수행하고, 해당 유저 정보를 attributes에 저장하여 추후 메세지 수신시 가져올 수 있게 한다. 
  • 아래 클라이언트 토큰 인증방식에 따라 코드가 변경된다. (현재 예는 앱 - 서버, 서버 - 서버 간 예)
  • 특히 STOMP 를 이용하여 Channel Interceptor 가 인증을 담당할 경우, beforeHandshake에서는 true를 바로 리턴한다.
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler wsHandler,
            Map<String, Object> attributes) {

        String token = extractToken(request);
        UserPrincipal user = authenticate(token);

        attributes.put("userId", user.getUserId());
        attributes.put("role", user.getRole());

        return true;
    }

    @Override
    public void afterHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler wsHandler,
            Exception exception) {
    }
}

 

클라이언트 토큰 인증 방식

  • 클라이언트가 무엇이냐, 어떻게 구현되었느냐에 따라 차이를 보인다. 
  • 모바일, 서버 - 서버 등 브라우저가 아닌 환경에서는 위에 처럼 일반 HTTP 통신에서 토큰 검증하듯 진행하면 된다.
  • 브라우저가 클라이언트인 경우 저 방식이 동작하지 않고, 다른 3방식 중 하나를 프론트엔드와 협의 후 정해야 한다. 
    • 이유 : 브라우저에서 websocket 통신히 헤더를 마음대로 제어하지 못함. 
    • 첫번째 방식 : Sec-WebSocket-Protocol 에 토큰을 넣는 방식
      • 원래는 WebSocket의 사용 프로토콜을 명시하는 곳인데, 여기에 토큰을 넣어도 무방 하다고 한다.
      • request.getHeaders().getFirst("Sec-WebSocket-Protocol") 을 통해 접근 후 적절히 파싱 필요
        • 다른 정보도 포함되어 있으므로 토큰을 검출하여 파싱 해야함.
      • DefaultHandshakeHandler 를 재정의 하는 클래스를 구현해야 함 (필수)
      • 구체적인 방식은 추후 찾아보는걸로..
    • 두번째 방식 : STOMP를 이용할 경우 Channel Interceptor를 이용하여 인증
      • WebSocket 연결 미리 허용 후 처리
      • 인증 후StompHeaderAccessor 에 인증정보 저장
    • 세번째 방식 : 쿼리 파라메터로 토큰을 같이 넘겨 주기
      • 토큰이 외부로 노출될 수 있어 비추천. 
      • 이 경우 아래와 같이 파싱하여 가져올 수 있다.
//STOMP 를 이용할 경우

@Component
@RequiredArgsConstructor
public class AuthChannelInterceptor implements ChannelInterceptor {

    private final JwtProvider jwtProvider;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {

        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(
                        message,
                        StompHeaderAccessor.class
                );

        if (accessor == null) {
            return message;
        }

        // CONNECT 프레임에서만 인증
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {

            String authHeader =
                accessor.getFirstNativeHeader("Authorization");

            if (authHeader == null || !authHeader.startsWith("Bearer ")) {
                throw new IllegalArgumentException("Missing Authorization");
            }

            String token = authHeader.substring(7);

            UserPrincipal user = jwtProvider.authenticate(token);

            // ★ 여기서 Principal 설정
            accessor.setUser(
                new UsernamePasswordAuthenticationToken(
                    user,
                    null,
                    user.getAuthorities()
                )
            );
        }

        return message;
    }
}
//파라메터로 토큰을 넘겨주었을때 파싱 방법 (비추천)

private String extractToken(ServerHttpRequest request) {
        URI uri = request.getURI();

        MultiValueMap<String, String> params =
            UriComponentsBuilder.fromUri(uri)
                .build()
                .getQueryParams();

        return params.getFirst("token");
    }

 

WebSocket Config 설정

  • 구현한 HandShakeInterceptor, WebSocketHandler를 등록하고, Origin 설정을 한다. 
  • 만일 STOMP를 쓰거나 구형브라우저, 혹은 공공기관 등에서 WebSocket이 안될경우, setAllowedOrigins("*") 다음에 .WithSockJS()를 쓰자.
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {

    private final AuthHandshakeInterceptor handshakeInterceptor;
    private final ChatWebSocketHandler chatWebSocketHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatWebSocketHandler, "/ws/chat")
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins("*");
    }
}

 

WebSocketHandler 구현 예 

  • 채팅기반은 보통 text를 활용하고, 실시간, 고 가용성을 위해서는 binary를 많이 이용한다.
  • WebSocketHandler 는 Bean 으로서, 여러 요청 처리시 동시에 수행될 수 있으므로 동시성 문제가 생길 수 있다.
  • 주기적으로 hearbeat을 요청하여 응답이 없는 경우 해당 세션을 삭제한다.
  • 특히 채팅 메세지가 많은 등 대규모일 경우, 클래스 변수 -> byte[] 로 serialize 하여 구현하는게 맞다. 
    • 다만 이 경우, 클라이언트 측에서도 개발 난이도가 같이 증가 한다. 
    • ProtoBuf 를 이용한다. (구글 바이너리 직렬화)
@Component
@Slf4j
public class ChatWebSocketHandler extends BinaryWebSocketHandler {

    // 세션 인덱스
    private final ConcurrentMap<String, WebSocketSession> sessions =
            new ConcurrentHashMap<>();

    // 유저 → 세션들
    private final ConcurrentMap<Long, Set<WebSocketSession>> userSessions =
            new ConcurrentHashMap<>();

    // 세션별 send 락
    private final ConcurrentMap<String, Object> sessionLocks =
            new ConcurrentHashMap<>();
            
   private final ConcurrentMap<String, Long> lastHeartbeat =
            new ConcurrentHashMap<>();

    /* ==========================
       연결 수립
       ========================== */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {

        Long userId = (Long) session.getAttributes().get("userId");

        sessions.put(session.getId(), session);
        sessionLocks.put(session.getId(), new Object());
        lastHeartbeat.put(session.getId(), System.currentTimeMillis());

        userSessions
            .computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet())
            .add(session);

        log.info("WS connected. session={}, user={}", session.getId(), userId);
    }

    /* ==========================
       메시지 수신 (Binary)
       ========================== */
    @Override
    protected void handleBinaryMessage(
            WebSocketSession session,
            BinaryMessage message) throws Exception {

        ChatMessage msg = ChatMessage.parseFrom(message.getPayload().array());

        Long senderId = (Long) session.getAttributes().get("userId");

        switch (msg.getType()) {
            case HEARTBEAT_ACK -> {
                lastHeartbeat.put(session.getId(), System.currentTimeMillis());
            }
            case CHAT -> broadcast(msg, senderId);
            case DIRECT -> sendToUser(msg.getTargetUserId(), msg);
            default -> log.warn("Unknown type");
        }
    }

    /* ==========================
       메시지 전송 로직
       ========================== */
    private void broadcast(ChatMessage msg, Long senderId) {
        byte[] bytes = msg.toByteArray();
        BinaryMessage message = new BinaryMessage(bytes);

        sessions.values().forEach(session -> safeSend(session, message));
    }

    private void sendToUser(Long userId, ChatMessage msg) {
        Set<WebSocketSession> targets = userSessions.get(userId);
        if (targets == null) return;

        BinaryMessage message = new BinaryMessage(msg.toByteArray());
        targets.forEach(session -> safeSend(session, message));
    }

    /* ==========================
       thread-safe send
       ========================== */
    private void safeSend(WebSocketSession session, BinaryMessage message) {
        Object lock = sessionLocks.get(session.getId());
        if (lock == null) return;

        synchronized (lock) {
            try {
                if (session.isOpen()) {
                    session.sendMessage(message);
                }
            } catch (IOException e) {
                log.error("Send failed", e);
            }
        }
    }

    /* ==========================
       오류 처리
       ========================== */
    @Override
    public void handleTransportError(
            WebSocketSession session,
            Throwable exception) {

        log.error("WS error session={}", session.getId(), exception);
        closeSession(session);
    }

    /* ==========================
       연결 종료
       ========================== */
    @Override
    public void afterConnectionClosed(
            WebSocketSession session,
            CloseStatus status) {

        closeSession(session);
        log.info("WS closed session={}, status={}", session.getId(), status);
    }

    /* ==========================
       공통 정리 로직
       ========================== */
    private void closeSession(WebSocketSession session) {

        sessions.remove(session.getId());
        sessionLocks.remove(session.getId());

        Long userId = (Long) session.getAttributes().get("userId");
        if (userId != null) {
            Set<WebSocketSession> set = userSessions.get(userId);
            if (set != null) {
                set.remove(session);
                if (set.isEmpty()) {
                    userSessions.remove(userId);
                }
            }
        }

        try {
            if (session.isOpen()) {
                session.close();
            }
        } catch (IOException ignored) {
        }
    }
    
    @Scheduled(fixedDelay = 15_000)
    public void heartbeatAndCleanup() {

        long now = System.currentTimeMillis();
        long timeout = 45_000;

        ChatProto.ChatMessage heartbeat =
            ChatProto.ChatMessage.newBuilder()
                .setType(ChatProto.ChatMessage.Type.HEARTBEAT)
                .setTimestamp(now)
                .build();

        BinaryMessage message =
            new BinaryMessage(heartbeat.toByteArray());

        sessions.values().forEach(session -> {

            Long last = lastHeartbeat.get(session.getId());

            if (last == null || now - last > timeout) {
                log.warn("Zombie session detected: {}", session.getId());
                closeSession(session);
                return;
            }

            safeSend(session, message);
        });
    }
    
    private void closeSession(WebSocketSession session) {

    sessions.remove(session.getId());
    sessionLocks.remove(session.getId());
    lastHeartbeat.remove(session.getId());

    Long userId = (Long) session.getAttributes().get("userId");
    if (userId != null) {
        Set<WebSocketSession> set = userSessions.get(userId);
        if (set != null) {
            set.remove(session);
            if (set.isEmpty()) {
                userSessions.remove(userId);
            }
        }
    }

    try {
        if (session.isOpen()) {
            session.close();
        }
    } catch (IOException ignored) {}
}
}
  • WebSocketSession 을 이용하여 send를 하거나, getAttributes().get 을 통해 인증된 사용자 정보를 가져온다.