
Kafka Outbox Strategy: Implementation Example

Dean83 2026. 1. 12. 18:54

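For the theory, see https://dean83.tistory.com/406.

In short, when a DB and Kafka are used together, the Outbox pattern keeps the two consistent even if one of them fails.

Messages destined for Kafka are first stored in an outbox table, and a batch job then reads those rows from the DB and publishes them to Kafka.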
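
The two steps described above can be sketched without any infrastructure. The following is a minimal in-memory illustration, not the Spring implementation shown later: `OutboxRow`, `InMemoryOutboxDemo`, `placeOrder`, and `relay` are hypothetical names, and plain lists stand in for the business table, the outbox table, and the Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical in-memory stand-in for one row of the outbox table.
class OutboxRow {
    enum Status { PENDING, SENT }

    final String eventId = UUID.randomUUID().toString();
    final String aggregateId;
    final String payload;
    Status status = Status.PENDING;

    OutboxRow(String aggregateId, String payload) {
        this.aggregateId = aggregateId;
        this.payload = payload;
    }
}

class InMemoryOutboxDemo {
    final List<String> orders = new ArrayList<>();     // stands in for the business table
    final List<OutboxRow> outbox = new ArrayList<>();  // stands in for the outbox table
    final List<String> published = new ArrayList<>();  // stands in for the Kafka topic

    // Step 1: the business row and the outbox row are written together.
    // With a real database both inserts share one transaction, so either
    // both are committed or neither is.
    synchronized void placeOrder(String orderId) {
        orders.add(orderId);
        outbox.add(new OutboxRow(orderId, "{\"orderId\":\"" + orderId + "\"}"));
    }

    // Step 2: a batch job reads PENDING rows in insertion order, publishes
    // them, and marks each row SENT only after it has been published.
    synchronized int relay(int batchSize) {
        int sent = 0;
        for (OutboxRow row : outbox) {
            if (sent == batchSize) {
                break;
            }
            if (row.status != OutboxRow.Status.PENDING) {
                continue;
            }
            published.add(row.payload);
            row.status = OutboxRow.Status.SENT;
            sent++;
        }
        return sent;
    }
}
```

Because the application never writes to Kafka directly, a Kafka outage only delays publishing: the rows stay PENDING until a later `relay` run picks them up.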

 

Producer settings in application.yaml

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
      retries: 5
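      properties:
        # enable.idempotence is not a dedicated spring.kafka.producer.* key,
        # so it is passed through to the Kafka client via `properties`.
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5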
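
For reference, the same producer settings can be written with the raw Kafka client property keys. This is a small sketch under the assumption that the producer is configured by hand rather than through Spring Boot; `IdempotentProducerProps` and both of its methods are hypothetical helpers, not part of any library. The check encodes the constraints Kafka documents for an idempotent producer: `acks=all`, `retries` greater than 0, and at most 5 in-flight requests per connection.

```java
import java.util.Properties;

class IdempotentProducerProps {

    // Builds raw Kafka client properties that mirror the YAML settings.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("retries", "5");
        props.setProperty("enable.idempotence", "true");
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    // Returns true when the explicitly configured values are compatible with
    // enable.idempotence=true. For simplicity this sketch requires acks to be
    // set explicitly instead of relying on the client's default.
    static boolean isValidForIdempotence(Properties props) {
        String acks = props.getProperty("acks");
        boolean acksAll = "all".equals(acks) || "-1".equals(acks);
        int retries = Integer.parseInt(
            props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        int inFlight = Integer.parseInt(
            props.getProperty("max.in.flight.requests.per.connection", "5"));
        return acksAll && retries > 0 && inFlight <= 5;
    }
}
```

With these values the broker can discard duplicates caused by producer retries, which is why retrying up to 5 times does not by itself produce duplicate records.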

 

Entity example

@Entity
@Table(name = "outbox")
@Getter
@NoArgsConstructor
public class Outbox {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;   // UUID

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private Long version;

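    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    @Setter // the recovery job resets FAILED rows to PENDING through setStatus(...)
    private Status status;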

    @Lob
    private String payload;

    @Column(nullable = false)
    private int retryCount;

    private Instant lastFailedAt;

    private Instant createdAt;

    public enum Status {
        PENDING, SENT, FAILED
    }

    public void markSent() {
        this.status = Status.SENT;
    }

    public void markFailed() {
        this.status = Status.FAILED;
        this.retryCount++;
        this.lastFailedAt = Instant.now();
    }
}

 

Repository example

public interface OutboxRepository extends JpaRepository<Outbox, Long> {

    @Query(
      value = """
      SELECT *
      FROM outbox
      WHERE status = 'PENDING'
      ORDER BY id
      LIMIT :limit
      FOR UPDATE SKIP LOCKED
      """,
      nativeQuery = true
    )
    List<Outbox> findPendingForUpdate(@Param("limit") int limit);
    
    @Query("""
    SELECT o FROM Outbox o
    WHERE o.status = 'FAILED'
      AND o.retryCount < :maxRetry
    ORDER BY o.lastFailedAt
    """)
    List<Outbox> findRecoverableFailed(@Param("maxRetry") int maxRetry);
}

 

 

Outbox publisher (sends the Kafka messages)

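import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxPublisher {

    private static final long SEND_TIMEOUT_SECONDS = 10; // assumed value, tune as needed

    private final KafkaTemplate<String, String> kafkaTemplate;

    // send() is asynchronous: block until the broker acknowledges the record so that
    // a failed send surfaces as an exception and the caller does not mark the row SENT.
    public void publish(Outbox outbox) {
        try {
            kafkaTemplate.send(
                "order-events",
                outbox.getAggregateId(),   // key -> same partition, so per-aggregate ordering holds
                outbox.getPayload()
            ).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while publishing", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Kafka send failed", e);
        }
    }
}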
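
The reason the aggregate id is used as the record key is that the partition is derived from the key, and Kafka only guarantees ordering within a single partition. The sketch below illustrates that idea in a simplified form. It is not Kafka's actual algorithm: the default partitioner hashes the serialized key with murmur2, whereas this hypothetical `KeyPartitionSketch` uses `Arrays.hashCode` purely to stay dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {

    // Simplified stand-in for key-based partitioning. The property that
    // matters is shared with the real partitioner: the partition is a pure
    // function of the key bytes and the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

Every event for the same aggregate id therefore lands in the same partition and is consumed in the order it was published. Note that this holds only while the number of partitions stays the same, since changing it changes the key-to-partition mapping.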

 

Batch job example

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxPoller {

    private final OutboxRepository outboxRepository;
    private final OutboxPublisher publisher;

    private static final int BATCH_SIZE = 100;

    @Transactional
    @Scheduled(fixedDelay = 1000)
    public void poll() {
        List<Outbox> batch =
            outboxRepository.findPendingForUpdate(BATCH_SIZE);

        if (batch.isEmpty()) {
            return;
        }

        for (Outbox outbox : batch) {
            try {
                publisher.publish(outbox);
                outbox.markSent();
            } catch (Exception e) {
                log.error("Publish failed. eventId={}", outbox.getEventId(), e);
                outbox.markFailed();
            }
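        }

        // The entities are managed, so dirty checking would flush the status
        // changes at commit anyway; saveAll is kept to make the write explicit.
        outboxRepository.saveAll(batch);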
    }
}

 

Scheduler example that retries failed items

@Component
@RequiredArgsConstructor
@Slf4j
public class FailedOutboxRecoveryJob {

    private final OutboxRepository outboxRepository;

    private static final int MAX_RETRY = 5;

    @Transactional
    @Scheduled(fixedDelay = 60_000) // 1 minute
    public void recover() {

        List<Outbox> failedList =
            outboxRepository.findRecoverableFailed(MAX_RETRY);

        for (Outbox outbox : failedList) {
            log.info("Recover outbox eventId={}", outbox.getEventId());
            outbox.setStatus(Outbox.Status.PENDING);
        }
    }
}
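
The selection rule used by the recovery job can be checked in isolation. The following is a minimal in-memory sketch of the same logic as `findRecoverableFailed` plus the reset to PENDING; `FailedRow` and `RecoverySketch` are hypothetical names used only for this illustration, and a plain list stands in for the outbox table.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory stand-in for an outbox row.
class FailedRow {
    enum Status { PENDING, SENT, FAILED }

    final String eventId;
    Status status;
    int retryCount;
    Instant lastFailedAt;

    FailedRow(String eventId, Status status, int retryCount, Instant lastFailedAt) {
        this.eventId = eventId;
        this.status = status;
        this.retryCount = retryCount;
        this.lastFailedAt = lastFailedAt;
    }
}

class RecoverySketch {

    // Same rule as the repository query: FAILED rows that are still under
    // the retry limit, ordered by the oldest failure first.
    static List<FailedRow> findRecoverable(List<FailedRow> rows, int maxRetry) {
        List<FailedRow> result = new ArrayList<>();
        for (FailedRow row : rows) {
            if (row.status == FailedRow.Status.FAILED && row.retryCount < maxRetry) {
                result.add(row);
            }
        }
        result.sort(Comparator.comparing((FailedRow row) -> row.lastFailedAt));
        return result;
    }

    // Puts recoverable rows back to PENDING so the regular poller sends them
    // again; returns how many rows were reset.
    static int recover(List<FailedRow> rows, int maxRetry) {
        List<FailedRow> recoverable = findRecoverable(rows, maxRetry);
        for (FailedRow row : recoverable) {
            row.status = FailedRow.Status.PENDING;
        }
        return recoverable.size();
    }
}
```

Rows that have reached the retry limit stay FAILED and are never picked up again, so in practice they would likely need separate monitoring or manual handling.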