Develop/spring-batch

[Kotlin] Spring-Batch QuerydslPagingItemReader 개선편

에디개발자 2021. 8. 4. 07:00
반응형

나를 닮았다고 한다...

들어가기 전에

Spring-Batch 에서 Chunk 개념을 이용하려면 ItemReader를 사용합니다. ItemWriter에서 ItemReader에서 사용한 쿼리에 영향을 미치는 상태값을 변경할 경우가 있습니다. 이와 같은 경우 필요한 기능을 정리한 글입니다.

 

ItemReader를 사용한 Query

ItemReader는 Query에 offset과 limit을 사용합니다. 

// 1
select
    id,
    address,
    name
from person 
where state = 'STUDY'
limit 5
    
// 2
select
    id,
    address,
    name
from person
where state = 'STUDY'
offset 5 limit 5 
    
// 3
select
    id,
    address,
    name
from person
where state = 'STUDY'
offset 10 limit 5

 

ItemWriter에서 Query에 영향을 미치는 상태값 변경

update
      person 
   set
      state='PLAY' 
 where
      id IN (1, 2, 3, 4, 5)

이럴 경우 Batch는 조회하지 못하는 Data가 생깁니다. 

 

예시) 

12개의 데이터를 조회하여 처리하는 배치라고 가정합니다.

pageSize: 3, ChunkSIze: 3으로 설정하고 배치를 실행했을 때 DATA4, DATA5, DATA6 데이터는 조회하지 못하는 문제가 발생합니다. ( 아래 그림 참조 )

1번째 Batch
2번째 Batch

ItemReader에서 offset을 항상 0으로 설정

QueryPagingItemReader는 이동욱님 블로그를 참조하여 생성하였습니다.

open class QuerydslPagingItemReader<T>(
    private val entityManagerFactory: EntityManagerFactory,
    val queryCreator: (JPAQueryFactory) -> JPAQuery<T>
) : AbstractPagingItemReader<T>() {

    private lateinit var entityManager: EntityManager
    private val jpaPropertyMap = mutableMapOf<String, Any>()
    var trasacted = true
    var pageOffset = true    // 1)

    override fun doOpen() {
        super.doOpen()

        entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap)
    }

    override fun doReadPage() {
        clearIfTransacted()

        val query = createQuery()
            .offset((page * pageSize).toLong())
            .limit(pageSize.toLong())

        initResults()
        fetchQuery(query)
    }

    private fun clearIfTransacted() {
        if (trasacted) {
            entityManager.clear()
        }
    }

    private fun createQuery() =
        queryCreator(JPAQueryFactory(entityManager))


    private fun initResults() {
        if (CollectionUtils.isEmpty(results)) {
            results = CopyOnWriteArrayList()
        } else {
            results.clear()
        }
    }

    private fun fetchQuery(query: JPAQuery<T>) {
        if (!trasacted) {
            val queryResult = query.fetch()
            for (entity in queryResult) {
                entityManager.detach(entity)
                results + entity
            }
        } else {
            results.addAll(query.fetch())
        }
    }

    override fun doJumpToPage(itemIndex: Int) {

    }

    override fun doClose() {
        entityManager.close()
        super.doClose()
    }
    // 2)
    override fun getPageSize(): Int {
        return if (pageOffset) super.getPageSize()
        else 0
    }
}

1) pageOffet 설정 변수입니다. 

2) getPageSize 메서드를 override하여 pageOffset 변수값에 따라 pageOffset를 설정합니다. 

 

이전처럼 Writer에서 상태값을 변경하는 경우라면 아래와 같이 ItemReader를 사용할 수 있습니다.

@Bean
fun reader(): QuerydslPagingItemReader<Person> {
    val reader = QuerydslPagingItemReader(entityManagerFactory) { personRepository.findAllInBatch() }
    reader.pageSize = 10
    reader.pageOffset = false  // pageOffset 설정
    return reader
}
반응형