Study/message-queue

[kafka] KafkaTemplate을 이용한 Producer 구현

에디개발자 2021. 8. 20. 07:00
반응형

나를 닮았다고 한다...

Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다.

모든 소스는 Github에 올려두었습니다. 

 

목차

  1. KafkaTemplate 구성
  2. KafkaTemplate 사용하여 Producer 구현

 


KafkaTemplate

원초적으로 Producer를 통해 Kafka에 Message를 Send하려면 KafkaProducer 인스턴스를 사용하여 send() 메서드를 호출해야합니다. KafkaTemplate은 KafkaProducer 감싸고 있는 인스턴스라고 생각하시면 이해하시기 편합니다. KafkaTemplate.send() 는 내부에서 결국 KafkaProducer 인스턴스의 send()를 호출하고 있습니다. 

 

1. KafkaTemplate 구성

KafkaProducerConfig.kt

DefaultKafkaProducerFactory 인스턴스를 사용하여 KafkaTemplate 를 생성할 수 있습니다. 

@Configuration
class KafkaProducerConfig {

    @Value("\${spring.kafka.producer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
        return KafkaTemplate(factory)
    }

    fun producerConfigs(): Map<String, Serializable> =
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
        )

}

Kafka의 데이터 저장방식은 직렬화를 통한 바이너리 배열 방식입니다. 그렇기 때문에 설정값에 Key, Value를 어떤 타입이 직렬화되는지 선언해야만 합니다. 

 

spring-kafka 2.3 Version 이후부터 병렬처리를 위한 속성값 추가

Transactions을 사용하지 않는 경우 기본적으로 DefaultKafkaProducerFactory가 모든 클라이언트에서 사용할 때 singleton producer를 생성합니다. 그러나 만약에 flush() 메서드를 Template에서 호출하면 같은 producer를 사용하는 다른 쓰레드에서는 지연이 발생할 수 있습니다. 

병렬 처리 시 1개의 Producer를 사용하기 때문에 지연 발생

 

보완

이런 케이스를 보완하고자 새로운 속성값인 producerPerThread를 사용할 수 있습니다. 각 쓰레드에서 별도의 생성자를 만들고 캐싱처리를 합니다.

쓰레드별 Producer 생성

 

producerPerThread = true 일 경우 생성자가 더이상 필요하지 않을 경우 코드에서 반드시 closeThreadBoundProducer()를 호출해야 합니다.

@Configuration
class KafkaProducerConfig {

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
        
        factory.isProducerPerThread = true
        
        // Must Call closeThreadBoundProducer()
        // factory.closeThreadBoundProducer()
        
        return KafkaTemplate(factory)
    }
}

 

Spring-Kafka 2.5 Version 이후 부터 Config 변경 가능

단편적인 예로 SSL 설정이 추가되거나 삭제되었을 경우 Config를 변경하기 용이해집니다.

@Configuration
class KafkaProducerConfig {

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
        
        // update configs
        val updateConfigs = mutableMapOf<String, Any>()
        producerFactory.updateConfigs(updateConfigs)

        // remove configs
        val removeConfigKey = "ConfigKey"
        producerFactory.removeConfig(removeConfigKey)
        
        return KafkaTemplate(factory)
    }

}

 

2. KafkaTemplate 사용하여 Producer 구현

카프카에 Message를 Send하는 클래스입니다.

@Component
class MemberProducer(
    val kafkaTemplate: KafkaTemplate<String, String>
) {
    companion object {
        const val TOPIC_NAME = "save_member"
    }

    /**
     * async send message
     * @param message String
     */
    fun sendMessage(message: String) {
        // kafka producer
        val listenableFuture = kafkaTemplate.send(TOPIC_NAME, message)

        // kafka add callback
        listenableFuture.addCallback(listenableFutureCallback(message))
    }
}

Bean에 등록된 KafkaTemplate이 사용됩니다. KafkaTemplate으로 send 하는 방법은 다양하게 있습니다. 

- Message 객체를 이용하여 Send
- ProducerRecord 객체를 이용하여 Send
- 토픽, 파티션을 설정 후 Send
- offset 설정 후 Send

KafkaTemplate에서 지원하는 Send 종류

 

카프카는 기본적으로 비동기 방식으로 동작합니다. kafkaTemplate의 반환값이 Future타입인 것으로 확인할 수 있습니다. 또한 Future 객체에 Callback Interface를 추가하여 처리할 수 있습니다. 

KafkaProducerListener

Before 2.5 Version

ListenableFutureCallback Interface를 구현하여 사용합니다.

fun listenableFutureCallback(message: String) =
    object: ListenableFutureCallback<SendResult<String, String>> {  
        override fun onSuccess(result: SendResult<String, String>?) {
            log.info(
                "Send Message = [ $message ] with offset=[ ${result!!.recordMetadata.offset()} ]"
            )
        }

        override fun onFailure(ex: Throwable) {
            log.error(
                "Message 전달 오류 [ $message ] due to: ${ex.message}", ex
            )
        }
    }

 

After 2.5 Version

KafkaSendCallback Interface를 구현하여 사용합니다. 실패 시 KafkaProducerException 발생하여 ProducerRecord 단위로 좀 더 자세한 Exception 정보를 알 수 있습니다. 

fun listenableFutureCallback(message: String) =
    object: KafkaSendCallback<String, String> {  
        override fun onSuccess(result: SendResult<String, String>?) {
            log.info(
                "Send Message = [ $message ] with offset=[ ${result!!.recordMetadata.offset()} ]"
            )
        }

        override fun onFailure(ex: KafkaProducerException) {
            log.error(
                "Message 전달 오류 [ $message ] due to: ${ex.getFailedProducerRecord<String, String>()}"
            )
        }
    }

 


KafkaTemplate 동기 방식 구현

카프카를 동기 방식으로 사용하고 싶은 경우 아래와 같이 코드를 작성할 수 있습니다. 

/**
 * sync send message
 * @param message String
 */
fun sendMessageSync(message: String) {
    // kafka producer
    val listenableFuture = kafkaTemplate.send(TOPIC_NAME, message)

    try {
        listenableFuture.get(10, TimeUnit.SECONDS)
        // success 처리
    } catch (e: ExecutionException) {
        // failure 처리
    } catch (e: TimeoutException) {
        // failure 처리
    } catch (e: InterruptedException) {
        // failure 처리
    }
}

Future.get() 호출하면 비동기 방식의 처리가 종료될 때까지 기다립니다. 


 

반응형