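For the theory, see https://dean83.tistory.com/406.
In short, the Outbox pattern prevents the database and Kafka from drifting out of sync when one of the two fails while they are used together.
Messages destined for Kafka are first stored in an outbox table, in the same transaction as the business data. A batch job then reads those rows from the DB and publishes them to Kafka.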
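To make the flow concrete, here is a minimal in-memory sketch in plain Java (no Spring, no Kafka). `InMemoryDb`, `OutboxRow`, `OrderService`, and `placeOrder` are hypothetical names used only for illustration, and the "transaction" is simulated with a snapshot and restore. A real implementation would rely on the database transaction instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical outbox row: what the batch job later forwards to Kafka.
final class OutboxRow {
    final String eventId;
    final String aggregateId;
    final String payload;

    OutboxRow(String eventId, String aggregateId, String payload) {
        this.eventId = eventId;
        this.aggregateId = aggregateId;
        this.payload = payload;
    }
}

// Hypothetical stand-in for a database with all-or-nothing commits.
class InMemoryDb {
    final List<String> orders = new ArrayList<>();
    final List<OutboxRow> outbox = new ArrayList<>();

    // Keeps both writes, or restores both tables if the work throws.
    void inTransaction(Runnable work) {
        List<String> ordersBackup = new ArrayList<>(orders);
        List<OutboxRow> outboxBackup = new ArrayList<>(outbox);
        try {
            work.run();
        } catch (RuntimeException e) {
            orders.clear();
            orders.addAll(ordersBackup);
            outbox.clear();
            outbox.addAll(outboxBackup);
            throw e;
        }
    }
}

class OrderService {
    private final InMemoryDb db;

    OrderService(InMemoryDb db) {
        this.db = db;
    }

    // The business row and the outbox row are written in one transaction,
    // so there is never an order without its pending event or vice versa.
    void placeOrder(String orderId, boolean failAfterOrderInsert) {
        db.inTransaction(() -> {
            db.orders.add(orderId);
            if (failAfterOrderInsert) {
                throw new IllegalStateException("simulated failure");
            }
            String payload = "{\"orderId\":\"" + orderId + "\"}";
            db.outbox.add(new OutboxRow(UUID.randomUUID().toString(), orderId, payload));
        });
    }
}
```

Because nothing is sent to Kafka inside `placeOrder`, a broker outage cannot fail the business write. It only delays the publish step.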
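Producer settings in application.yaml
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
      retries: 5
      properties:
        # enable.idempotence is not a dedicated Spring Boot producer key,
        # so it is passed through as a raw Kafka property.
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5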
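These values are not independent: with idempotence enabled, the Kafka producer expects `acks=all`, `retries` greater than 0, and at most 5 in-flight requests per connection. The sketch below checks those constraints against the raw Kafka property keys using only `java.util.Properties`. `ProducerConfigCheck` and its methods are hypothetical helpers, and the fallback defaults used for missing keys are a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: validates idempotence-related producer settings.
class ProducerConfigCheck {

    // The same values as the YAML above, expressed as raw Kafka keys.
    static Properties outboxProducerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("acks", "all");
        p.setProperty("retries", "5");
        p.setProperty("enable.idempotence", "true");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        return p;
    }

    // Returns a description of every constraint the settings violate.
    static List<String> violations(Properties p) {
        List<String> problems = new ArrayList<>();
        if (!Boolean.parseBoolean(p.getProperty("enable.idempotence", "false"))) {
            return problems; // the constraints only apply when idempotence is on
        }
        String acks = p.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all");
        }
        if (Integer.parseInt(p.getProperty("retries", "1")) <= 0) {
            problems.add("retries must be greater than 0");
        }
        int inFlight = Integer.parseInt(
                p.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be at most 5");
        }
        return problems;
    }
}
```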
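Entity example
import jakarta.persistence.*; // javax.persistence.* on Spring Boot 2.x
import java.time.Instant;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Entity
@Table(name = "outbox")
@Getter
@NoArgsConstructor
public class Outbox {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId; // UUID

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private Long version;

    // The recovery job resets FAILED rows to PENDING, so this field needs a setter.
    @Setter
    @Enumerated(EnumType.STRING)
    private Status status;

    @Lob
    private String payload;

    @Column(nullable = false)
    private int retryCount;

    private Instant lastFailedAt;

    private Instant createdAt;

    public enum Status {
        PENDING, SENT, FAILED
    }

    // Fills in defaults when the row is first persisted.
    @PrePersist
    void onCreate() {
        if (status == null) status = Status.PENDING;
        if (createdAt == null) createdAt = Instant.now();
    }

    public void markSent() {
        this.status = Status.SENT;
    }

    public void markFailed() {
        this.status = Status.FAILED;
        this.retryCount++;
        this.lastFailedAt = Instant.now();
    }
}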
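Repository example
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface OutboxRepository extends JpaRepository<Outbox, Long> {

    // Locks up to :limit PENDING rows and skips rows already locked by another poller.
    // FOR UPDATE SKIP LOCKED needs a database that supports it (e.g. MySQL 8+, PostgreSQL)
    // and must be called inside a transaction.
    @Query(
        value = """
            SELECT *
            FROM outbox
            WHERE status = 'PENDING'
            ORDER BY id
            LIMIT :limit
            FOR UPDATE SKIP LOCKED
            """,
        nativeQuery = true
    )
    List<Outbox> findPendingForUpdate(@Param("limit") int limit);

    // FAILED rows that are still below the retry limit, oldest failure first.
    @Query("""
        SELECT o FROM Outbox o
        WHERE o.status = 'FAILED'
        AND o.retryCount < :maxRetry
        ORDER BY o.lastFailedAt
        """)
    List<Outbox> findRecoverableFailed(@Param("maxRetry") int maxRetry);
}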
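The point of `FOR UPDATE SKIP LOCKED` in the first query is that several poller instances can run at once without blocking each other or picking up the same rows. The following is a rough in-memory imitation of that behavior, not how a database implements it. `SkipLockedTable`, `claimPending`, and `commit` are hypothetical names, and transactions are identified by a plain string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of row locks with SKIP LOCKED semantics.
class SkipLockedTable {
    private final TreeMap<Long, String> pendingRows = new TreeMap<>(); // id -> payload, ordered by id
    private final Map<Long, String> lockOwner = new HashMap<>();       // id -> owning transaction

    synchronized void insert(long id, String payload) {
        pendingRows.put(id, payload);
    }

    // Imitates: SELECT ... WHERE status = 'PENDING' ORDER BY id LIMIT :limit FOR UPDATE SKIP LOCKED
    synchronized List<Long> claimPending(String tx, int limit) {
        List<Long> claimed = new ArrayList<>();
        for (Long id : pendingRows.keySet()) {
            if (claimed.size() == limit) {
                break;
            }
            String owner = lockOwner.get(id);
            if (owner != null && !owner.equals(tx)) {
                continue; // locked by another transaction: skip it instead of waiting
            }
            lockOwner.put(id, tx);
            claimed.add(id);
        }
        return claimed;
    }

    // Imitates commit: releases every lock held by the transaction.
    synchronized void commit(String tx) {
        lockOwner.values().removeIf(tx::equals);
    }
}
```

With five pending rows, a first transaction asking for two rows gets ids 1 and 2, and a concurrent second transaction gets 3 and 4 rather than waiting on the locked rows.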
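Outbox publisher (sends the Kafka message)
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // send() is asynchronous, so wait for the broker ack here. Otherwise a failed
    // send would never reach the caller's catch block and the row would be marked SENT.
    public void publish(Outbox outbox) throws Exception {
        kafkaTemplate.send(
                "order-events",
                outbox.getAggregateId(), // key -> same partition, so order is kept per aggregate
                outbox.getPayload()
        ).get(10, TimeUnit.SECONDS); // the 10-second timeout is an example value
    }
}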
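One detail worth spelling out: `KafkaTemplate.send` returns a future (a `CompletableFuture` in Spring for Apache Kafka 3.x), so a broker failure only becomes visible once that future completes. The sketch below shows the difference with plain `java.util.concurrent` types. `fakeSend` is a hypothetical stand-in for the producer and not a Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncSendSketch {

    // Hypothetical stand-in for an asynchronous producer send.
    static CompletableFuture<String> fakeSend(String payload, boolean brokerDown) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (brokerDown) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete("ack:" + payload);
        }
        return future;
    }

    // Returns true only when the send was acknowledged within the timeout.
    static boolean sendAndWait(String payload, boolean brokerDown) {
        try {
            fakeSend(payload, brokerDown).get(1, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // the failure surfaces only because we waited on the future
        }
    }
}
```

Calling `fakeSend("p", true)` without waiting throws nothing at the call site, which is why a surrounding try/catch alone cannot tell a successful send from a failed one.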
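Batch job example
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxPoller {

    private static final int BATCH_SIZE = 100;

    private final OutboxRepository outboxRepository;
    private final OutboxPublisher publisher;

    // @Scheduled only fires when @EnableScheduling is present on a configuration class.
    @Transactional
    @Scheduled(fixedDelay = 1000)
    public void poll() {
        List<Outbox> batch = outboxRepository.findPendingForUpdate(BATCH_SIZE);
        if (batch.isEmpty()) {
            return;
        }

        for (Outbox outbox : batch) {
            try {
                publisher.publish(outbox);
                outbox.markSent();
            } catch (Exception e) {
                // One failed row must not abort the rest of the batch.
                log.error("Publish failed. eventId={}", outbox.getEventId(), e);
                outbox.markFailed();
            }
        }

        outboxRepository.saveAll(batch);
    }
}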
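The per-row try/catch is what keeps one bad message from blocking the whole batch. Stripped of Spring and JPA, the loop can be sketched as follows. `PollRow` and `PollerSketch` are hypothetical names, and the status is a plain string rather than the entity's enum.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an outbox row.
class PollRow {
    final String eventId;
    String status = "PENDING";
    int retryCount = 0;

    PollRow(String eventId) {
        this.eventId = eventId;
    }
}

class PollerSketch {

    // Publishes each row in order; a failure marks only that row as FAILED.
    static void pollOnce(List<PollRow> batch, Consumer<PollRow> publisher) {
        for (PollRow row : batch) {
            try {
                publisher.accept(row);
                row.status = "SENT";
            } catch (RuntimeException e) {
                row.status = "FAILED";
                row.retryCount++;
            }
        }
    }
}
```

If the publisher throws for the second of three rows, the first and third still end up as SENT and only the second is left as FAILED with a retry count of 1.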
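Scheduler example that retries failed rows
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@RequiredArgsConstructor
@Slf4j
public class FailedOutboxRecoveryJob {

    private static final int MAX_RETRY = 5;

    private final OutboxRepository outboxRepository;

    @Transactional
    @Scheduled(fixedDelay = 60_000) // 1 minute
    public void recover() {
        List<Outbox> failedList = outboxRepository.findRecoverableFailed(MAX_RETRY);
        for (Outbox outbox : failedList) {
            log.info("Recover outbox eventId={}", outbox.getEventId());
            // Back to PENDING so the poller picks the row up again. Outbox must expose
            // a status setter (e.g. Lombok @Setter); dirty checking persists the change.
            outbox.setStatus(Outbox.Status.PENDING);
        }
    }
}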
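The selection rule behind the recovery job (FAILED, still under the retry limit, oldest failure first) can be sketched without a database as well. `FailedRow` and `RecoverySketch` are hypothetical names used only for this illustration.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for an outbox row.
class FailedRow {
    final String eventId;
    String status;
    final int retryCount;
    final Instant lastFailedAt;

    FailedRow(String eventId, String status, int retryCount, Instant lastFailedAt) {
        this.eventId = eventId;
        this.status = status;
        this.retryCount = retryCount;
        this.lastFailedAt = lastFailedAt;
    }
}

class RecoverySketch {

    // Resets recoverable rows to PENDING and returns them, oldest failure first.
    static List<FailedRow> recover(List<FailedRow> rows, int maxRetry) {
        List<FailedRow> recoverable = rows.stream()
                .filter(r -> r.status.equals("FAILED") && r.retryCount < maxRetry)
                .sorted(Comparator.comparing((FailedRow r) -> r.lastFailedAt))
                .collect(Collectors.toList());
        for (FailedRow row : recoverable) {
            row.status = "PENDING";
        }
        return recoverable;
    }
}
```

Rows that have already reached the retry limit stay in FAILED, so they remain visible for manual inspection instead of being retried forever.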