Applying Spring Data JPA-based Querydsl to Spring Batch (QuerydslPagingItemReader)

에디개발자 2020. 11. 9. 08:21

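In this post I walk through creating a QuerydslPagingItemReader, plugging it into Spring Batch, and running it to check that it works.

I relied heavily on the seminar and Tistory blog of 이동욱 (Woowa Brothers) while building this.

All of the source code is on GitHub.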

 

Curious about Spring Batch?

Curious about JPA?

Curious about Querydsl?

To use Querydsl in Spring Batch, the first thing you have to do is write a QuerydslPagingItemReader yourself.

Spring Batch does not provide a Querydsl-based ItemReader out of the box.

 

QuerydslPagingItemReader

First, let's look at the source of QuerydslPagingItemReader.java.

package net.newploy.payroll.batch.querydsl;

import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
import org.springframework.batch.item.database.AbstractPagingItemReader;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {
    protected final Map<String, Object> jpaPropertyMap = new HashMap<>();
    protected EntityManagerFactory entityManagerFactory;
    protected EntityManager entityManager;
    protected Function<JPAQueryFactory, JPAQuery<T>> queryFunction;
    protected boolean transacted = true;//default value

    protected QuerydslPagingItemReader() {
        setName(ClassUtils.getShortName(QuerydslPagingItemReader.class));
    }

    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
                                    int pageSize,
                                    Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
        this();
        this.entityManagerFactory = entityManagerFactory;
        this.queryFunction = queryFunction;
        setPageSize(pageSize);
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    @Override
    protected void doOpen() throws Exception {
        super.doOpen();

        entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
        if (entityManager == null) {
            throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
        }
    }

    @Override
    protected void doReadPage() {
        clearIfTransacted();

        JPAQuery<T> query = createQuery()
                .offset(getPage() * getPageSize())
                .limit(getPageSize());

        initResults();

        fetchQuery(query);
    }

    protected void clearIfTransacted() {
        if (transacted) {
            entityManager.clear();
        }
    }

    protected JPAQuery<T> createQuery() {
        JPAQueryFactory queryFactory = new JPAQueryFactory(entityManager);
        return queryFunction.apply(queryFactory);
    }

    protected void initResults() {
        if (CollectionUtils.isEmpty(results)) {
            results = new CopyOnWriteArrayList<>();
        } else {
            results.clear();
        }
    }

    protected void fetchQuery(JPAQuery<T> query) {
        if (!transacted) {
            List<T> queryResult = query.fetch();
            for (T entity : queryResult) {
                entityManager.detach(entity);
                results.add(entity);
            }
        } else {
            results.addAll(query.fetch());
        }
    }

    @Override
    protected void doJumpToPage(int itemIndex) {

    }

    @Override
    protected void doClose() throws Exception {
        entityManager.close();
        super.doClose();
    }
}

 

Let's go through it piece by piece.

protected final Map<String, Object> jpaPropertyMap = new HashMap<>();
protected EntityManagerFactory entityManagerFactory;
protected EntityManager entityManager;
protected Function<JPAQueryFactory, JPAQuery<T>> queryFunction;
protected boolean transacted = true;//default value
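  • jpaPropertyMap
    • The properties passed to EntityManagerFactory.createEntityManager(...) in doOpen.
    • It is left empty here, so it currently has no effect.
  • entityManagerFactory
    • Querydsl (JPA) runs on top of JPA, so it needs an EntityManager.
    • This is the factory from which the reader obtains its EntityManager when it is opened.
  • entityManager
    • Needed because the reader is JPA-based.
    • JPA works in terms of entities, and this is the object that manages them.
  • queryFunction
    • A java.util.function.Function, one of the functional interfaces introduced in Java 8.
      • Example) JPAQuery<T> jpaQuery = queryFunction.apply(jpaQueryFactory)
    • You write the Querydsl query and pass it to the reader through this field.
  • transacted
    • Defaults to true, and this post leaves it at the default.
    • When true, the persistence context is cleared before each page is read; when false, each fetched entity is detached instead.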
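
The role of queryFunction can be sketched with plain JDK types. This is only an illustration: FakeQuery, FakeQueryFactory, and QueryFunctionSketch are hypothetical stand-ins (for JPAQuery, JPAQueryFactory, and the reader) invented for this example, not Querydsl or Spring Batch classes. The sketch shows that the function is stored once and then applied for every page, so each page gets a fresh query with its own offset and limit.

```java
import java.util.function.Function;

// Hypothetical stand-in for JPAQuery<T>: it only records what was asked of it.
final class FakeQuery {
    private final String source;
    private long offset;
    private long limit;

    FakeQuery(String source) { this.source = source; }

    FakeQuery offset(long offset) { this.offset = offset; return this; }

    FakeQuery limit(long limit) { this.limit = limit; return this; }

    String describe() { return source + " offset " + offset + " limit " + limit; }
}

// Hypothetical stand-in for JPAQueryFactory.
final class FakeQueryFactory {
    FakeQuery selectFrom(String entityName) { return new FakeQuery("from " + entityName); }
}

// Hypothetical miniature of the reader: keeps the function, applies it per page.
final class QueryFunctionSketch {
    private final Function<FakeQueryFactory, FakeQuery> queryFunction;

    QueryFunctionSketch(Function<FakeQueryFactory, FakeQuery> queryFunction) {
        this.queryFunction = queryFunction;
    }

    // Mirrors createQuery() plus the offset/limit calls: a fresh query for every page.
    FakeQuery createQuery(int page, int pageSize) {
        FakeQueryFactory factory = new FakeQueryFactory();
        return queryFunction.apply(factory)
                .offset((long) page * pageSize)
                .limit(pageSize);
    }

    public static void main(String[] args) {
        QueryFunctionSketch reader = new QueryFunctionSketch(f -> f.selectFrom("People"));
        System.out.println(reader.createQuery(0, 10).describe());
        System.out.println(reader.createQuery(1, 10).describe());
    }
}
```

Passing a function rather than a ready-made query object lets the reader decide when the query is built, which is why the real constructor takes a Function<JPAQueryFactory, JPAQuery<T>>.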

 

public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
                                int pageSize,
                                Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
    this();
    this.entityManagerFactory = entityManagerFactory;
    this.queryFunction = queryFunction;
    setPageSize(pageSize);
}
  • The constructor receives the required parameters and stores them.
  • setPageSize: since this is a paging ItemReader, it sets the page size (default: 10).

 

@Override
protected void doOpen() throws Exception {
    super.doOpen();

    entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
    if (entityManager == null) {
        throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
    }
}
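  • When the batch starts and the step sets up the reader, the doOpen method is called.
  • It creates the EntityManager that Querydsl will use.
  • This must happen! I once configured the reader incorrectly, so this method was never called and Querydsl did not run properly.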
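
Why the open step matters can be sketched in a few lines of plain Java. LifecycleSketch below is a hypothetical miniature of the open/read/close contract, not a Spring Batch class; its cursor field plays the role that the EntityManager plays in the real reader. One commonly reported cause of open never being called is declaring a step-scoped reader bean with the ItemReader interface as its return type, so the proxy does not expose ItemStream; whether that was the cause here is not stated in this post.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical miniature of the open/read/close contract; not a Spring Batch class.
final class LifecycleSketch {
    private final List<String> source;
    private Iterator<String> cursor; // stands in for the EntityManager created in doOpen()

    LifecycleSketch(List<String> source) {
        this.source = source;
    }

    // Acquires the resource that read() depends on.
    void open() {
        cursor = source.iterator();
    }

    // Returns the next item, or null when the input is exhausted.
    String read() {
        if (cursor == null) {
            throw new IllegalStateException("read() called before open()");
        }
        return cursor.hasNext() ? cursor.next() : null;
    }

    // Releases the resource.
    void close() {
        cursor = null;
    }

    public static void main(String[] args) {
        LifecycleSketch reader = new LifecycleSketch(Arrays.asList("a", "b"));
        reader.open();
        for (String item = reader.read(); item != null; item = reader.read()) {
            System.out.println(item);
        }
        reader.close();
    }
}
```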

 

@Override
protected void doReadPage() {
    clearIfTransacted();

    JPAQuery<T> query = createQuery()
            .offset(getPage() * getPageSize())
            .limit(getPageSize());

    initResults();

    fetchQuery(query);
}
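  • This is the method that performs the reader's work in the batch, one page at a time.
  • createQuery
    • Takes the query produced by queryFunction and, because this is a paging ItemReader, applies offset and limit so that exactly one page (pageSize rows) is fetched.
  • initResults
    • Before the query runs, initializes (or clears) the results list from which the reader returns items.
  • fetchQuery
    • Runs the query and puts the fetched rows into that results list.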
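
The offset/limit arithmetic in doReadPage can be tried out against an in-memory list. The sketch below is a simplification under stated assumptions: PagingLoopSketch and its methods are hypothetical names, the list stands in for the table, and the loop only approximates how the paging base class decides that the input is exhausted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of offset/limit paging; not a Spring Batch class.
final class PagingLoopSketch {

    // Returns the rows that offset(page * pageSize).limit(pageSize) would select.
    static <T> List<T> readPage(List<T> table, int page, int pageSize) {
        long offset = (long) page * pageSize;
        if (offset >= table.size()) {
            return new ArrayList<>();
        }
        int from = (int) offset;
        int to = Math.min(from + pageSize, table.size());
        return new ArrayList<>(table.subList(from, to));
    }

    // Reads page after page and records how many rows each page returned.
    static List<Integer> pageSizes(List<?> table, int pageSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<?> results = readPage(table, page, pageSize);
            if (results.isEmpty()) {
                break; // nothing left to read
            }
            sizes.add(results.size());
            if (results.size() < pageSize) {
                break; // a short page means the data is exhausted
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            rows.add(i);
        }
        System.out.println(pageSizes(rows, 10)); // prints [10, 2]
    }
}
```

With 12 rows and a page size of 10, the sketch yields one page of 10 and one page of 2. Offset-based paging also assumes that the query returns rows in a stable order, so an explicit orderBy in the query is usually advisable.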

 

QuerydslPagingItemReaderJobConfiguration

This is the configuration class that defines the Job that actually runs.

 

package net.newploy.payroll.batch.job.querydsl;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.newploy.payroll.batch.entity.People;
import net.newploy.payroll.batch.querydsl.QuerydslPagingItemReader;
import net.newploy.payroll.batch.support.PeopleRepositorySupport;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class QuerydslPagingItemReaderJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private final PeopleRepositorySupport peopleRepositorySupport;

    private final int chunkSize = 10;

    @Bean
    public Job querydslPagingJob() {
        return jobBuilderFactory.get("QUERYDSL_JOB")
                .start(querydslPagingStep())
                .build();
    }

    @Bean
    public Step querydslPagingStep() {
        return stepBuilderFactory.get("QUERYDSL_STEP")
                .<People, People>chunk(chunkSize)
                .reader(reader())
                .writer(writer())
                .build();
    }

    @Bean
    public QuerydslPagingItemReader<People> reader() {
        return new QuerydslPagingItemReader<>(entityManagerFactory, chunkSize, queryFactory -> peopleRepositorySupport.findAll());
    }

    private ItemWriter<People> writer() {
        return list -> {
            for (People people: list) {
                log.info("person={}", people);
            }
        };
    }
}

 

Let's look more closely at just the reader part.

@Bean
public QuerydslPagingItemReader<People> reader() {
    return new QuerydslPagingItemReader<>(entityManagerFactory, chunkSize, queryFactory -> peopleRepositorySupport.findAll());
}
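  • Creates the QuerydslPagingItemReader object.
  • queryFactory -> peopleRepositorySupport.findAll()
    • The query the reader will run is written as a functional interface (a lambda).
    • What actually executes is peopleRepositorySupport.findAll(), which therefore has to return a JPAQuery<People>. Note that this lambda does not use the queryFactory argument it receives.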

 

Execution result

2020-11-06 14:13:20.842 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=1
2020-11-06 14:13:21.316 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=2
2020-11-06 14:13:21.317 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=3
2020-11-06 14:13:21.317 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=4
2020-11-06 14:13:21.317 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=5
2020-11-06 14:13:21.317 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=6
2020-11-06 14:13:21.317 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=7
2020-11-06 14:13:21.318 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=8
2020-11-06 14:13:21.318 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=9
2020-11-06 14:13:21.318 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=10
2020-11-06 14:13:21.318 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat is complete according to policy and result value.
2020-11-06 14:13:21.319  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3909, firstName=용태2, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3910, firstName=용태, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3911, firstName=용태2, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3912, firstName=용태, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3913, firstName=용태2, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3914, firstName=용태, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3915, firstName=용태2, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3916, firstName=용태, lastName=임, enabled=0)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3917, firstName=용태2, lastName=임, enabled=1)
2020-11-06 14:13:21.320  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3918, firstName=용태, lastName=임, enabled=1)
2020-11-06 14:13:21.321 DEBUG 46769 --- [           main] o.s.b.c.step.item.ChunkOrientedTasklet   : Inputs not busy, ended: false
2020-11-06 14:13:21.321 DEBUG 46769 --- [           main] o.s.batch.core.step.tasklet.TaskletStep  : Applying contribution: [StepContribution: read=10, written=10, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
2020-11-06 14:13:21.326 DEBUG 46769 --- [           main] o.s.batch.core.step.tasklet.TaskletStep  : Saving step execution before commit: StepExecution: id=23, version=1, name=QUERYDSL_STEP, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
2020-11-06 14:13:21.345 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=2
2020-11-06 14:13:21.346 DEBUG 46769 --- [           main] o.s.b.c.s.c.StepContextRepeatCallback    : Preparing chunk execution for StepContext: org.springframework.batch.core.scope.context.StepContext@120d3fd
2020-11-06 14:13:21.346 DEBUG 46769 --- [           main] o.s.b.c.s.c.StepContextRepeatCallback    : Chunk execution starting: queue size=0
2020-11-06 14:13:21.349 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Starting repeat context.
2020-11-06 14:13:21.349 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=1
2020-11-06 14:13:21.353 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=2
2020-11-06 14:13:21.353 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat operation about to start at count=3
2020-11-06 14:13:21.353 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat is complete according to policy and result value.
2020-11-06 14:13:21.353  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3919, firstName=용태2, lastName=임, enabled=1)
2020-11-06 14:13:21.353  INFO 46769 --- [           main] QuerydslPagingItemReaderJobConfiguration : person=People(personId=3920, firstName=용태, lastName=임, enabled=1)
2020-11-06 14:13:21.353 DEBUG 46769 --- [           main] o.s.b.c.step.item.ChunkOrientedTasklet   : Inputs not busy, ended: true
2020-11-06 14:13:21.353 DEBUG 46769 --- [           main] o.s.batch.core.step.tasklet.TaskletStep  : Applying contribution: [StepContribution: read=2, written=2, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
2020-11-06 14:13:21.362 DEBUG 46769 --- [           main] o.s.batch.core.step.tasklet.TaskletStep  : Saving step execution before commit: StepExecution: id=23, version=2, name=QUERYDSL_STEP, status=STARTED, exitStatus=EXECUTING, readCount=12, filterCount=0, writeCount=12 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=2, rollbackCount=0, exitDescription=
2020-11-06 14:13:21.372 DEBUG 46769 --- [           main] o.s.batch.repeat.support.RepeatTemplate  : Repeat is complete according to policy and result value.
2020-11-06 14:13:21.373 DEBUG 46769 --- [           main] o.s.batch.core.step.AbstractStep         : Step execution success: id=23
2020-11-06 14:13:21.382  INFO 46769 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [QUERYDSL_STEP] executed in 596ms
2020-11-06 14:13:21.389 DEBUG 46769 --- [           main] o.s.batch.core.step.AbstractStep         : Step execution complete: StepExecution: id=23, version=4, name=QUERYDSL_STEP, status=COMPLETED, exitStatus=COMPLETED, readCount=12, filterCount=0, writeCount=12 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=2, rollbackCount=0
2020-11-06 14:13:21.393 DEBUG 46769 --- [           main] o.s.batch.core.job.AbstractJob           : Upgrading JobExecution status: StepExecution: id=23, version=4, name=QUERYDSL_STEP, status=COMPLETED, exitStatus=COMPLETED, readCount=12, filterCount=0, writeCount=12 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=2, rollbackCount=0, exitDescription=
2020-11-06 14:13:21.401 DEBUG 46769 --- [           main] o.s.batch.core.job.AbstractJob           : Job execution complete: JobExecution: id=23, version=1, startTime=Fri Nov 06 14:13:20 KST 2020, endTime=null, lastUpdated=Fri Nov 06 14:13:20 KST 2020, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=23, version=0, Job=[QUERYDSL_JOB]], jobParameters=[{version=2}]
2020-11-06 14:13:21.417  INFO 46769 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=QUERYDSL_JOB]] completed with the following parameters: [{version=2}] and the following status: [COMPLETED] in 705ms
2020-11-06 14:13:21.419 DEBUG 46769 --- [           main] BatchConfiguration$ReferenceTargetSource : Initializing lazy target object
2020-11-06 14:14:19.123  WARN 46769 --- [extShutdownHook] o.s.b.f.support.DisposableBeanAdapter    : Destroy method 'close' on bean with name 'jpaPagingItemReader' threw an exception: org.springframework.batch.item.ItemStreamException: Error while closing item reader
2020-11-06 14:14:19.124  INFO 46769 --- [extShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2020-11-06 14:14:19.127  INFO 46769 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2020-11-06 14:14:19.128  INFO 46769 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2020-11-06 14:14:19.151  INFO 46769 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

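The job ran successfully. :) As the log shows, 12 rows were read and written in two chunks (10, then 2).

The process exited because I have configured the application to shut down once the batch has finished. (For that setting, see "How to shut down the server after running Spring Batch" and "How to manage the service when using a Jenkins schedule".)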

 

 

 

 

 

 

 
