군만두의 IT 개발 일지

[Java] Spring Batch - Job/Step 구조와 정산 배치 구현 본문

개발일지/패스트캠퍼스

[Java] Spring Batch - Job/Step 구조와 정산 배치 구현

mandus 2024. 3. 3. 19:07

목차

    ⭐ 요약


    • Spring Batch의 핵심 구조인 Job, Step, Reader/Processor/Writer 패턴을 학습한다.
    • 유료 API 정산 배치를 구현하면서 일일 정산주간 정산 흐름을 직접 설계한다.
    • ExecutionContext 공유, Partitioning, JobExecutionDecider, CompositeItemWriter 등 실무에서 자주 쓰이는 기능을 함께 학습한다.

    ⭐ Spring Batch란?


    Spring Batch는 대용량 데이터를 일괄 처리(Batch Processing)하기 위한 Spring 프레임워크이다. 실시간으로 처리하기 어려운 대량의 데이터를 특정 시간에 모아서 처리할 때 사용한다. 대표적인 사용 사례로는 정산, 데이터 마이그레이션, 리포트 생성 등이 있다.

     

    Spring Batch의 핵심 구성 요소는 아래와 같다.

    구성 요소 역할
    Job 배치 작업의 최상위 단위. 하나 이상의 Step으로 구성됨
    Step Job의 실제 실행 단위. Reader → Processor → Writer 순으로 동작함
    ItemReader 데이터를 읽어오는 역할 (DB, 파일, API 등)
    ItemProcessor 읽어온 데이터를 가공·변환하는 역할
    ItemWriter 가공된 데이터를 저장하거나 전송하는 역할
    JobRepository Job 실행 이력과 상태를 DB에 저장하는 역할
    JobParameters Job 실행 시 전달하는 파라미터 (ex. targetDate)

    ⭐ 학습 내용


    1. 프로젝트 구성

     

    Spring Boot 3.1.0과 Spring Batch를 함께 사용하는 프로젝트를 구성했다. DB는 MySQL을 사용하고, JPA로 정산 데이터를 관리한다. 아래는 핵심 의존성이다.

    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-batch'
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        runtimeOnly 'com.mysql:mysql-connector-j'
    }

     

    또한 Spring Batch는 Job 실행 이력을 저장하기 위한 메타 테이블이 필요하다. application.properties에 아래 설정을 추가하면 자동으로 테이블이 생성된다.

    spring.batch.jdbc.initialize-schema=always

     

    2. 정산 배치 전체 흐름

     

    이번 프로젝트에서 구현한 정산 배치의 전체 흐름은 아래와 같다. 매일 실행되며, 금요일에는 주간 정산 Step이 추가로 실행된다.

    Step 역할 실행 조건
    preSettleDetailStep CSV 파일을 읽어 고객사·서비스별 호출 횟수를 집계하고 ExecutionContext에 저장 매일
    settleDetailStep 집계된 데이터를 SettleDetail 엔티티로 변환하여 DB에 저장 매일
    settleGroupStep 1주일치 SettleDetail을 고객사·서비스별로 집계하고 DB 저장 후 이메일 발송 매주 금요일

     

    3. JobParametersValidator - 파라미터 검증

     

    Job 실행 시 전달받는 targetDate 파라미터가 yyyyMMdd 형식인지 검증하는 커스텀 Validator를 구현했다. JobParametersValidator 인터페이스를 구현하면 Job 시작 전에 자동으로 검증이 실행된다.

    public class DateFormatJobParametersValidator implements JobParametersValidator {
    
        private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
        private final String[] names;
    
        @Override
        public void validate(JobParameters parameters) throws JobParametersInvalidException {
            for (String name : names) {
                validateDateFormat(parameters, name);
            }
        }
    
        private void validateDateFormat(JobParameters parameters, String name)
                throws JobParametersInvalidException {
            try {
                final String string = parameters.getString(name);
                LocalDate.parse(Objects.requireNonNull(string), dateTimeFormatter);
            } catch (Exception e) {
                throw new JobParametersInvalidException("yyyyMMdd 형식만을 지원합니다.");
            }
        }
    }

     

    4. Step 1 - preSettleDetailStep: CSV 읽기 및 집계

     

    첫 번째 Step은 targetDate에 해당하는 CSV 파일을 읽어 API 호출 성공 건수를 고객사·서비스별로 집계하는 역할을 한다. 집계 결과는 ConcurrentHashMap에 담아 StepExecutionContext에 저장한다.

     

    Step 간에 데이터를 공유하려면 StepExecutionContext에 저장된 데이터를 JobExecutionContext로 승격시켜야 한다. 이 역할을 ExecutionContextPromotionListener가 담당한다.

    // Step1의 StepExecutionContext에 있는 "snapshots" 키를
    // JobExecutionContext로 승격시킨다.
    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        final ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[]{"snapshots"});
        return listener;
    }
    
    // Step2에서 JobExecutionContext를 통해 Step1의 집계 데이터에 접근한다.
    @Override
    public void beforeStep(StepExecution stepExecution) {
        final JobExecution jobExecution = stepExecution.getJobExecution();
        final Map<Key, Long> snapshots =
            (ConcurrentHashMap<Key, Long>) jobExecution.getExecutionContext().get("snapshots");
        iterator = snapshots.entrySet().iterator();
    }

     

    5. Step 2 - settleDetailStep: DB 저장

     

    두 번째 Step은 Step 1에서 집계한 데이터를 SettleDetail 엔티티로 변환하여 DB에 저장한다. SettleDetailProcessor에서 서비스 정책(ServicePolicy)을 조회하여 건당 요금을 계산하고, JpaItemWriter로 DB에 저장한다.

    @Override
    public SettleDetail process(KeyAndCount item) throws Exception {
        final Key key = item.key();
        final ServicePolicy servicePolicy = ServicePolicy.findById(key.serviceId());
        final Long count = item.count();
        final String targetDate = stepExecution.getJobParameters().getString("targetDate");
    
        return new SettleDetail(
                key.customerId(),
                key.serviceId(),
                count,
                servicePolicy.getFee() * count,   // 건당 요금 × 호출 횟수
                LocalDate.parse(targetDate, dateTimeFormatter)
        );
    }

     

    6. JobExecutionDecider

     

    주간 정산은 매주 금요일에만 실행되어야 한다. JobExecutionDecider를 구현하면 조건에 따라 Step의 실행 여부를 동적으로 결정할 수 있다. 금요일이 아닌 경우 "NOOP"을 반환하여 주간 정산 Step을 건너뛴다.

    public JobExecutionDecider isFridayDecider() {
        return (jobExecution, stepExecution) -> {
            final String targetDate = stepExecution.getJobParameters().getString("targetDate");
            final LocalDate date = LocalDate.parse(targetDate, formatter);
    
            if (date.getDayOfWeek() != DayOfWeek.FRIDAY) {
                return new FlowExecutionStatus("NOOP");  // 주간 정산 건너뜀
            }
            return FlowExecutionStatus.COMPLETED;        // 주간 정산 실행
        };
    }
    
    // Job 흐름 정의
    new JobBuilder("settleJob", jobRepository)
        .start(preSettleDetailStep)
        .next(settleDetailStep)
        .next(isFridayDecider())        // 금요일 여부 판단
        .on("COMPLETED").to(settleGroupStep)  // 금요일이면 주간 정산 실행
        .build()
        .build();

     

    7. Step 3 - settleGroupStep: 주간 정산 및 이메일 발송

     

    세 번째 Step은 1주일치 SettleDetail 데이터를 고객사·서비스별로 집계하여 SettleGroup에 저장하고, 고객사에 이메일을 발송한다. 두 가지 Writer를 동시에 실행하기 위해 CompositeItemWriter를 사용한다.

    // CompositeItemWriter: DB 저장과 이메일 발송을 순서대로 실행한다.
    @Bean
    public ItemWriter<List<SettleGroup>> settleGroupItemWriter(
            SettleGroupItemDBWriter settleGroupItemDbWriter,
            SettleGroupItemMailWriter settleGroupItemMailWriter
    ) {
        return new CompositeItemWriter<>(
                settleGroupItemDbWriter,   // 1. DB 저장
                settleGroupItemMailWriter  // 2. 이메일 발송
        );
    }

     

    8. Partitioning

     

    대량 데이터를 빠르게 처리하기 위해 Partitioning을 학습했다. Partitioning은 하나의 Step을 여러 워커(Worker) Step으로 분할하여 병렬로 실행하는 방식이다. 아래 예시에서는 7일치 데이터를 7개의 워커 Step이 동시에 처리하도록 설계했다.

    // 워커 스텝을 7개로 분할하여 병렬 실행한다.
    @Bean
    public PartitionHandler partitionHandler(Step apiOrderGenerateStep) {
        final TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setStep(apiOrderGenerateStep);
        handler.setGridSize(7);  // 워커 7개
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return handler;
    }
    
    // 각 워커 Step이 처리할 날짜를 StepExecutionContext에 주입한다.
    Partitioner getPartitioner(String targetDate) {
        return x -> {
            final Map<String, ExecutionContext> result = new HashMap<>();
            IntStream.range(0, 7).forEach(it -> {
                final ExecutionContext value = new ExecutionContext();
                value.putString("targetDate", date.minusDays(it).format(formatter));
                result.put("partition" + it, value);
            });
            return result;
        };
    }

    ⭐ 어려웠던 점


    • Step 간 데이터 공유 방식이 처음에는 헷갈렸다. Step 1의 StepExecutionContext에 저장한 데이터는 Step 2에서 직접 접근이 불가능하다. ExecutionContextPromotionListener를 사용하여 JobExecutionContext로 승격시켜야 한다는 점을 이해하는 데 시간이 걸렸다.
    • Spring Batch 5.x부터는 @EnableBatchProcessing 없이도 자동 구성이 되는 등 버전별로 설정 방식이 달라서 공식 문서를 꼼꼼히 확인해야 했다.

    ⭐ 후기


    • 실제 서비스에서 필요한 정산 배치를 직접 설계하고 구현해보면서 Spring Batch의 전체 흐름을 체감할 수 있었다.
    • Reader → Processor → Writer 구조는 단순해 보이지만, 각 컴포넌트를 역할에 맞게 분리하면 테스트와 유지보수가 매우 쉬워진다는 점을 코드를 작성하면서 느꼈다.

     

    이 글은 패스트캠퍼스백엔드 개발 캠프에서 공부한 내용을 작성한 것입니다.

     

    Comments