이론부분은 https://dean83.tistory.com/412 이부분을 보면 된다.
별도로 build.gradle에 추가할 필요는 없다. (대부분 starter-web은 이미 추가 되어 있다)
Async 및 TaskExecutor 설정
- 이 부분은 https://dean83.tistory.com/393 에서 자세히 다루었다.
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig {
@Bean("sseExecutor")
public TaskExecutor sseTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("sse-");
executor.initialize();
return executor;
}
}
전송 데이터 정의 예
- SSE는 text 기반이므로, 직렬화가 가능해야 한다.
@Getter
@AllArgsConstructor
public class NotificationEvent {
private String id;
private String type;
private String message;
}
SSE 이벤트 데이터 엔티티 예 (DB 저장용)
- 서버가 재부팅 되거나 클라이언트 재접속 등 이슈가 발생했을때 이어서 동작할 수 있도록 DB에 내용 저장
@Entity
@Table(name = "sse_event")
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SseEventEntity {
@Id
private String eventId;
private String eventType;
private String targetGroup;
@Lob
private String payload;
private LocalDateTime createdAt;
}
이벤트 저장 및 조회 서비스 예
@Service
@RequiredArgsConstructor
public class EventStoreService {
private final SseEventRepository repository;
private final ObjectMapper objectMapper = new ObjectMapper();
public void save(String eventId, String eventType, String group, Object payload) {
try {
repository.save(
SseEventEntity.builder()
.eventId(eventId)
.eventType(eventType)
.targetGroup(group)
.payload(objectMapper.writeValueAsString(payload))
.createdAt(LocalDateTime.now())
.build()
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public List<SseEventEntity> findAfter(String lastEventId, String group) {
return repository.findByEventIdGreaterThanAndTargetGroup(
lastEventId, group
);
}
}
클라이언트 정보 저장 클래스 예
- SseEmitter 를 관리하는 측에서 이 정보를 토대로 생성/삭제 등 관리
@Getter
@AllArgsConstructor
public class ClientSubscription {
private String clientId;
private String group;
private Set<String> eventTypes;
private SseEmitter emitter;
}
SseEmitter 관리
- Sse 생성 및 관리 코드 예제이다.
@Component
@RequiredArgsConstructor
public class SseEmitterManager {
private static final long TIMEOUT = 60 * 60 * 1000;
private static final int MAX_CONNECTION = 1000;
// clientId 기준으로 관리 (정책적으로 단일 연결)
private final Map<String, ClientSubscription> clients = new ConcurrentHashMap<>();
private final SseReplayService replayService;
public SseEmitter create(
String clientId,
String group,
Set<String> eventTypes,
String lastEventId
) {
if (clients.size() >= MAX_CONNECTION) {
throw new IllegalStateException("Max SSE connections exceeded");
}
// 기존 연결 정리 (중복 로그인 / 재연결 대비)
remove(clientId);
SseEmitter emitter = new SseEmitter(TIMEOUT);
ClientSubscription subscription =
new ClientSubscription(clientId, group, eventTypes, emitter);
clients.put(clientId, subscription);
emitter.onCompletion(() -> remove(clientId));
emitter.onTimeout(() -> remove(clientId));
emitter.onError(e -> remove(clientId));
//핵심: emitter 생성 직후 재전송 트리거
replayService.replay(subscription, lastEventId);
return emitter;
}
public Collection<ClientSubscription> getAll() {
return clients.values();
}
public Optional<ClientSubscription> get(String clientId) {
return Optional.ofNullable(clients.get(clientId));
}
public void remove(String clientId) {
ClientSubscription client = clients.remove(clientId);
if (client != null) {
client.getEmitter().complete();
}
}
}
비동기 전송 및 필터링 서비스
- 구독한 항목 혹은 유저 그룹별 필터링 하여 전송 대상을 선정, 비동기로 전송
@Service
@RequiredArgsConstructor
public class SseSendService {
private final SseEmitterManager manager;
private final EventStoreService storeService;
@Async("sseExecutor")
public void publish(
String eventType,
String targetGroup,
Object payload
) {
String eventId = UUID.randomUUID().toString();
// 1️⃣ DB 저장
storeService.save(eventId, eventType, targetGroup, payload);
// 2️⃣ 필터링 후 전송
manager.getAll().stream()
.filter(c -> c.getGroup().equals(targetGroup))
.filter(c -> c.getEventTypes().contains(eventType))
.forEach(c -> send(c, eventId, eventType, payload));
}
private void send(
ClientSubscription client,
String eventId,
String eventType,
Object payload
) {
try {
client.getEmitter().send(
SseEmitter.event()
.id(eventId)
.name(eventType)
.data(payload)
);
} catch (IOException e) {
client.getEmitter().complete();
}
}
}
주기적으로 연결 정리
- 스케줄러를 통해 heartbeat을 보내고 정리한다.
@Component
@RequiredArgsConstructor
public class SseHeartbeatScheduler {
private final SseEmitterManager manager;
@Scheduled(fixedRate = 30_000)
public void heartbeat() {
manager.getAll().forEach(c -> {
try {
c.getEmitter().send(
SseEmitter.event()
.name("heartbeat")
.data("ping")
);
} catch (IOException e) {
c.getEmitter().complete();
}
});
}
}
재전송 서비스 예
@Service
@RequiredArgsConstructor
public class SseReplayService {
private final EventStoreService storeService;
@Async("sseExecutor")
public void replay(
ClientSubscription client,
String lastEventId
) {
if (lastEventId == null) return;
List<SseEventEntity> events =
storeService.findAfter(lastEventId, client.getGroup());
events.stream()
.filter(e -> client.getEventTypes().contains(e.getEventType()))
.forEach(e -> send(client, e));
}
private void send(ClientSubscription client, SseEventEntity e) {
try {
client.getEmitter().send(
SseEmitter.event()
.id(e.getEventId())
.name(e.getEventType())
.data(e.getPayload())
);
} catch (IOException ex) {
client.getEmitter().complete();
}
}
}
컨트롤러 예
@RestController
@RequestMapping("/api/sse")
public class SseController {
private final SseEmitterManager sseEmitterManager;
public SseController(SseEmitterManager sseEmitterManager) {
this.sseEmitterManager = sseEmitterManager;
}
@GetMapping("/subscribe")
public SseEmitter subscribe(
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
@RequestParam String userId
) {
return sseEmitterManager.subscribe(userId, lastEventId);
}
}'Backend > SpringBoot' 카테고리의 다른 글
| Security 에서 Role 적용시 헷갈리는 부분 정리 (0) | 2026.01.28 |
|---|---|
| 순수 WebSocket (+Protobuf) (0) | 2026.01.14 |
| Outbox 패턴 모니터링 설정 (Prometheus+Grafana 를 위한) (1) | 2026.01.12 |
| Kafka-Outbox 전략 구현 예 (0) | 2026.01.12 |
| Kafka (+Kafka UI) 설정 및 연동 (오류 핸들링 포함) (0) | 2026.01.12 |