Develop/spring-batch

Spring Batch 란? (예제 소스 포함)

에디개발자 2020. 11. 3. 10:35
반응형

작성한 내용은 우아한 형제들의 이동욱님 블로그, 세미나를 참조해서 작성하였습니다. 

테스트 및 적용 내용은 Github에 올려두었습니다.

 

사내에서 배치 프로젝트를 맡아 Spring Batch를 적용하기로 결정하였습니다.

 

Spring Batch 사용 이전

Spring Batch 이전 스케쥴링 작업은 Scheduler, Quartz 등으로 Spring에서 Annotation으로 선언하여 지정된 시간에 스케줄링을 돌려 구현해놓은 클래스를 호출하여 로직을 실행한다.

 

기존 스케쥴링 방식으로는 대용량 데이터를 처리하는 도중 에러가 발생하면 처리했던 모든 데이터를 롤백시키고 다시 처음부터 처리해야하고, 몇 번째 데이터에서 에러가 발생했는지, 해당 배치 로직이 성공했는지 실패했는지 로그처리를 하여 확인해야하는 불편함이 있다.

 

Batch vs Quartz

구글링을 하다보면 위와 같은 vs가 많이 나온다. 애초에 비교 대상이 아니다.

Batch는 대용량 데이터를 일괄적으로 처리하는 기능이고 Quartz는 스케쥴링 역할이다.

 

Spring Batch

Spring에서 지원하는 대용량 데이터를 일괄적으로 처리하는 기능이다.

schema-mysql.sql 을 통해 Spring Batch Meta 정보 관련 Database table 생성

 

chunk, page 개념을 사용

  • page : 처리할 데이터 중 일정 개수만큼 조회
  • chunk : 조회된 데이터를 일정 개수만큼 처리 후 입력

세팅은 page = chunk * n 으로 세팅해야 성능적으로 가장 좋다고 한다. 보편적으로는 page = chunk

 

Job

Spring Batch가 해야할 작업 중 가장 큰 범위를 나타낸다.

Bean을 통해 Job을 등록시킨 후 Spring Boot 실행 시 간단한 Parameter 설정을 통해 원하는 Job을 실행시킬 수 있다.

  
  @Bean
  public Job jpaPagingItemReaderJob() {
      return jobBuilderFactory.get("jpaPagingItemReaderJob")
              .start(jpaPagingItemReaderStep())
              .build();
  }
  

특징

  • N개의 Job을 생성할 수 있다.
  • 1개의 Job은 N개의 Step을 포함할 수 있다.
  • 1개의 Step은 Tasklet or (Reader, Processor, Writer)를 포함할 수 있다.

FLOW

  • next

Job에서 Next 설정을 통해 순차적으로 Step을 실행시킬 수 있다.

jobBuilderFactory.get("stepNextJob")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();

 

  • from, on, to, end

다양한 설정을 통해 step 분기를 설정할 수 있다.

	jobBuilderFactory.get("deciderJob")
                .start(startStep())
                .next(decider()) // 홀수 | 짝수 구분
                .from(decider()) // decider의 상태가
                    .on("ODD") // ODD라면
                    .to(oddStep()) // oddStep로 간다.
                .from(decider()) // decider의 상태가
                    .on("EVEN") // EVEN라면
                    .to(evenStep()) // evenStep로 간다.
                .end() // builder 종료
                .build();

 

Parameter

JobScope, StepScope를 설정하여 Parameter Lazy Loading 할 수 있다.

  @Bean
  public Job simpleJob() {
      return jobBuilderFactory.get("simpleJob")
              .start(simpleStep1(null))
              .next(simpleStep2(null))
              .build();
  }

  @Bean
  @JobScope
  public Step simpleStep1(@Value("#{jobParameters[requestDate]}") String requestDate) {
      return stepBuilderFactory.get("simpleStep1")
              .tasklet((contribution, chunkContext) -> {
                  throw new IllegalArgumentException("step1에서 실패했다.");
              })
              .build();
  }

java jar로 배포할 때 파라미터 설정한다.

java -jar ... requestDate=20201010

 

Tasklet

Step의 Task를 Simple하게 구현할 수 있다.

	stepBuilderFactory.get("simpleStep2")
                .tasklet((contribution, chunkContext) ->{
                    log.info(">>>>>>> This is Step2");
                    log.info(">>>>>>> requestDate ={}", requestDate);
                    return RepeatStatus.FINISHED;
                })
                .build();

 

Reader, Processor, Writer

Step의 Task를 Reader, Processor, Writer 3가지로 나누어 구현할 수 있다.

	stepBuilderFactory.get("jdbcBatchItemWriterStep")
                .<People, People>chunk(chunkSize)
                .reader(mybatisBatchItemWriterReader())
                .processor(mybatisCompositeProcessor())  // not required
                .writer(mybatisBatchItemWriter())
                .build();

 

Reader

데이터 조회하는 메소드

	new MyBatisPagingItemReaderBuilder<People>()
                .sqlSessionFactory(sqlSessionFactory)
                .queryId("net.newploy.payroll.batch.mapper.PeopleMapper.selectById")
                .parameterValues(datesParameters)
                .pageSize(chunkSize)
                .build()

데이터 조회 타입

  • CursorItemReader : stream 방식으로 1건씩 처리
  • PagingItemReader : page size만큼 조회하여 처리
  •  

Processor ( 필수 아님 )

데이터 처리하는 메소드

	person -> {
            log.info("안녕하세요. "+ person.getFirstName() + "입니다.");
            return person;
        }

프로세스 처리 타입

  • ItemProcessorAdapter : 
    • 거의 사용하지 않는다고 함
  • ValidatingItemProcessor
    • validate 처리 거의 사용하지 않는다고 함
  • CompositeItemProcessor
    • 프로세스를 체인닝 처리하여 순차적 진행

 

Writer

데이터 저장하는 메소드

	new MyBatisBatchItemWriterBuilder<People>()
                .sqlSessionFactory(sqlSessionFactory)
                .statementId("net.newploy.payroll.batch.mapper.PeopleMapper.insert")
                .itemToParameterConverter(createItemToParameterMapConverter(1))
                .build();

 

Batch Flow ( Cursor 기준 )

Data total : 100 / PageSize : 50 / ChunkSize : 10

  1.  ItemReader에서 PageSize만큼 조회 ( 50개 조회 )
  2.  데이터 처리 시작 ( 10개 )
  3.  ItemReader에서 ItemProcessor로 1건 씩 넘겨준다. 
  4.  ChunkSize만큼 ItemProcessor에서 1건 씩 처리 ( 10개 처리 )
  5.  ItemWriter로 처리한 데이터 리스트로 전달 ( 10개 전달 )
  6.  ItemWriter에서 10개 입력 처리
  7.  “2 ~ 6” 5회 반복
  8.  “ 1 “ 처리하여 데이터 조회
  9.  “ 2 ~ 6 “ 5회 반복
  10.  End

반응형