[kafka] KafkaTemplate을 이용한 Producer 구현
Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다.
모든 소스는 Github에 올려두었습니다.
목차
- KafkaTemplate 구성
- KafkaTemplate 사용하여 Producer 구현
KafkaTemplate
원초적으로 Producer를 통해 Kafka에 Message를 Send하려면 KafkaProducer 인스턴스를 사용하여 send() 메서드를 호출해야합니다. KafkaTemplate은 KafkaProducer 감싸고 있는 인스턴스라고 생각하시면 이해하시기 편합니다. KafkaTemplate.send() 는 내부에서 결국 KafkaProducer 인스턴스의 send()를 호출하고 있습니다.
1. KafkaTemplate 구성
KafkaProducerConfig.kt
DefaultKafkaProducerFactory 인스턴스를 사용하여 KafkaTemplate 를 생성할 수 있습니다.
@Configuration
class KafkaProducerConfig {
@Value("\${spring.kafka.producer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVER: String
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
return KafkaTemplate(factory)
}
fun producerConfigs(): Map<String, Serializable> =
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
}
Kafka의 데이터 저장방식은 직렬화를 통한 바이너리 배열 방식입니다. 그렇기 때문에 설정값에 Key, Value를 어떤 타입이 직렬화되는지 선언해야만 합니다.
spring-kafka 2.3 Version 이후부터 병렬처리를 위한 속성값 추가
Transactions을 사용하지 않는 경우 기본적으로 DefaultKafkaProducerFactory가 모든 클라이언트에서 사용할 때 singleton producer를 생성합니다. 그러나 만약에 flush() 메서드를 Template에서 호출하면 같은 producer를 사용하는 다른 쓰레드에서는 지연이 발생할 수 있습니다.
보완
이런 케이스를 보완하고자 새로운 속성값인 producerPerThread를 사용할 수 있습니다. 각 쓰레드에서 별도의 생성자를 만들고 캐싱처리를 합니다.
producerPerThread = true 일 경우 생성자가 더이상 필요하지 않을 경우 코드에서 반드시 closeThreadBoundProducer()를 호출해야 합니다.
@Configuration
class KafkaProducerConfig {
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
factory.isProducerPerThread = true
// Must Call closeThreadBoundProducer()
// factory.closeThreadBoundProducer()
return KafkaTemplate(factory)
}
}
Spring-Kafka 2.5 Version 이후 부터 Config 변경 가능
단편적인 예로 SSL 설정이 추가되거나 삭제되었을 경우 Config를 변경하기 용이해집니다.
@Configuration
class KafkaProducerConfig {
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
// update configs
val updateConfigs = mutableMapOf<String, Any>()
producerFactory.updateConfigs(updateConfigs)
// remove configs
val removeConfigKey = "ConfigKey"
producerFactory.removeConfig(removeConfigKey)
return KafkaTemplate(factory)
}
}
2. KafkaTemplate 사용하여 Producer 구현
카프카에 Message를 Send하는 클래스입니다.
@Component
class MemberProducer(
val kafkaTemplate: KafkaTemplate<String, String>
) {
companion object {
const val TOPIC_NAME = "save_member"
}
/**
* async send message
* @param message String
*/
fun sendMessage(message: String) {
// kafka producer
val listenableFuture = kafkaTemplate.send(TOPIC_NAME, message)
// kafka add callback
listenableFuture.addCallback(listenableFutureCallback(message))
}
}
Bean에 등록된 KafkaTemplate이 사용됩니다. KafkaTemplate으로 send 하는 방법은 다양하게 있습니다.
- Message 객체를 이용하여 Send
- ProducerRecord 객체를 이용하여 Send
- 토픽, 파티션을 설정 후 Send
- offset 설정 후 Send
카프카는 기본적으로 비동기 방식으로 동작합니다. kafkaTemplate의 반환값이 Future타입인 것으로 확인할 수 있습니다. 또한 Future 객체에 Callback Interface를 추가하여 처리할 수 있습니다.
KafkaProducerListener
Before 2.5 Version
ListenableFutureCallback Interface를 구현하여 사용합니다.
fun listenableFutureCallback(message: String) =
object: ListenableFutureCallback<SendResult<String, String>> {
override fun onSuccess(result: SendResult<String, String>?) {
log.info(
"Send Message = [ $message ] with offset=[ ${result!!.recordMetadata.offset()} ]"
)
}
override fun onFailure(ex: Throwable) {
log.error(
"Message 전달 오류 [ $message ] due to: ${ex.message}", ex
)
}
}
After 2.5 Version
KafkaSendCallback Interface를 구현하여 사용합니다. 실패 시 KafkaProducerException 발생하여 ProducerRecord 단위로 좀 더 자세한 Exception 정보를 알 수 있습니다.
fun listenableFutureCallback(message: String) =
object: KafkaSendCallback<String, String> {
override fun onSuccess(result: SendResult<String, String>?) {
log.info(
"Send Message = [ $message ] with offset=[ ${result!!.recordMetadata.offset()} ]"
)
}
override fun onFailure(ex: KafkaProducerException) {
log.error(
"Message 전달 오류 [ $message ] due to: ${ex.getFailedProducerRecord<String, String>()}"
)
}
}
KafkaTemplate 동기 방식 구현
카프카를 동기 방식으로 사용하고 싶은 경우 아래와 같이 코드를 작성할 수 있습니다.
/**
* sync send message
* @param message String
*/
fun sendMessageSync(message: String) {
// kafka producer
val listenableFuture = kafkaTemplate.send(TOPIC_NAME, message)
try {
listenableFuture.get(10, TimeUnit.SECONDS)
// success 처리
} catch (e: ExecutionException) {
// failure 처리
} catch (e: TimeoutException) {
// failure 처리
} catch (e: InterruptedException) {
// failure 처리
}
}
Future.get() 호출하면 비동기 방식의 처리가 종료될 때까지 기다립니다.