이론적인 내용은 https://dean83.tistory.com/406 여기를 참조. 이 글에서는 기본적으로 로컬PC 기준으로 설명한다.
기동 순서는 Kafka 실행 -> 토픽 생성 -> 서버실행 순이다.
- Kafka 설정에서 topic 자동생성을 했다면 상관없지만, 실서버에서는 수동생성을 하므로 순서를 지켜야 한다.
- 서버 실행시 topic이 없다면 실행이 안되므로, CI/CD 혹은 커맨드를 통해 미리 생성해야 한다.
Kafka 및 KafkaUI docker-compose 설정 예시
version: '3.8'
services:
kafka:
image: bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
environment:
# KRaft 모드 설정 (Zookeeper 미사용)
KAFKA_ENABLE_KRAFT: "yes"
KAFKA_CFG_PROCESS_ROLES: "broker,controller"
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
# 리스너 설정
KAFKA_CFG_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://kafka:29093
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
# 토픽 관련 기본 옵션
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
//KAFKA_CFG_NUM_PARTITIONS: 3 토픽 생성 자동일때 사용
//KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1 토픽 생성 자동일때 사용
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
# 로그 설정
KAFKA_CFG_LOG_RETENTION_HOURS: 168
KAFKA_CFG_LOG_SEGMENT_BYTES: 1073741824
ALLOW_PLAINTEXT_LISTENER: "yes"
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local-kafka
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
- KAFKA_ENABLE_KRAFT : zookeeper 없이 kafka 자체 메타데이터 관리 (토픽, 파티션 리더 선출 등)
- 컨트롤러 브로커, 즉 kafka 서버들을 관리하는 주체에 대한 부분으로, kraft로 대체되었다.
- PROCESS_ROLES : broker와 controller 역할 부여
- 모두 컨트롤러 역할까지 주게되면 메타데이터 처리로 무거워 진다. 따라서 다수의 kafka를 띄울경우 컨트롤러, 브로커 역할 관리를 잘 나누어야 한다.
- 리스너 설정
- 내, 외부 접속 리스너 설정별로 따로 관리하는게 좋음.
- 특히 kafka 도커와 kafka-ui 도커가 분리되어 있을 경우에는 접속이 까다로워 두개를 compose로 묶어 설정함.
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE : 자동으로 토픽 생성 여부 결정. 테스트용으로는 true로 해도 되지만 실제 배포시에는 false로 두는것을 권장
- 오타로 인해 잘못된 토픽 생성이 될 수 있음.
- 토픽별 세부 설정 불가능.
Kafka 토픽 설정
- kafka 접속
docker exec -it kafka bash
- 토픽 생성
kafka-topics.sh --create \
--topic order.created \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=delete \
--config retention.ms=604800000
혹은, kafka-ui 에서 생성 가능하다.
- 접속 후, Topics -> Create Topic
- 설정 입력 후 생성
혹은, Springboot 코드로도 생성 가능하다.
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic orderCreatedTopic() {
return TopicBuilder.name("order.created")
.partitions(6)
.replicas(1)
.build();
}
}
- KafkaAdmin이 스프링부트 구동시 자동으로 모든 NewTopic 리턴형을 보고 등록한다.
- 단, 이 경우 테스트용으로는 괜찮으나 실서버에서는 kafka 가 동작 안할때 서버도 같이 죽어버릴 가능성이 있고, 여러 서버 구동시 문제가 될 수 있으므로 피하는게 좋다.
- kafka 설정을 스프링부트에서 하는게 맞지 않다고 생각하기에 테스트용, 로컬용으로만 쓰는게 좋다.
SpringBoot 설정
Gradle 추가
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
}
application.yaml 추가
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
consumer:
group-id: order-service
auto-offset-reset: earliest
max-poll-records: 10
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
max.poll.interval.ms: 600000
session.timeout.ms: 15000
heartbeat.interval.ms: 5000
listener:
ack-mode: manual
- producer 설정
- acks=all : 모든 브로커 확인하여 메세지가 전송이 잘 되었는지 확인하는 옵션
- 확인을 가능하게 하는 옵션이고, 코드에서 callback 구현 해야 한다.
- retries : 전송 실패 시 재시도 횟수
- 네트워크 장애, 브로커 응답없음 등일경우 재시도 한다.
- 그러나 원론적인 오류들 (예 : serialization 실패, 토픽 없음 등) 은 재시도 하지 않는다.
- acks=all : 모든 브로커 확인하여 메세지가 전송이 잘 되었는지 확인하는 옵션
- consumer 설정
- group-id : 컨슈머 그룹
- auto-offset-reset=earliest : 처음 실행시 처음부터 읽음
- max-poll-records : 한번 poll로 가져오는 최대 메세지 개수
- 기본값은 500인데 이대로 쓰지 않는다.
- 일반 API 이벤트 : 10 - 50
- 무거운 로직 : 1 - 5
- 금융/정산 : 1 - 10
- 로그집계 : 100이상
- session.timeout.ms
- 이 시간동안 heartbeat이 안오면 컨슈머가 죽었다고 판단함
- session timeout 값이 더 커야 한다.
- heartbeat.interval.ms
- 이 간격으로 살아있음을 브로커에 보냄
- key, value serializer : Serializer 지정
- ack-mode : manual : 수동 커밋 지정 (메세지 처리 성공시에는 offset 커밋)
- enable.idempotence : 같은 메세지를 여러번 보내도 브로커에는 한번만 저장하도록 하는 옵션
- true로 설정하는것을 권장한다.
- 메세지 전송은 성공 했으나, ack 가 실패했을경우, 재전송 -> 브로커에 중복된 메세지가 쌓일 수 있음을 방지
- retry와 ack:all 설정을 하도록 강제한다.
커스텀 이벤트 클래스 예
public record OrderCreatedEvent(
Long orderId,
String productName,
int quantity
) {}
kafka로 전송하는 예 (producer)
@Component
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
public void send(OrderCreatedEvent event) {
kafkaTemplate.send("order.created", event.orderId().toString(), event)
.addCallback(
result -> {
// ack 성공 (브로커가 acks 조건 충족)
System.out.println("Send success, offset="
+ result.getRecordMetadata().offset());
},
ex -> {
// ack 실패
System.out.println("Send failed: " + ex.getMessage());
}
);;
}
}
- 이를 주입받아서 호출하여 전송
- send 인자값 : 토픽명, 키값, value 값
- 키값을 통해 어느 파티션에 저장될지 결정한다.
- 따라서 키값을 메세지들의 순서/처리 흐름을 묶는 기준으로 잘 설계해야 한다.
- 토픽명, 값 으로도 가능 -> 이 경우 키값이 없으므로 sticky partitioner로 보낸다.
- 특정 파티션 하나를 선택하여 사용한다.
- 순서 상관없는 경우에만 사용한다. (로그, 알림 발송 등)
- callback을 통해 전송 실패, 성공 유무를 확인 해야 한다.
kafka 로 부터 이벤트 수신하여 처리하는 예 (consumer)
@Component
@Slf4j
public class OrderConsumer {
@KafkaListener(topics = "order.created")
public void consume(
OrderEvent event,
Acknowledgment ack
) {
// 비즈니스 처리
process(event);
// 성공 시에만 offset commit
ack.acknowledge();
}
private void process(OrderEvent event) {
if (event.amount() < 0) {
throw new IllegalArgumentException("invalid amount");
}
// DB 저장, 외부 호출 등
}
}
- ack.acknoledge() 를 통해 offset commit을 하게 되는데, 이처럼 수동으로 하는것을 권장한다.
- 개발자가 명시적으로 확인해 주는것이 좋다.
- 따라서 enable-auto-commit 설정을 false로 해야 한다.
Consumer 오류 핸들링
- 보통은 DefaultErrorHandler 를 통해 재시도, DLT로 전송의 과정을 수행한다.
- Bean 으로 등록해두면, KafkaListener 어노테이션이 붙은 메서드에서 예외 발생시 자동 동작 한다.
- listener 메소드 내부에서 오류를 삼키지 않아야 한다.
- enable-auto-commit을 false로 둬야 한다.
- ack-mode 를 MANUAL로 둬야 한다. (혹은 RECORD)
- DLT에 메세지가 정상적으로 전송되면 자동으로 offset commit을 수행한다.
@Configuration
public class KafkaErrorHandlerConfig {
@Bean
public DefaultErrorHandler kafkaErrorHandler(
KafkaTemplate<Object, Object> kafkaTemplate
) {
// DLT로 보내는 역할
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, ex) ->
new TopicPartition(
record.topic() + ".DLT",
record.partition()
)
);
// retry 정책
FixedBackOff backOff = new FixedBackOff(
1000L, // 1초 간격
3 // 최대 3회 retry
);
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(recoverer, backOff);
// retry 하면 안 되는 예외 지정
errorHandler.addNotRetryableExceptions(
IllegalArgumentException.class
);
return errorHandler;
}
}
'Backend > SpringBoot' 카테고리의 다른 글
| Outbox 패턴 모니터링 설정 (Prometheus+Grafana 를 위한) (1) | 2026.01.12 |
|---|---|
| Kafka-Outbox 전략 구현 예 (0) | 2026.01.12 |
| Prometheus + Grafana 를 이용한 모니터링 (0) | 2026.01.05 |
| Cache 를 Actuator에서 확인하기 (Caffein 예) (0) | 2026.01.05 |
| Local Cache, Caffein (0) | 2026.01.05 |