
[Kotlin] Applying Spring Batch

에디개발자 2021. 3. 25. 07:00

This post summarizes how I implemented Spring Batch in Kotlin. There is a lot of code, so I cover only the important parts.

 

All of the code is on Github. Please refer to it.


The work described here builds on my previous post, so reading the two together will make this one easier to follow. I will not explain what Spring Batch is. You should be able to implement Spring Batch with Java and Querydsl-JPA to follow along without trouble.

 

build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    val kotlinVersion = "1.4.31"

    id("org.springframework.boot") version "2.4.4"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version kotlinVersion
    kotlin("plugin.spring") version kotlinVersion
    kotlin("kapt") version kotlinVersion
    kotlin("plugin.allopen") version kotlinVersion
    kotlin("plugin.jpa") version kotlinVersion
}

group = "me.practice"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

configurations {
    compileOnly {
        extendsFrom(configurations.annotationProcessor.get())
    }
}

repositories {
    mavenCentral()
}

dependencies {
    val p6spyVersion = "1.6.2"

    implementation("org.springframework.boot:spring-boot-starter-batch")

    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

    developmentOnly("org.springframework.boot:spring-boot-devtools")
    annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
    implementation("com.h2database:h2")

    implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")

    implementation("com.querydsl:querydsl-jpa") // querydsl
    implementation("com.github.gavlyukovskiy:p6spy-spring-boot-starter:${p6spyVersion}")
    kapt("com.querydsl:querydsl-apt")

    annotationProcessor(group = "com.querydsl", name = "querydsl-apt", classifier = "jpa")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

sourceSets["main"].withConvention(org.jetbrains.kotlin.gradle.plugin.KotlinSourceSet::class) {
    kotlin.srcDir("$buildDir/generated/source/kapt/main")
}

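  • kapt plugin: The Java compiler cannot see Kotlin code while it runs annotation processing, so it cannot generate the Q classes. The kapt plugin, which supports annotation processing for Kotlin, is used instead.
  • allopen plugin: Kotlin classes are final by default, so JPA cannot create the proxy that extends an Entity for lazy loading. To solve this, Kotlin Entity classes must be open classes. The allopen plugin opens the classes that carry the annotations registered for it, so Entity classes do not need the open keyword.
  • jpa plugin: Hibernate creates Entity objects through reflection, so each one must have a no-arg constructor. This plugin generates a no-arg constructor for classes annotated with @Entity, @Embeddable, or @MappedSuperclass.
  • sourceSets: registers "$buildDir/generated/source/kapt/main" as a source directory. Running the command below then generates the Q classes for the entities.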
./gradlew clean compileKotlin
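
The build file above applies the allopen plugin, but the excerpt does not show which annotations it targets. As a rough sketch, assuming the javax.persistence annotations that ship with Spring Boot 2.4, the registration in build.gradle.kts could look like the following. The annotation list is my assumption, not something taken from the original project.

```kotlin
// build.gradle.kts fragment (assumed configuration, not from the original repository)
allOpen {
    // Classes carrying these annotations are compiled as open, together with their members.
    annotation("javax.persistence.Entity")
    annotation("javax.persistence.MappedSuperclass")
    annotation("javax.persistence.Embeddable")
}
```

This fragment only takes effect when kotlin("plugin.allopen") is applied, as it is in the plugins block above.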

QuerydslPagingItemReader.kt

This is the Kotlin class that lets Querydsl be used as an ItemReader. It does the same job as the Java class from my previous post.

import com.querydsl.jpa.impl.JPAQuery
import com.querydsl.jpa.impl.JPAQueryFactory
import org.springframework.batch.item.database.AbstractPagingItemReader
import org.springframework.util.CollectionUtils
import java.util.concurrent.CopyOnWriteArrayList
import javax.persistence.EntityManager
import javax.persistence.EntityManagerFactory

open class QuerydslPagingItemReader<T>(  // open so that it can be subclassed
    private val entityManagerFactory: EntityManagerFactory,
    pageSize: Int,
    val queryCreator: (JPAQueryFactory) -> JPAQuery<T>    // Kotlin function type instead of Function
) : AbstractPagingItemReader<T>() {

    private lateinit var entityManager: EntityManager
    private val jpaPropertyMap = mutableMapOf<String, Any>()
    private var transacted = true

    init {
        setPageSize(pageSize)
    }

    fun setTransacted(transacted: Boolean) {
        this.transacted = transacted
    }

    override fun doOpen() {
        super.doOpen()

        entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap)
    }

    override fun doReadPage() {
        clearIfTransacted()

        // offset/limit paging based on the current page number
        val query = createQuery()
            .offset((page * pageSize).toLong())
            .limit(pageSize.toLong())

        initResults()
        fetchQuery(query)
    }

    private fun clearIfTransacted() {
        if (transacted) {
            entityManager.clear()
        }
    }

    private fun createQuery() =
        queryCreator(JPAQueryFactory(entityManager))

    private fun initResults() {
        if (CollectionUtils.isEmpty(results)) {
            results = CopyOnWriteArrayList()
        } else {
            results.clear()
        }
    }

    private fun fetchQuery(query: JPAQuery<T>) {
        if (!transacted) {
            val queryResult = query.fetch()
            for (entity in queryResult) {
                entityManager.detach(entity)
                // add() mutates results; "results + entity" would build a new list and discard it
                results.add(entity)
            }
        } else {
            results.addAll(query.fetch())
        }
    }

    override fun doJumpToPage(itemIndex: Int) {
        // Intentionally empty: doReadPage() derives the offset from the current page.
    }

    override fun doClose() {
        entityManager.close()
        super.doClose()
    }
}
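  • A Reader in Batch is normally used by extending it, so the class is declared as an open class.
  • The queryCreator parameter was a Function in the Java version. Here it is a Kotlin function type, so a lambda can be passed directly.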
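
To make the two points above concrete, here is a minimal in-memory sketch of the same idea: a reader that takes a lambda and pages with offset and limit. PagingSketch and pageSizes are hypothetical names for illustration only, and a plain list stands in for the database; this is not the Querydsl reader itself.

```kotlin
// Hypothetical sketch: an in-memory reader that pages with offset/limit.
class PagingSketch<T>(
    private val pageSize: Int,
    // A Kotlin function type, so callers can pass a lambda directly.
    private val queryCreator: (offset: Int, limit: Int) -> List<T>
) {
    private var page = 0

    // Fetches the next page; an empty list means the source is exhausted.
    fun readPage(): List<T> {
        val rows = queryCreator(page * pageSize, pageSize)
        page++
        return rows
    }
}

// Reads pages until an empty one comes back and returns each page's size.
fun pageSizes(totalRows: Int, pageSize: Int): List<Int> {
    val table = (1..totalRows).toList()
    val reader = PagingSketch<Int>(pageSize) { offset, limit ->
        table.drop(offset).take(limit)
    }
    return generateSequence { reader.readPage() }
        .takeWhile { it.isNotEmpty() }
        .map { it.size }
        .toList()
}
```

With 17 rows and a page size of 10, pageSizes(17, 10) yields one page of 10 and one page of 7.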

SimpleJobConfiguration.kt

@Configuration
@EnableBatchProcessing
@EnableScheduling
class SimpleJobConfiguration(
    @Autowired val jobBuilderFactory: JobBuilderFactory,
    @Autowired val stepBuilderFactory: StepBuilderFactory,
    @Autowired val entityManagerFactory: EntityManagerFactory,
    @Autowired val personRepository: PersonRepository,
    @Autowired val simpleStepListener: SimpleStepListener,
    @Autowired val simpleProcessor: SimpleProcessor,
    @Autowired val simpleWriter: SimpleWriter
) {
    private val chunkSize = 10
    private val pageSize = 10

    @Bean
    fun simpleJob() =
        jobBuilderFactory.get(BatchItem.SIMPLE_JOB.jobName)
            .start(simpleStep())
            .build()

    @Bean
    fun simpleStep() =
        stepBuilderFactory.get(BatchItem.SIMPLE_STEP.jobName)
            .chunk<Person, Person>(chunkSize)
            .reader(reader())
            .processor(simpleProcessor)
            .writer(simpleWriter)
            .listener(simpleStepListener)
            .build()

    fun reader(): QuerydslPagingItemReader<Person> =
        QuerydslPagingItemReader(entityManagerFactory, pageSize) { personRepository.findAllInBatch() }
}

Not much has changed here. In the reader method, the argument that was a Function in Java is now passed as a lambda.

 

SimpleProcessor.kt

@Component
class SimpleProcessor: ItemProcessor<Person, Person> {
    override fun process(person: Person): Person? {
        println("simple batch process start.")

        // logic...
        if (person.id == 2 || person.id == 3)
            return null

        // test case - RollBack
//        if (person.id == 4) {
//            throw RuntimeException("increase skip count")
//        }

        println("simple batch process finished.")

        return person
    }
}

 

SimpleWriter.kt

@Component
class SimpleWriter: ItemWriter<Person> {
    val log = LoggerFactory.getLogger(SimpleWriter::class.java)

    override fun write(people: MutableList<out Person>) {
        log.info("simple batch writer start.")

        for (person in people) {
            println(person.toString())
        }

        log.info("simple batch writer finished.")
    }
}

 

SimpleStepListener.kt

@Component
class SimpleStepListener: StepExecutionListenerSupport() {

    private val log = LoggerFactory.getLogger(SimpleStepListener::class.java)

    override fun beforeStep(stepExecution: StepExecution) {
        val name = stepExecution.stepName
        log.info("before beforeStep. name: $name")

        super.beforeStep(stepExecution)
    }

    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        log.info(
                "\n    ⊙ after simpleStep\n" +
                        "    ├─ Name: ${stepExecution.stepName}\n" +
                        "    ├─ Read Count: ${stepExecution.readCount} \n" +
                        "    ├─ Write Count: ${stepExecution.writeCount}\n" +
                        "    ├─ Commit Count: ${stepExecution.commitCount}\n" +
                        "    ├─ Rollback Count: ${stepExecution.rollbackCount}\n" +
                        "    ├─ status: ${stepExecution.status}"
        )

        return super.afterStep(stepExecution)
    }
}

The listener extends StepExecutionListenerSupport. Unlike implementing StepExecutionListener directly, overriding the methods is not mandatory, so you can override only the one you need out of beforeStep and afterStep.
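
The difference can be sketched in plain Kotlin without Spring Batch on the classpath. All names below (StepListenerSketch, StepListenerSupportSketch, AfterOnlyListener) are hypothetical stand-ins for illustration, not the real Spring Batch types.

```kotlin
// Hypothetical stand-in for a listener interface with two mandatory methods.
interface StepListenerSketch {
    fun beforeStep(stepName: String)
    fun afterStep(stepName: String): String?
}

// No-op base class in the spirit of StepExecutionListenerSupport.
open class StepListenerSupportSketch : StepListenerSketch {
    override fun beforeStep(stepName: String) {}
    override fun afterStep(stepName: String): String? = null
}

// Only afterStep is overridden; beforeStep keeps the inherited no-op.
class AfterOnlyListener : StepListenerSupportSketch() {
    val log = mutableListOf<String>()

    override fun afterStep(stepName: String): String? {
        log += "after $stepName"
        return "COMPLETED"
    }
}
```

A class implementing StepListenerSketch directly would have to write both methods, while AfterOnlyListener writes only one.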

 

KotlinBatchApplication.kt

@SpringBootApplication
class KotlinBatchApplication

fun main(args: Array<String>) {
    val applicationContext = runApplication<KotlinBatchApplication>(*args)

    // kill process when finished batch
    exitProcess(SpringApplication.exit(applicationContext))
}

 

 

Other useful settings

P6Spy

P6Spy pretty-prints the SQL logs. The code below does the same thing as the code in an earlier post of mine.

@Configuration
class P6spyLogMessageFormatConfiguration {
    companion object {
        private const val BATCH_TABLE_NAME_PREFIX = "BATCH"
    }

    @PostConstruct
    fun setLogMessageFormat() {
        // apply pretty format
        P6SpyOptions.getActiveInstance().logMessageFormat = P6spySqlFormatConfiguration::class.java.name

        // apply batch exclude filter
        val activeInstance = P6LogOptions.getActiveInstance()
        activeInstance.filter = true
        activeInstance.exclude = BATCH_TABLE_NAME_PREFIX
    }
}

class P6spySqlFormatConfiguration : MessageFormattingStrategy {
    companion object {
        private const val SQL_CREATE = "create"
        private const val SQL_ALTER = "alter"
        private const val SQL_COMMENT = "comment"
    }

    // apply sql format
    override fun formatMessage(
        connectionId: Int,
        now: String?,
        elapsed: Long,
        category: String?,
        prepared: String?,
        sql: String?,
        url: String?
    ): String  =
        now + "|" + elapsed + "ms|" + category + "|connection " + connectionId + "|" + P6Util.singleLine(prepared) + formatSql(category, sql)

    private fun formatSql(category: String?, sql: String?): String {
        var resultSql = ""
        if (sql.isNullOrBlank())
            return resultSql

        // Only format Statement, distinguish DDL And DML
        if (Category.STATEMENT.name == category) {
            val tempSql = sql.trim().toLowerCase(Locale.ROOT)
            resultSql =
                if (tempSql.startsWith(SQL_CREATE) || tempSql.startsWith(SQL_ALTER) || tempSql.startsWith(SQL_COMMENT))
                    FormatStyle.DDL.formatter.format(sql)
                else
                    FormatStyle.BASIC.formatter.format(sql)
            resultSql = "|\nHeFormatSql(P6Spy sql,Hibernate format):$resultSql"
        }
        return resultSql
    }
}

 

Running it

Once everything is written, run the application. For testing, I inserted 17 rows of data.

HeFormatSql(P6Spy sql,Hibernate format):
    select
        person0_.id as id1_0_,
        person0_.address as address2_0_,
        person0_.name as name3_0_ 
    from
        person person0_ limit 10
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process start.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
2021-03-22 23:19:50.212  INFO 1160 --- [  restartedMain] m.p.k.i.simple.step.write.SimpleWriter   : simple batch writer start.
id: 1 name: 테스트1 address: 주소17
id: 4 name: 테스트14 address: 주소14
id: 5 name: 테스트15 address: 주소13
id: 6 name: 테스트16 address: 주소12
id: 7 name: 테스트17 address: 주소11
id: 8 name: 테스트18 address: 9주소1
id: 9 name: 테스트19 address: 8주소1
id: 10 name: 테스트111 address: 7주소1
2021-03-22 23:19:50.212  INFO 1160 --- [  restartedMain] m.p.k.i.simple.step.write.SimpleWriter   : simple batch writer finished.
2021-03-22 23:19:50.215  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790215|0ms|commit|connection 21|
2021-03-22 23:19:50.224  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790224|0ms|statement|connection 22|select person0_.id as id1_0_, person0_.address as address2_0_, person0_.name as name3_0_ from person person0_ limit ? offset ?|
HeFormatSql(P6Spy sql,Hibernate format):
    select
        person0_.id as id1_0_,
        person0_.address as address2_0_,
        person0_.name as name3_0_ 
    from
        person person0_ limit 10 offset 10
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
simple batch process start.
simple batch process finished.
2021-03-22 23:19:50.226  INFO 1160 --- [  restartedMain] m.p.k.i.simple.step.write.SimpleWriter   : simple batch writer start.
id: 11 name: 테스트21 address: 6주소1
id: 12 name: 테스트31 address: 5주소1
id: 13 name: 테스트41 address: 4주소1
id: 14 name: 테스트51 address: 3주소1
id: 15 name: 테스트61 address: 2주소1
id: 16 name: 테스트71 address: 2주소1
id: 17 name: 테스트81 address: 1주소1
2021-03-22 23:19:50.226  INFO 1160 --- [  restartedMain] m.p.k.i.simple.step.write.SimpleWriter   : simple batch writer finished.
2021-03-22 23:19:50.227  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790227|0ms|commit|connection 22|
2021-03-22 23:19:50.228  INFO 1160 --- [  restartedMain] m.p.k.i.s.listener.SimpleStepListener    : 
    ⊙ after simpleStep
    ├─ Name: simpleStep
    ├─ Read Count: 17 
    ├─ Write Count: 15
    ├─ Commit Count: 2
    ├─ Rollback Count: 0
    ├─ status: COMPLETED
2021-03-22 23:19:50.228  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790228|0ms|commit|connection 23|
2021-03-22 23:19:50.229  INFO 1160 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [simpleStep] executed in 153ms
2021-03-22 23:19:50.230  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790230|0ms|commit|connection 24|
2021-03-22 23:19:50.231  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790231|0ms|commit|connection 25|
2021-03-22 23:19:50.233  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790233|0ms|commit|connection 26|
2021-03-22 23:19:50.233  INFO 1160 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=simpleJob]] completed with the following parameters: [{version=1}] and the following status: [COMPLETED] in 168ms
2021-03-22 23:19:50.236  INFO 1160 --- [  restartedMain] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2021-03-22 23:19:50.251  INFO 1160 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2021-03-22 23:19:50.252  INFO 1160 --- [  restartedMain] o.s.s.quartz.SchedulerFactoryBean        : Shutting down Quartz Scheduler
2021-03-22 23:19:50.252  INFO 1160 --- [  restartedMain] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED shutting down.
2021-03-22 23:19:50.252  INFO 1160 --- [  restartedMain] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2021-03-22 23:19:50.252  INFO 1160 --- [  restartedMain] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED shutdown complete.
2021-03-22 23:19:50.253  INFO 1160 --- [  restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2021-03-22 23:19:50.254  INFO 1160 --- [  restartedMain] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-03-22 23:19:50.255  INFO 1160 --- [  restartedMain] .SchemaDropperImpl$DelayedDropActionImpl : HHH000477: Starting delayed evictData of schema as part of SessionFactory shut-down'
2021-03-22 23:19:50.259  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790259|3ms|statement|connection 27|drop table if exists person CASCADE |
HeFormatSql(P6Spy sql,Hibernate format):
    drop table if exists person CASCADE 
2021-03-22 23:19:50.260  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790260|0ms|statement|connection 27|drop sequence if exists hibernate_sequence|
HeFormatSql(P6Spy sql,Hibernate format):
    drop sequence if exists hibernate_sequence
2021-03-22 23:19:50.465  INFO 1160 --- [  restartedMain] p6spy                                    : 1616422790465|202ms|statement|connection 28|SHUTDOWN|
HeFormatSql(P6Spy sql,Hibernate format):
    SHUTDOWN
2021-03-22 23:19:50.466  WARN 1160 --- [  restartedMain] o.s.b.f.support.DisposableBeanAdapter    : Invocation of destroy method failed on bean with name 'inMemoryDatabaseShutdownExecutor': org.h2.jdbc.JdbcSQLNonTransientConnectionException: Database is already closed (to disable automatic closing at VM shutdown, add ";DB_CLOSE_ON_EXIT=FALSE" to the db URL) [90121-200]
2021-03-22 23:19:50.467  INFO 1160 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-03-22 23:19:50.468  INFO 1160 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0
  • pageSize and chunkSize are both set to 10, and the log shows that rows are read 10 at a time and processed 10 at a time.
  • SimpleProcessor filters out two items, which is why the log shows Read Count: 17 and Write Count: 15.
  • All rows were processed and the application exited normally.
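
The counts in the log can be reproduced with a small simulation. This is a simplified sketch of chunk-oriented processing, not Spring Batch itself: StepCounts and simulateStep are hypothetical names, and it assumes one commit per chunk and that a processor returning null drops the item before it reaches the writer.

```kotlin
// Hypothetical summary of what the step listener prints.
data class StepCounts(val read: Int, val written: Int, val commits: Int)

// Simplified model: read in chunks, drop filtered items, commit once per chunk.
fun simulateStep(ids: List<Int>, chunkSize: Int, filteredIds: Set<Int>): StepCounts {
    var read = 0
    var written = 0
    var commits = 0
    for (chunk in ids.chunked(chunkSize)) {
        read += chunk.size
        // Returning null from the processor filters the item out.
        val processed = chunk.mapNotNull { id -> if (id in filteredIds) null else id }
        written += processed.size
        commits++
    }
    return StepCounts(read, written, commits)
}
```

Calling simulateStep((1..17).toList(), 10, setOf(2, 3)) gives 17 read, 15 written, and 2 commits, which matches the Read Count, Write Count, and Commit Count in the log above.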

 

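Conclusion

This was a good opportunity to get a glimpse of Kotlin's appeal. If you can write it in Java, you can convert it to Kotlin without much trouble, and the amount of code shrinks noticeably. The more logic there is and the more complex it gets, the more Kotlin's strengths will show.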
