자녀의 성향 정보 히스토리를 삭제할 수 있는데, 그 삭제 정보를 삭제 요청 즉시 논리적으로는 삭제된 것이 맞지만 실제 데이터의 삭제는 한달 뒤 이루어지는 요구사항이 있었다.
이 요구사항을 충족하기 위해 스프링 배치 + 스케줄러를 처음 사용해보아서 스프링 배치에 대해 정리해보고 어떻게 적용하고 테스트해보았는지 정리해보려고 한다.
스프링 배치
배치란 일괄 처리 작업을 의미한다. 많은 데이터를 일정한 주기나 조건에 맞추어 대량으로 처리하기 위해 사용한다.
배치 작업은 일반적으로 대량의 데이터를 처리하거나, 주기적이고 반복적인 작업을 실행하는 데 사용된다. 예시를 들자면 아래와 같다.
- 로깅 및 추적
- 추천 시스템 데이터 작업
- 쿠폰 발송
실시간 처리가 어려운 대량의 데이터를 한 번에 처리하도록 하는 데 적합하다고 볼 수 있다. 그리고 스케줄러를 같이 사용한다면 이를 주기적으로 작업해야 할 때 자동화할 수 있다는 장점이 있다.
실패 시 자동으로 재시도하거나 중단된 지점에서 재개하는 등의 내결함성을 제공한다는 장점도 가진다.
배치와 스케줄러를 혼동하면 안되는 부분은 배치는 대량의 데이터를 일괄적으로 처리하는 것이고, 스케줄러를 통해 특정 주기마다 자동으로 동작하게 하는 것이다.
Spring-Batch 5.x 버전부터 달라진 부분.
- Deprecated
- JobBuilderFactory
- StepBuilderFactory
- JobBuilder(String name)
- StepBuilder(String name)
- 이제 BuilderFactory 대신 Builder를 직접 사용하는 방식으로 바뀌었다.
- Builder를 생성할 때 이름 외에도 JobRepository와 같은 다른 객체들을 명시적으로 전달해야 한다.
- 이 부분은 아래 예시 코드를 보자.
- @EnableBatchProcessing 사용을 하지 않는다.
- SpringBoot 3.xx 이상은 사용하지 않는다고 한다.
- 배치 관련 빈이 자동으로 주입되도록 구성할 수 있다.
- 나는 이 부분을 모르고 고생했다.
- 이 애노테이션을 달고 있으니 스프링 배치에 필요한 스키마가 자동 생성되지 않았다.
- 사용 시 spring batch 기본 설정이 백오프된다고 한다.
Spring Batch 기본 구성 요소
- Job: 배치 작업의 전체 단위를 나타내고 여러 개의 Step으로 구성된다.
- Step: 일종의 단계로 Tasklet 또는 Chunk 기반 처리로 이루어질 수 있다.
- Tasklet: 실제 작업 내용으로 각 Step 별 실행할 비즈니스 로직이 구현되어 있다.
- JobRepository: 배치 작업의 실행 상태, 실행 이력 등을 저장하는 역할을 한다.
- JobLauncher: 배치 작업의 실행을 담당한다.
- JobParameters: 배치 작업을 실행할 때 전달되는 파라미터로서 Job의 고유성을 보장한다.
1
2
3
JobParameters params = new JobParametersBuilder()
.addString("DeleteHistoryJob", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
위 파라미터를 예로 들면, “DeleteHistoryJob”이 파라미터의 이름이다.
이는 배치 작업에 사용되는 특정 파라미터의 식별자 역할을 한다. 고유성을 위해 System.currentTimeMillis()를 String으로 변환한 값을 value로 매핑한다.
- 매일 새벽 4시에 배치 작업이 진행된다고 가정하자.
- 매번 배치 작업을 실행할 때 현재 시각을 기준으로 고유한 JobParameters를 생성한다.
- 이를 통해 Spring Batch는 매일 실행되는 작업을 서로 다른 작업으로 인식한다.
- 만약 파라미터가 존재하지 않는다면 배치는 이미 이전에 실행된 Job을 중복 실행하려고 한다고 인식할 수 있어 새로운 실행으로 처리되지 않을 수 있다.
1
Step already complete or not restartable, so no action to execute
동일한 Job을 동일한 Parameter로 실행하려 하면 위와 같은 메시지가 나타나게 된다.
Batch 스키마, 테이블
- spring.batch.jdbc.initialize-schema
- ALWAYS : 스크립트 항상 실행
- EMBEDDED : 내장 DB일 때만 실행됨 ( 자동 생성 )
- NEVER : 스크립트 항상 실행 안함 -> 내장 DB인 경우 오류 발생
배치 설정을 하고 실행하면 위와 같은 테이블들이 생긴다.
각 역할에 대해 알아보자.
- BATCH_JOB_INSTANCE
- JOB이 실행될 때 JOB Instance 정보가 저장된다. name, key 데이터가 저장된다.
- 해시 값으로 저장되어 중복된 name과 key를 사용할 수 없다.
- BATCH_JOB_EXECUTION
- JOB의 실행 정보가 저장된다.
- JOB 생성, 시작, 종료, 상태, 메시지 등을 관리한다.
- BATCH_JOB_EXECUTION_PARAMS
- JOB을 실행하기 위한 JobParameter 정보를 저장한다.
- BATCH_JOB_EXECUTION_CONTEXT
- JOB 실행 중 여러가지 상태 정보, 공유 데이터를 직렬화해 저장한다.
- STEP 간 서로 공유가 가능하다.
- BATCH_STEP_EXECUTION
- STEP의 실행 정보가 저장된다.
- BATCH_STEP_EXECUTION_CONTEXT
- STEP 별로 지정되어 STEP간 서로 공유할 수 없다.
- 상태 정보, 데이터 등을 저장한다.
Spring Batch 작업 생성
위와 같은 구조로 Job이 실행되면 Job에 포함되는 Step을 실행하게 되고, Step이 실행되면 Tasklet을 실행하도록 구성된다.
이제 Step과 Job, Tasklet을 작성해보자.
- Tasklet 생성
1
2
3
4
5
6
7
@Bean
public Tasklet myTasklet() {
return (contribution, chunkContext) -> {
// 비즈니스 로직 수행
return RepeatStatus.FINISHED;
};
}
Tasklet에 실제 동작할 로직을 작성해주면 된다. DB에서 어떤 데이터를 일괄적으로 조회해 삭제하는 로직을 예시로 들 수 있다.
- Step 생성
1
2
3
4
5
6
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.tasklet(myTasklet(), transactionManager)
.build();
}
생성한 Tasklet을 바탕으로 Step을 생성할 수 있다. 이 step들 여러개로 구성된 Job을 만들 수 있다.
여기서 “step1”의 역할은 Step의 이름을 나타낸다. Step을 식별하고 관리하는 데 사용된다. 예를 들면 JobRepository를 사용하여 각 Step의 상태를 기록하는데 이 기록이 Step 이름을 기준으로 저장되는 것이다.
- Job 생성
1
2
3
4
5
6
@Bean
public Job exampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("exampleJob", jobRepository)
.start(step1)
.build();
}
Step 분기 처리
생성한 step들을 실행시키는 Job을 만들 수 있다. 이 Step은 분기처리로 처리할 수도 있다. 예를 들면 어떤 Step1이 성공했을 때 다음 Step3을 실행시키고 실패했을 때 Step2를 실행시키도록 할 수 있다.
1
2
3
4
5
6
7
8
9
@Bean
public Job decisionJob(JobRepository jobRepository, Step step1, Step step2, Step decisionStep) {
return new JobBuilder("decisionJob", jobRepository)
.start(step1)
.next(decisionStep)
.from(decisionStep).on("COMPLETED").to(step2)
.from(decisionStep).on("FAILED").end()
.build();
}
- step1을 수행 후 decisionStep을 수행한다.
- decisionStep이 실패하면 끝내고, 성공한다면 step2를 수행하는 구조이다.
- 즉, 작업의 흐름에 따라 다른 Step을 실행할 수 있도록 해줄 수 있다.
Chunk 기반 처리
위에서 Tasklet 기반의 처리 방식을 보았다.
Tasklet은 기본적으로 하나의 작업을 수행하는 방식이고 대체로 단순하거나 복잡하지 않은 작업을 수행하는 데 적합하며, 전체 데이터를 처리하는 것이 아니라 일부 데이터나 단일 작업을 처리하는 데 주로 사용된다.
반면 Chunk 방식은 대용량 데이터를 효과적으로 처리하기 위해 사용한다. 큰 데이터를 일련의 작은 데이터 묶음(Chunk)으로 나누고, 각 Chunk를 개별적인 트랜잭션 범위 내에서 처리하는 방식을 취한다.
예를 들면 DB에서 수만개의 레코드를 처리해야하는 경우 Chunk 기반으로 데이터를 일정 크기로 나누어 처리할 수 있다. (ex - 10개씩 처리)
이 방식은 트랜잭션 관리, 효율적인 메모리 사용 측면에서 장점을 가진다.
- 각 Chunk 처리는 Reader, Processor, Writer의 세 단계로 구성된다.
- Reader는 데이터 소스로부터 데이터를 읽어와 Chunk를 생성한다. 이 데이터는 일반적으로 데이터베이스나 파일 혹은 메시지 큐 등이 될 수 있다.
- Processor는 읽어온 데이터에 대해 필요한 처리를 수행한다. 필터링, 변환 등이 그 예시이다.
- Writer는 처리된 데이터를 최종적으로 저장한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Configuration
@EnableBatchProcessing
public class BookOrderJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
@JobScope
public Job bookOrderJob() throws Exception {
return jobBuilderFactory.get("bookOrderJob")
.start(bookOrderStep()) // "bookOrderStep"이라는 이름의 Step을 시작점으로 설정
.build();
}
@Bean
@StepScope
public Step bookOrderStep() throws Exception {
return stepBuilderFactory.get("bookOrderStep")
.<Book, Order>chunk(10) // 한 번에 처리할 데이터 항목의 크기(chunk size)를 10으로 설정
.reader(bookReader()) // Reader 설정
.processor(bookOrderProcessor()) // Processor 설정
.writer(orderWriter()) // Writer 설정
.build();
}
// Reader 설정 - 데이터베이스로부터 특정 조건에 맞는 데이터를 읽어오는 역할
@Bean
@StepScope
public JpaPagingItemReader<Book> bookReader() throws Exception {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("stock", 5); // stock 파라미터 값을 5로 설정
return new JpaPagingItemReaderBuilder<Book>()
.name("JpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT b FROM Book b WHERE b.stock <= :stock ORDER BY b.id ASC") // 재고가 5 이하인 책을 조회하는 쿼리
.parameterValues(parameterValues)
.pageSize(10) // 페이징 사이즈를 10으로 설정
.build();
}
// Processor 설정 - 읽어온 데이터를 기반으로 비즈니스 로직을 수행하는 역할
@Bean
@StepScope
public ItemProcessor<Book, Order> bookOrderProcessor() {
return book -> {
Order order = new Order();
order.setBook(book);
order.setQuantity(10); // Reorder quantity
return order; // 재고가 부족한 책에 대해 재주문을 생성
};
}
// Writer 설정 - 처리된 결과 데이터를 데이터베이스에 저장하는 역할
@Bean
@StepScope
public JpaItemWriter<Order> orderWriter() {
return new JpaItemWriterBuilder<Order>()
.entityManagerFactory(entityManagerFactory)
.build();
}
}
위 예시 코드는 재고가 특정 수량 이하인 Book에 대해 주문을 생성하는 작업을 수행하는 코드이다.
경우에 따라 Chunk 방식, Tasklet 방식을 채택해서 사용하면 될 것 같다.
내 구현
자녀의 성향 정보 히스토리를 삭제할 수 있는데, 그 삭제 정보를 삭제 요청 즉시 논리적으로는 삭제된 것이 맞지만 실제 데이터의 삭제는 한달 뒤 이루어지는 요구사항을 위해 배치/스케줄러를 구현하였다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DeleteHistoryBatchConfig {
private final ChildPersonalityHistoryRepository historyRepository;
@Bean
public Job deleteHistoryJob(JobRepository jobRepository, Step deleteHistoryStep) {
return new JobBuilder("deleteHistoryJob", jobRepository)
.start(deleteHistoryStep)
.build();
}
@Bean
public Step deleteHistoryStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("deleteHistoryStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
log.info("run deleteHistory tasklet");
LocalDateTime oneMonthAgo = LocalDateTime.now().minusMonths(1);
List<ChildPersonalityHistory> historiesToDelete = historyRepository.findAllByRealDelete(oneMonthAgo);
historyRepository.deleteAll(historiesToDelete);
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
- isDeleted 필드가 true라면 논리적으로 삭제된 History이고 false라면 논리적으로도 삭제되지 않은 상태이다.
- 삭제 요청이 왔을 때 isDeleted 필드가 true가 되며, deletedAt 이라는 필드에 삭제 요청 시각 정보가 담긴다.
- 따라서 배치 작업에서는 현재 시각에서 한달 전의 시간을 가지고 삭제할 히스토리들을 조회한다.
- Step 내부에 Tasklet을 구현하는 방식으로 위 로직을 작성했다.
그리고 이 배치 작업을 스케줄러를 통해 매일 새벽 4시에 수행되도록 설정했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@EnableScheduling
@Configuration
@RequiredArgsConstructor
public class DeleteHistoryScheduler {
private final JobLauncher jobLauncher;
private final Job deleteHistoryJob;
@Scheduled(cron = "0 0 4 * * ?") // 매일 새벽 4시에 실행
public void runDeleteHistoryJob() {
log.info("delete history job time: {}", LocalDateTime.now());
try {
JobParameters params = new JobParametersBuilder()
.addString("DeleteHistoryJob", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(deleteHistoryJob, params);
} catch (Exception e) {
log.error("Error", e);
}
}
}
테스트 코드 작성
1
2
3
4
5
6
7
8
9
10
11
@SpringBootTest
@TestConfiguration
public class DeleteHistoryBatchTestConfig {
@Bean
public JobLauncherTestUtils jobLauncherTestUtils(Job deleteHistoryJob) {
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
jobLauncherTestUtils.setJob(deleteHistoryJob);
return jobLauncherTestUtils;
}
}
위 설정을 통해 Spring Batch 테스트를 쉽게 할 수 있도록 한다.
배치 작업을 테스트하기 위해 JobLauncher를 통해 setting된 Job을 실행할 수 있도록 한다.
그리고 아래와 같이 사용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@SpringBootTest
@ExtendWith(MockitoExtension.class)
@SpringBatchTest
public class DeleteHistoryBatchConfigTest {
@MockBean
private ChildPersonalityHistoryRepository historyRepository;
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private DeleteHistoryScheduler deleteHistoryScheduler;
@Test
@DisplayName("배치 작업 테스트")
public void testDeleteHistoryJob() throws Exception {
LocalDateTime oneMonthAgo = LocalDateTime.now().minusMonths(1);
List<ChildPersonalityHistory> histories = new ArrayList<>();
given(historyRepository.findAllByRealDelete(oneMonthAgo)).willReturn(histories);
JobParameters jobParameters = new JobParametersBuilder()
.addString("JobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
//배치 작업 직접 실행 (jobParameter 전달)
JobExecution jobExecution = jobLauncherTestUtils.getJobLauncher().run(jobLauncherTestUtils.getJob(), jobParameters);
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
verify(historyRepository, times(1)).deleteAll(histories);
}
@Test
@DisplayName("스케줄러 메서드 수행 테스트")
public void testSchedulerJobExecution() throws Exception {
//4시까지 기다리지 않고 스케줄러 메서드를 수동으로 실행한다.
deleteHistoryScheduler.runDeleteHistoryJob();
verify(historyRepository, atLeastOnce()).findAllByRealDelete(any(LocalDateTime.class));
verify(historyRepository, atLeastOnce()).deleteAll(anyList());
}
}
- testDeleteHistoryJob() 에서 배치 작업을 직접 실행하고 작업에 포함되어있는 Repository 메서드가 올바르게 호출되었는지 확인하여 검증한다.
- 결과적으로 BatchStatus.COMPLETED를 통해 배치 작업이 성공적으로 완료되는지 확인하다.
- testSchedulerJobExecution() 에서는 스케줄러의 메서드를 직접 호출해 스케줄러가 정상적으로 작동하는지 테스트 한다.
- 새벽 4시까지 기다리지 않고 직접 스케줄러의 메서드를 호출해 해당 메서드가 정상적으로 동작하는지 테스트할 수 있다.
그리고 혹시 몰라 TestController를 만들고 테스트 데이터를 삽입해 해당 Job을 직접 실행시켜보았다.
위와 같이 원하는 방식으로 잘 동작하는 것을 볼 수 있었다.
정리
이번 배치를 적용하는 요구사항은 비교적 간단했어서 배치 사용법에 대해 공부하는 시간이 길었던 것 같다.
현재 Tasklet으로 구현했는데, 다음에 구현할 배치 작업은 Chunk 방식으로 구현해보고 비교해볼 계획이다.
Tasklet 방식과 Chunk 방식의 차이를 조금 더 체감할 수 있을 것 같다.