Spring batch는 대용량의 데이터를 처리할때 사용하는 Spring의 공식 배치 처리 솔루션이다. 예를들어 100만건의 데이터를 읽어 오거나 값을 수정해야 하는 작업을 해야 할 때 매우 유용하게 이용할 수 있다. 또한, 특정 시간대에 주기적으로 스케줄링 하여 동작할때, @Scheduled 와 같이 사용한다. 이 글에서는 Spring boot 3.xx 버전, batch 5.x 버전 기준으로 작성한다.
일단, Gradle에 다음을 추가 한다.
dependencies {
...
implementation 'org.springframework.boot:spring-boot-starter-batch'
...
}
Batch를 실행하기전, Batch 관련 DB 에 테이블들이 반드시 존재 해야 한다. 이유는 batch 관련 작업들의 기록을 남기기 때문인데, H2 같이 로컬에서 사용하는 메모리DB는 설정을 통해 자동으로 테이블이 만들어지나, 외부에 있는 DB의 경우 테이블이 자동으로 만들어지지 않는 경우도 많다.
org/springframework/batch/core/schema-postgresql.sql 처럼 각 DB 에 맞는 쿼리문을 별도로 실행하여 테이블을 만들어 줘야 한다. 인텔리제이를 사용하는 경우 schema-postgresql.sql를 검색하여 찾을 수 있고, 그 외는 직접 gradle 을 통해 경로를 찾아서 jar 파일 압축을 해제 해야 한다.
아래는 batch 5.x 기준, 테이블 DDL 이다.
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
Spring batch 의 구성은 다음과 같다.
- Job 생성 및 등록
- Job은 여러개의 Step 으로 구성 할 수 있다.
- Step 등록
- Step은 Reader -> Processor -> Writer 로 구성된 한 작업단위이다.
- 데이터를 읽고, 처리한 후, 저장하는 단계를 따른다.
- 필요시, listener 를 등록하여 step 시작 전, 후 처리를 할 수 있다.
- chunk 를 이용하여 한번에 수행 할 작업 단위를 설정한다 (예 : 100건씩)
- Step은 Reader -> Processor -> Writer 로 구성된 한 작업단위이다.
- 등록된 Job을 실행
listener 예는 다음과 같다.
- beforeStep 과 afterStep 으로 구성되어 있다.
@Component
public class CustomStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Step 시작!");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("Step 종료!");
System.out.println("읽은 데이터 수 : " + stepExecution.getReadCount());
if (stepExecution.getStatus() == BatchStatus.FAILED) {
System.out.println("Step 실패 원인: " + stepExecution.getFailureExceptions());
}
return stepExecution.getExitStatus(); // ExitStatus 변경 가능
}
}
전체 예제를 살펴보면 다음과 같다.
@Configuration
@RequiredArgsConstructor
public class MemberStatusBatchConfig {
private final EntityManagerFactory entityManagerFactory;
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final MemberRepository memberRepository;
@Bean
public Job memberStatusJob() {
return new JobBuilder("memberStatusJob", jobRepository)
.start(updateStatusStep())
.build();
}
@Bean
public Step updateStatusStep() {
return new StepBuilder("updateStatusStep", jobRepository)
.<Member, Member>chunk(1000, transactionManager)
.reader(memberReader(null))
.processor(memberProcessor())
.writer(memberWriter())
.listner(CustomStepListener())
.build();
}
@Bean
@StepScope
public JpaPagingItemReader<Member> memberReader() {
return new JpaPagingItemReaderBuilder<Member>(
@Value("#{jobParameters['memberId']}") Long memberId)
.name("memberReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT u FROM User u WHERE u.memberId = :memberId AND u.status = 'INACTIVE'")
.parameterValues(Map.of("memberId",UUID.fromString(memberId))
.pageSize(1000)
.build();
}
@Bean
public ItemProcessor<Member, Member> memberProcessor() {
return member -> {
member.setStatus("ACTIVE");
return member;
};
}
@Bean
public ItemWriter<Member> memberWriter() {
return members -> memberRepository.saveAll(members);
}
}
- JobRepository 와 PlatformTransactionManager는 스프링부트에서 주입 해준다.
- JobRepository : Job, Step 상태 기록 및 재실행 정보를 저장함.
- PlatformTransactionManager : chunk 단위 트랜잭션 관리. commit과 rollback 담당함.
- memberStatusJob => Job 정의 밎 등록부
- start() 에 실행할 step을 명시한다. step이 여러개이면 여러개를 명시 할 수 있다.
- updateStatusStep => Step 정의 및 등록부
- <Member,Member> chunk(1000) => 입력, 출력 형식으로 Member를 지정한 예이다. chunk 는 입, 출력시 한번에 처리할 개수 이다.
- reader() 에 실행할 reader를 명시한다.
- 만일 reader에 명시할 메소드가 인자값을 받는다면, 이 시점에서는 null을 넣어준다 (추후 job 실행시 파라메터 주입받는다)
- processor() 에 읽어온 데이터를 처리할 프로세서를 명시한다.
- write() 에 프로세서 실행 결과를 저장할 writer를 명시한다.
- reader() 에 실행할 reader를 명시한다.
- <Member,Member> chunk(1000) => 입력, 출력 형식으로 Member를 지정한 예이다. chunk 는 입, 출력시 한번에 처리할 개수 이다.
- memberReader
- reader를 생성, 등록하고 JPQL 쿼리문을 통해 쿼리를 하여 데이터를 가져오는 부분이다.
- 만일, 파라메터를 통해 특정 값을 받아와야 할 경우 @StepScore 어노테이션을 써야한다.
- @Value("#{jobParameters['파라메터명']}") 자료형 변수명 을 통해 job 실행시 넘겨주는 파라메터를 받아온다.
- .parameterValues 를 통해 받아온 값을 JPQL에 넣어줘야 한다. (JpaPagingItemReader 특징)
- 예시에서는 JPQL 을 썼으나, QueryDslPagingItemReader 나, JdbcCursorItemReader 혹은 JdbcPagingItemReader 를 이용 할 수도 있다.
- memberProcessor
- processor를 생성하고 등록한 후, reader 에서 읽어온 데이터를 기반으로 특정 작업을 수행한다.
- memberWriter
- processor 에서 처리한 데이터를 repository를 통해 실제로 저장한다.
스케줄러의 예는 다음과 같다.
01 @Configuration
02 @EnableScheduling
03 @RequiredArgsConstructor
04 public class BatchScheduler {
05
06 private final JobLauncher jobLauncher;
07 private final Job memberStatusJob;
08
09 // 매일 새벽 3시 실행
10 @Scheduled(cron = "0 0 3 * * *")
11 public void runInactiveUserCleanup() throws Exception {
12 JobParameters params = new JobParametersBuilder()
13 .addLong("time", System.currentTimeMillis()) // 중복 실행 방지
.addLong("memberId", 123L)
14 .toJobParameters();
15
16 jobLauncher.run(memberStatusJob, params);
17 }
18 }
- JobLauncher 는 job 실행 컴포넌트로 스프링이 제공해준다.
- memberStatusJob은 위의 Spring batch에서 등록한 빈 이다.
- JobParameter를 생성하여 스케줄러 실행시 중복 실행을 방지한다. 시간값을 준다.
- 필요한 인자값이 있다면 넘겨줄 수 있다.
- jobLauncher를 통해 작업을 실행한다.
'Backend > SpringBoot' 카테고리의 다른 글
| Retryable, 낙관적 락 간편 적용하기 (0) | 2025.12.09 |
|---|---|
| Flyway 사용하기 (0) | 2025.12.04 |
| Spring Actuator 로 health체크 하기 (0) | 2025.11.10 |
| 각종 설정 오버라이드 (WebMvcConfigurer) (0) | 2025.11.07 |
| Admin 페이지 설정하기 (0) | 2025.11.05 |