[kafka] @KafkaListener를 이용한 Consumer 구현
Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다.
모든 소스는 Github에 올려두었습니다.
목차
- Consumer
- KafkaListener Annotation 설정
- KafkaListener Annotation 사용방법
- Simple POJO Listeners
- 파티션 할당
- 특정 파티션 InitialOffset 설정
- 수동 확인 ( Manual Acknowledgment )
- Consumer Record Metadata
- Batch Listeners
- GroupId
1. Consumer
컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 @KafkaListener 를 이용하여 구현한 내용을 정리해보겠습니다.
2. KafkaListener Annotation 설정
개인적으로 MessageListener보단 @KafkaListener를 선호하는 편입니다. 이유는 @KafkaListener를 사용하면 Config와 명확히 분류되며 Config에서 Bean에 등록한 Factory 또한 사용할 수 있기 때문입니다.
@KafkaListener를 사용하기 위한 필수 조건입니다. ( KafkaConsumerConfig에서 설정 )
- @Configuration
- @EnableKafka
- kafkaListenerContainerFactory Bean 객체
@EnableKafka
@Configuration
class KafkaConsumerConfig {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVER: String
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.setConcurrency(2) // Consumer Process Thread Count
factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
factory.containerProperties.pollTimeout = 500
return factory
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // 마지막 읽은 부분부터 Read
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class
)
}
여러 Partition을 생성하여 테스트할 예정이라 ConcurrentKafkaListerContainer를 사용하겠습니다.
3. KafkaListener Annotation 사용방법
Kafka 클러스터는 Topic 1개, Partition 3개로 구성되어있습니다.
1. Simple POJO Listeners
<Sample1>
@Component
class MemberTestConsumer {
@KafkaListener(id = "mem", topics = ["insert_member"], clientIdPrefix = "memClientId")
fun listen(data: String) {
println("data:: $data")
}
}
// result
// Send Message = [ test message ]
// - Offset = [ 37 ], Topic = [ insert_member ], Partition = [ 2 ]
// data:: test message
<Sample2>
@Component
class MemberTestConsumer {
@KafkaListener(
id = "mem",
topics = ["insert_member"],
autoStartup = "\${listen.auto.start:true}",
concurrency = "\${listen.concurrency:3}"
)
fun listen(data: String) {
println("data:: $data")
}
}
// result
// Send Message = [ test message ]
// - Offset = [ 38 ], Topic = [ insert_member ], Partition = [ 0 ]
// data:: test message
concurrency 설정 기준에 대해서 궁금하시면 여기를 참조해주세요.
2. 파티션 할당
0번 파티션에 Recode만 Listen 합니다.
@Component
class MemberTestConsumer {
@KafkaListener(
id = "thing2",
topicPartitions = [
TopicPartition(topic = "insert_member", partitions = ["0"])
]
)
fun listen(data: String) {
println("data:: $data")
}
}
/**
Send Message = [ test message ]
- Offset = [ 43 ], Topic = [ insert_member ], Partition = [ 1 ]
Send Message = [ test message ]
- Offset = [ 38 ], Topic = [ insert_member ], Partition = [ 2 ]
Send Message = [ test message ]
- Offset = [ 44 ], Topic = [ insert_member ], Partition = [ 1 ]
// 0번 파티션
Send Message = [ test message ]
- Offset = [ 39 ], Topic = [ insert_member ], Partition = [ 0 ]
data:: test message
Send Message = [ test message ]
- Offset = [ 45 ], Topic = [ insert_member ], Partition = [ 1 ]
Send Message = [ test message ]
- Offset = [ 39 ], Topic = [ insert_member ], Partition = [ 2 ]
// 0번 파티션
Send Message = [ test message ]
- Offset = [ 40 ], Topic = [ insert_member ], Partition = [ 0 ]
data:: test message
*/
3. 특정 파티션 InitialOffset 설정
@Component
class MemberTestConsumer {
@KafkaListener(
id = "mem",
topicPartitions = [
TopicPartition(
topic = "insert_member",
partitions = ["1"],
partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "5000")]
)
]
)
fun listen(data: String) {
println("data:: $data")
}
}
4. 수동 확인( Manual Acknowledgment )
AckMode를 Manaul로 설정한 경우입니다. 다시 말해서 카프카 Offset Commit을 수동으로 설정하고 Listener에서 데이터를 받고 처리를 완료한 후 로직에서 Commit 명령어를 호출하는 방식입니다.
KafkaConsumerConfig.kt
컨슈머 설정 클래스에서 ackMode를 Manual로 설정합니다.
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
@EnableKafka
@Configuration
class KafkaConsumerConfig {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVER: String
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.setConcurrency(2) // Consumer Process Thread Count
factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
factory.containerProperties.pollTimeout = 500
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL // 추가
return factory
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // 마지막 읽은 부분부터 Read
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class
)
}
@KafkaListener에서 Bean에 등록한 kafkaListenerContainerFactory를 containerFactory로 설정하면 Acknowledgment 객체를 파라미터로 받을 수 있습니다.
@Component
class MemberTestConsumer {
@KafkaListener(id = "mem", topics = ["insert_member"], containerFactory = "kafkaListenerContainerFactory")
fun listen(data: String, ack: Acknowledgment) {
println("data:: $data")
ack.acknowledge()
}
}
Listner에서 로직 수행이 완료되면 ack.acknowledge() 메서드를 호출하여 offset을 커밋하는 명령어를 날립니다.
5. Consumer Record Metadata
컨슈머에서 Record의 메타데이터를 가져올 수 있습니다.
- @Payload, @Header를 이용한 방법
- ConsumerRecordMetadata 객체를 이용한 방법
@Payload, @Header를 이용한 방법
@Component
class MemberTestConsumer {
@KafkaListener(id = "mem", topicPattern = "insert_member")
fun listen(
@Payload data: String,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) key: Int?,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) partition: Int,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) ts: Long
) {
println("data:: $data key:: $key partition:: $partition topic:: $topic ts:: $ts")
}
}
/**
* result
* 1 CALL
* Send Message = [ test message ]
* - Offset = [ 118 ], Topic = [ insert_member ], Partition = [ 2 ]
* data:: test message key:: null partition:: 2 topic:: insert_member ts:: 1629873469337
*
* 2 CALL
* Send Message = [ test message ]
* - Offset = [ 123 ], Topic = [ insert_member ], Partition = [ 1 ]
* data:: test message key:: null partition:: 1 topic:: insert_member ts:: 1629873481904
*/
ConsumerRecordMetadata 객체를 이용한 방법
@Component
class MemberTestConsumer {
@KafkaListener(id = "mem", topicPattern = "insert_member")
fun listen(data: String, meta: ConsumerRecordMetadata) {
println("data:: $data partition:: ${meta.partition()} topic:: ${meta.topic()} ts:: ${meta.timestamp()}")
}
}
6. Batch Listeners
Batch 설정을 적용한 Factory를 Bean에 등록합니다.
@Bean
fun batchFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.setConcurrency(3) // Consumer Process Thread Count
factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
factory.containerProperties.pollTimeout = 500
factory.isBatchListener = true // Batch 적용!!!!!!!!!
return factory
}
BatchFactory를 이용한 KafkaListener를 사용한 Consumer
@Component
class MemberTestConsumer {
@KafkaListener(id = "mem", topics = ["insert_member"], containerFactory = "batchFactory")
fun listen(data: List<String>) {
println("data:: $data")
}
}
/**
* Result
* data:: [test message, test message, test message, test message, test message, test message]
* data:: [test message, test message, test message, test message]
* data:: [test message, test message]
*/
그 외의 Batch Listeners 활용법
@KafkaListener(id = "listenBatch1", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch1(data: List<String>) {
println("batch1 data:: $data")
}
@KafkaListener(id = "listenBatch2", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch2(data: List<Message>?) {
println("batch2 data:: $data")
}
@KafkaListener(id = "listenBatch3", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch3(data: List<Message>?, ack: Acknowledgment) {
println("batch3 data:: $data")
}
@KafkaListener(id = "listenBatch4", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch4(data: List<Message>?, ack: Acknowledgment, consumer: Consumer<String, String>) {
println("batch4 data:: $data")
}
@KafkaListener(id = "listenBatch5", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch5(data: List<ConsumerRecord<String, String>>) {
data.forEach {
println("batch5 data:: ${it.value()} topic=${it.topic()} partition=${it.partition()} offset=${it.offset()}")
}
}
@KafkaListener(id = "listenBatch6", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch6(data: List<ConsumerRecord<String, String>>, ack: Acknowledgment) {
data.forEach {
println("batch6 data:: ${it.value()} topic=${it.topic()} partition=${it.partition()} offset=${it.offset()}")
}
}
/**
* Result
* batch5 data:: test message topic=insert_member partition=2 offset=152
* batch3 data:: [test message]
* batch6 data:: test message topic=insert_member partition=2 offset=152
* batch2 data:: [test message]
* batch4 data:: [test message]
* batch1 data:: [test message]
*/
GroupId
Consumer는 GroupId로 그룹화할 수 있습니다. 그룹화된 Consumer별로 데이터를 읽어옵니다. 반대로 같은 그룹에 2개 이상의 컨슈머가 있다면 1개의 컨슈머만 Read하고 나머지 컨슈머는 놀고 있는 상태가 됩니다.
코드로 살펴보겠습니다.
2개의 컨슈머는 group1, 2개의 컨슈머는 group2로 설정
@KafkaListener(id = "listener1", topics = ["insert_member"], groupId = "group1")
fun listener1(data: String) {
println("listener1 data:: $data")
}
@KafkaListener(id = "listener2", topics = ["insert_member"], groupId = "group1")
fun listener2(data: String) {
println("listener2 data:: $data")
}
@KafkaListener(id = "listener3", topics = ["insert_member"], groupId = "group2")
fun listener3(data: String) {
println("listener3 data:: $data")
}
@KafkaListener(id = "listener4", topics = ["insert_member"], groupId = "group2")
fun listener4(data: String) {
println("listener4 data:: $data")
}
결과는 그룹별 하나의 컨슈머만 리스닝을 처리합니다.
// Send..
Send Message = [ test message ]
- Offset = [ 159 ], Topic = [ insert_member ], Partition = [ 0 ]
// Response
listener3 data:: test message
listener2 data:: test message