Backend/SpringBoot

SSE 예제

Dean83 2026. 1. 14. 17:17

이론부분은 https://dean83.tistory.com/412 이부분을 보면 된다. 

별도로 build.gradle에 추가할 필요는 없다. (대부분 starter-web은 이미 추가 되어 있다)

 

Async 및 TaskExecutor 설정

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig {

    @Bean("sseExecutor")
    public TaskExecutor sseTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("sse-");
        executor.initialize();
        return executor;
    }
}

 

전송 데이터 정의 예

  • SSE는 text 기반이므로, 직렬화가 가능해야 한다.
@Getter
@AllArgsConstructor
public class NotificationEvent {
    private String id;
    private String type;
    private String message;
}

 

 

SSE 이벤트 데이터 엔티티 예 (DB 저장용)

  • 서버가 재부팅 되거나 클라이언트 재접속 등 이슈가 발생했을때 이어서 동작할 수 있도록 DB에 내용 저장
@Entity
@Table(name = "sse_event")
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SseEventEntity {

    @Id
    private String eventId;

    private String eventType;
    private String targetGroup;

    @Lob
    private String payload;

    private LocalDateTime createdAt;
}

 

 

이벤트 저장 및 조회 서비스 예

@Service
@RequiredArgsConstructor
public class EventStoreService {

    private final SseEventRepository repository;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public void save(String eventId, String eventType, String group, Object payload) {
        try {
            repository.save(
                SseEventEntity.builder()
                    .eventId(eventId)
                    .eventType(eventType)
                    .targetGroup(group)
                    .payload(objectMapper.writeValueAsString(payload))
                    .createdAt(LocalDateTime.now())
                    .build()
            );
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public List<SseEventEntity> findAfter(String lastEventId, String group) {
        return repository.findByEventIdGreaterThanAndTargetGroup(
            lastEventId, group
        );
    }
}

 

 

클라이언트 정보 저장 클래스 예

  • SseEmitter 를 관리하는 측에서 이 정보를 토대로 생성/삭제 등 관리
@Getter
@AllArgsConstructor
public class ClientSubscription {
    private String clientId;
    private String group;
    private Set<String> eventTypes;
    private SseEmitter emitter;
}

 

SseEmitter 관리

  • Sse 생성 및 관리 코드 예제이다.
@Component
@RequiredArgsConstructor
public class SseEmitterManager {

    private static final long TIMEOUT = 60 * 60 * 1000;
    private static final int MAX_CONNECTION = 1000;

    // clientId 기준으로 관리 (정책적으로 단일 연결)
    private final Map<String, ClientSubscription> clients = new ConcurrentHashMap<>();

    private final SseReplayService replayService;

    public SseEmitter create(
        String clientId,
        String group,
        Set<String> eventTypes,
        String lastEventId
    ) {
        if (clients.size() >= MAX_CONNECTION) {
            throw new IllegalStateException("Max SSE connections exceeded");
        }

        // 기존 연결 정리 (중복 로그인 / 재연결 대비)
        remove(clientId);

        SseEmitter emitter = new SseEmitter(TIMEOUT);

        ClientSubscription subscription =
            new ClientSubscription(clientId, group, eventTypes, emitter);

        clients.put(clientId, subscription);

        emitter.onCompletion(() -> remove(clientId));
        emitter.onTimeout(() -> remove(clientId));
        emitter.onError(e -> remove(clientId));

        //핵심: emitter 생성 직후 재전송 트리거
        replayService.replay(subscription, lastEventId);

        return emitter;
    }

    public Collection<ClientSubscription> getAll() {
        return clients.values();
    }

    public Optional<ClientSubscription> get(String clientId) {
        return Optional.ofNullable(clients.get(clientId));
    }

    public void remove(String clientId) {
        ClientSubscription client = clients.remove(clientId);
        if (client != null) {
            client.getEmitter().complete();
        }
    }
}

 

 

비동기 전송 및 필터링 서비스

  • 구독한 항목 혹은 유저 그룹별 필터링 하여 전송 대상을 선정, 비동기로 전송
@Service
@RequiredArgsConstructor
public class SseSendService {

    private final SseEmitterManager manager;
    private final EventStoreService storeService;

    @Async("sseExecutor")
    public void publish(
        String eventType,
        String targetGroup,
        Object payload
    ) {
        String eventId = UUID.randomUUID().toString();

        // 1️⃣ DB 저장
        storeService.save(eventId, eventType, targetGroup, payload);

        // 2️⃣ 필터링 후 전송
        manager.getAll().stream()
            .filter(c -> c.getGroup().equals(targetGroup))
            .filter(c -> c.getEventTypes().contains(eventType))
            .forEach(c -> send(c, eventId, eventType, payload));
    }

    private void send(
        ClientSubscription client,
        String eventId,
        String eventType,
        Object payload
    ) {
        try {
            client.getEmitter().send(
                SseEmitter.event()
                    .id(eventId)
                    .name(eventType)
                    .data(payload)
            );
        } catch (IOException e) {
            client.getEmitter().complete();
        }
    }
}

 

주기적으로 연결 정리

  • 스케줄러를 통해 heartbeat을 보내고 정리한다.
@Component
@RequiredArgsConstructor
public class SseHeartbeatScheduler {

    private final SseEmitterManager manager;

    @Scheduled(fixedRate = 30_000)
    public void heartbeat() {
        manager.getAll().forEach(c -> {
            try {
                c.getEmitter().send(
                    SseEmitter.event()
                        .name("heartbeat")
                        .data("ping")
                );
            } catch (IOException e) {
                c.getEmitter().complete();
            }
        });
    }
}

 

재전송 서비스 예

@Service
@RequiredArgsConstructor
public class SseReplayService {

    private final EventStoreService storeService;

    @Async("sseExecutor")
    public void replay(
        ClientSubscription client,
        String lastEventId
    ) {
        if (lastEventId == null) return;

        List<SseEventEntity> events =
            storeService.findAfter(lastEventId, client.getGroup());

        events.stream()
            .filter(e -> client.getEventTypes().contains(e.getEventType()))
            .forEach(e -> send(client, e));
    }

    private void send(ClientSubscription client, SseEventEntity e) {
        try {
            client.getEmitter().send(
                SseEmitter.event()
                    .id(e.getEventId())
                    .name(e.getEventType())
                    .data(e.getPayload())
            );
        } catch (IOException ex) {
            client.getEmitter().complete();
        }
    }
}

 

 

컨트롤러 예

@RestController
@RequestMapping("/api/sse")
public class SseController {

    private final SseEmitterManager sseEmitterManager;

    public SseController(SseEmitterManager sseEmitterManager) {
        this.sseEmitterManager = sseEmitterManager;
    }

    @GetMapping("/subscribe")
    public SseEmitter subscribe(
            @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
            @RequestParam String userId
    ) {
        return sseEmitterManager.subscribe(userId, lastEventId);
    }
}