Backend/SpringBoot

Kafka (+Kafka UI) 설정 및 연동 (오류 핸들링 포함)

Dean83 2026. 1. 12. 12:37

이론적인 내용은 https://dean83.tistory.com/406 여기를 참조. 이 글에서는 기본적으로 로컬PC 기준으로 설명한다.

기동 순서는 Kafka 실행 -> 토픽 생성 -> 서버실행 순이다. 

  • Kafka 설정에서 topic 자동생성을 했다면 상관없지만, 실서버에서는 수동생성을 하므로 순서를 지켜야 한다.
  • 서버 실행시 topic이 없다면 실행이 안되므로, CI/CD 혹은 커맨드를 통해 미리 생성해야 한다.

 

Kafka 및 KafkaUI docker-compose 설정 예시 

version: '3.8'

services:
  kafka:
    image: bitnami/kafka:3.7
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # KRaft 모드 설정 (Zookeeper 미사용)
      KAFKA_ENABLE_KRAFT: "yes"
      KAFKA_CFG_PROCESS_ROLES: "broker,controller"
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"

      # 리스너 설정
      KAFKA_CFG_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://kafka:29093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT

      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

      # 토픽 관련 기본 옵션
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
      //KAFKA_CFG_NUM_PARTITIONS: 3  토픽 생성 자동일때 사용
      //KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1 토픽 생성 자동일때 사용
      
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

      # 로그 설정
      KAFKA_CFG_LOG_RETENTION_HOURS: 168
      KAFKA_CFG_LOG_SEGMENT_BYTES: 1073741824

      ALLOW_PLAINTEXT_LISTENER: "yes"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
  • KAFKA_ENABLE_KRAFT : zookeeper 없이 kafka 자체 메타데이터 관리 (토픽, 파티션 리더 선출 등)
    • 컨트롤러 브로커, 즉 kafka 서버들을 관리하는 주체에 대한 부분으로, kraft로 대체되었다.
  • PROCESS_ROLES : broker와 controller 역할 부여
    • 모두 컨트롤러 역할까지 주게되면 메타데이터 처리로 무거워 진다. 따라서 다수의 kafka를 띄울경우 컨트롤러, 브로커 역할 관리를 잘 나누어야 한다.
  • 리스너 설정
    • 내, 외부 접속 리스너 설정별로 따로 관리하는게 좋음. 
    • 특히 kafka 도커와 kafka-ui 도커가 분리되어 있을 경우에는 접속이 까다로워 두개를 compose로 묶어 설정함.
  • KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE : 자동으로 토픽 생성 여부 결정. 테스트용으로는 true로 해도 되지만 실제 배포시에는 false로 두는것을 권장
    • 오타로 인해 잘못된 토픽 생성이 될 수 있음. 
    • 토픽별 세부 설정 불가능. 

 

Kafka 토픽 설정

  • kafka 접속
docker exec -it kafka bash

 

  • 토픽 생성
kafka-topics.sh --create \
  --topic order.created \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=delete \
  --config retention.ms=604800000

 

혹은, kafka-ui 에서 생성 가능하다.

  • 접속 후, Topics -> Create Topic
  • 설정 입력 후 생성

 

혹은, Springboot 코드로도 생성 가능하다. 

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderCreatedTopic() {
        return TopicBuilder.name("order.created")
                .partitions(6)
                .replicas(1)
                .build();
    }
}
  • KafkaAdmin이 스프링부트 구동시 자동으로 모든 NewTopic 리턴형을 보고 등록한다.
  • 단, 이 경우 테스트용으로는 괜찮으나 실서버에서는 kafka 가 동작 안할때 서버도 같이 죽어버릴 가능성이 있고, 여러 서버 구동시 문제가 될 수 있으므로 피하는게 좋다. 
  • kafka 설정을 스프링부트에서 하는게 맞지 않다고 생각하기에 테스트용, 로컬용으로만 쓰는게 좋다.

 

SpringBoot  설정

Gradle 추가

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
}

 

application.yaml 추가

spring:
  kafka:
    bootstrap-servers: localhost:9092

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
      	enable.idempotence: true

    consumer:
      group-id: order-service
      auto-offset-reset: earliest
      max-poll-records: 10
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        max.poll.interval.ms: 600000
        session.timeout.ms: 15000
        heartbeat.interval.ms: 5000

    listener:
      ack-mode: manual
  • producer 설정
    • acks=all : 모든 브로커 확인하여 메세지가 전송이 잘 되었는지 확인하는 옵션
      • 확인을 가능하게 하는 옵션이고, 코드에서 callback 구현 해야 한다.
    • retries : 전송 실패 시 재시도 횟수
      • 네트워크 장애, 브로커 응답없음 등일경우 재시도 한다.
      • 그러나 원론적인 오류들 (예 : serialization 실패, 토픽 없음 등) 은 재시도 하지 않는다.
  • consumer 설정
    • group-id : 컨슈머 그룹
    • auto-offset-reset=earliest : 처음 실행시 처음부터 읽음
    • max-poll-records : 한번 poll로 가져오는 최대 메세지 개수
      • 기본값은 500인데 이대로 쓰지 않는다.
      • 일반 API 이벤트 : 10 - 50
      • 무거운 로직 : 1 - 5
      • 금융/정산 : 1 - 10
      • 로그집계 : 100이상
    • session.timeout.ms
      • 이 시간동안 heartbeat이 안오면 컨슈머가 죽었다고 판단함
      • session timeout 값이 더 커야 한다.
    • heartbeat.interval.ms
      • 이 간격으로 살아있음을 브로커에 보냄
  • key, value serializer : Serializer 지정 
  • ack-mode : manual : 수동 커밋 지정 (메세지 처리 성공시에는 offset 커밋)
  • enable.idempotence : 같은 메세지를 여러번 보내도 브로커에는 한번만 저장하도록 하는 옵션
    • true로 설정하는것을 권장한다. 
    • 메세지 전송은 성공 했으나, ack 가 실패했을경우, 재전송 -> 브로커에 중복된 메세지가 쌓일 수 있음을 방지
    • retry와 ack:all 설정을 하도록 강제한다.

 

커스텀 이벤트 클래스 예

public record OrderCreatedEvent(
        Long orderId,
        String productName,
        int quantity
) {}

 

kafka로 전송하는 예 (producer)

@Component
@RequiredArgsConstructor
public class OrderProducer {

    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    public void send(OrderCreatedEvent event) {
        kafkaTemplate.send("order.created", event.orderId().toString(), event)
        .addCallback(
            result -> {
                // ack 성공 (브로커가 acks 조건 충족)
                System.out.println("Send success, offset="
                        + result.getRecordMetadata().offset());
            },
            ex -> {
                // ack 실패
                System.out.println("Send failed: " + ex.getMessage());
            }
        );;
    }
}
  • 이를 주입받아서 호출하여 전송
  • send 인자값 : 토픽명, 키값, value 값
    • 키값을 통해 어느 파티션에 저장될지 결정한다. 
    • 따라서 키값을 메세지들의 순서/처리 흐름을 묶는 기준으로 잘 설계해야 한다. 
  • 토픽명, 값 으로도 가능 -> 이 경우 키값이 없으므로 sticky partitioner로 보낸다. 
    • 특정 파티션 하나를 선택하여 사용한다.
    • 순서 상관없는 경우에만 사용한다. (로그, 알림 발송 등)
  • callback을 통해 전송 실패, 성공 유무를 확인 해야 한다.

 

kafka 로 부터 이벤트 수신하여 처리하는 예 (consumer)

@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "order.created")
    public void consume(
        OrderEvent event,
        Acknowledgment ack
    ) {
        // 비즈니스 처리
        process(event);

        // 성공 시에만 offset commit
        ack.acknowledge();
    }

    private void process(OrderEvent event) {
        if (event.amount() < 0) {
            throw new IllegalArgumentException("invalid amount");
        }

        // DB 저장, 외부 호출 등
    }
}
  • ack.acknoledge() 를 통해 offset commit을 하게 되는데, 이처럼 수동으로 하는것을 권장한다.
    • 개발자가 명시적으로 확인해 주는것이 좋다.
    • 따라서 enable-auto-commit 설정을 false로 해야 한다.

 

Consumer 오류 핸들링

  • 보통은 DefaultErrorHandler 를 통해 재시도, DLT로 전송의 과정을 수행한다.
  • Bean 으로 등록해두면, KafkaListener 어노테이션이 붙은 메서드에서 예외 발생시 자동 동작 한다.
  • listener 메소드 내부에서 오류를 삼키지 않아야 한다.
  • enable-auto-commit을 false로 둬야 한다.
  • ack-mode 를 MANUAL로 둬야 한다. (혹은 RECORD)
  • DLT에 메세지가 정상적으로 전송되면 자동으로 offset commit을 수행한다.
@Configuration
public class KafkaErrorHandlerConfig {

    @Bean
    public DefaultErrorHandler kafkaErrorHandler(
            KafkaTemplate<Object, Object> kafkaTemplate
    ) {
        // DLT로 보내는 역할
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, ex) ->
                    new TopicPartition(
                        record.topic() + ".DLT",
                        record.partition()
                    )
            );

        // retry 정책
        FixedBackOff backOff = new FixedBackOff(
            1000L, // 1초 간격
            3      // 최대 3회 retry
        );

        DefaultErrorHandler errorHandler =
            new DefaultErrorHandler(recoverer, backOff);

        // retry 하면 안 되는 예외 지정
        errorHandler.addNotRetryableExceptions(
            IllegalArgumentException.class
        );

        return errorHandler;
    }
}