Study/message-queue

[kafka] @KafkaListener를 이용한 Consumer 구현

에디개발자 2021. 8. 26. 07:00
반응형

나를 닮았다고 한다...

Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다.

모든 소스는 Github에 올려두었습니다. 

 

목차

  1. Consumer
  2. KafkaListener Annotation 설정
  3. KafkaListener Annotation 사용방법
    1. Simple POJO Listeners
    2. 파티션 할당
    3. 특정 파티션 InitialOffset 설정
    4. 수동 확인 ( Manual Acknowledgment )
    5. Consumer Record Metadata
    6. Batch Listeners
    7. GroupId

1. Consumer

컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 @KafkaListener 를 이용하여 구현한 내용을 정리해보겠습니다.

 

 

2. KafkaListener Annotation 설정

개인적으로 MessageListener보단 @KafkaListener를 선호하는 편입니다. 이유는 @KafkaListener를 사용하면 Config와 명확히 분류되며 Config에서 Bean에 등록한 Factory 또한 사용할 수 있기 때문입니다. 

 

@KafkaListener를 사용하기 위한 필수 조건입니다. ( KafkaConsumerConfig에서 설정 )

  • @Configuration
  • @EnableKafka
  • kafkaListenerContainerFactory Bean 객체 
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

        factory.setConcurrency(2) // Consumer Process Thread Count
        factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
        factory.containerProperties.pollTimeout = 500

        return factory
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class
        )
}
여러 Partition을 생성하여 테스트할 예정이라 ConcurrentKafkaListerContainer를 사용하겠습니다.

 

3. KafkaListener Annotation 사용방법

Kafka 클러스터는 Topic 1개, Partition 3개로 구성되어있습니다.

 

1. Simple POJO Listeners

<Sample1>

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topics = ["insert_member"], clientIdPrefix = "memClientId")
    fun listen(data: String) {
        println("data:: $data")
    }

}

// result 
// Send Message = [ test message ] 
//  - Offset = [ 37 ], Topic = [ insert_member ], Partition = [ 2 ]
  
// data:: test message

 

<Sample2>

@Component
class MemberTestConsumer {

    @KafkaListener(
        id = "mem", 
        topics = ["insert_member"], 
        autoStartup = "\${listen.auto.start:true}", 
        concurrency = "\${listen.concurrency:3}"
    )
    fun listen(data: String) {
        println("data:: $data")
    }
}

// result 
// Send Message = [ test message ] 
//  - Offset = [ 38 ], Topic = [ insert_member ], Partition = [ 0 ]
  
// data:: test message

concurrency 설정 기준에 대해서 궁금하시면 여기를 참조해주세요.

 

2. 파티션 할당

0번 파티션에 Recode만 Listen 합니다.

@Component
class MemberTestConsumer {

    @KafkaListener(
        id = "thing2",
        topicPartitions = [
            TopicPartition(topic = "insert_member", partitions = ["0"])
        ]
    )
    fun listen(data: String) {
        println("data:: $data")
    }
    
}


/**
 Send Message = [ test message ] 
   - Offset = [ 43 ], Topic = [ insert_member ], Partition = [ 1 ]
 Send Message = [ test message ] 
   - Offset = [ 38 ], Topic = [ insert_member ], Partition = [ 2 ]
 Send Message = [ test message ] 
   - Offset = [ 44 ], Topic = [ insert_member ], Partition = [ 1 ]
 // 0번 파티션  
 Send Message = [ test message ] 
   - Offset = [ 39 ], Topic = [ insert_member ], Partition = [ 0 ]
 data:: test message
 Send Message = [ test message ] 
   - Offset = [ 45 ], Topic = [ insert_member ], Partition = [ 1 ]
 Send Message = [ test message ] 
   - Offset = [ 39 ], Topic = [ insert_member ], Partition = [ 2 ]
 // 0번 파티션  
 Send Message = [ test message ] 
   - Offset = [ 40 ], Topic = [ insert_member ], Partition = [ 0 ]
 data:: test message
 */

 

3. 특정 파티션 InitialOffset 설정

@Component
class MemberTestConsumer {

    @KafkaListener(
        id = "mem",
        topicPartitions = [
            TopicPartition(
                topic = "insert_member",
                partitions = ["1"],
                partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "5000")]
            )
        ]
    )
    fun listen(data: String) {
        println("data:: $data")
    }

}

 

4. 수동 확인( Manual Acknowledgment )

AckMode를 Manaul로 설정한 경우입니다. 다시 말해서 카프카 Offset Commit을 수동으로 설정하고 Listener에서 데이터를 받고 처리를 완료한 후 로직에서 Commit 명령어를 호출하는 방식입니다.

 

KafkaConsumerConfig.kt

컨슈머 설정 클래스에서 ackMode를 Manual로 설정합니다.

factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

        factory.setConcurrency(2) // Consumer Process Thread Count
        factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
        factory.containerProperties.pollTimeout = 500
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL  // 추가

        return factory
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class
        )
}

 

@KafkaListener에서 Bean에 등록한 kafkaListenerContainerFactory를 containerFactory로 설정하면 Acknowledgment 객체를 파라미터로 받을 수 있습니다. 

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topics = ["insert_member"], containerFactory = "kafkaListenerContainerFactory")
    fun listen(data: String, ack: Acknowledgment) {
        println("data:: $data")
        ack.acknowledge()
    }

}

Listner에서 로직 수행이 완료되면 ack.acknowledge() 메서드를 호출하여 offset을 커밋하는 명령어를 날립니다. 

 

5. Consumer Record Metadata

컨슈머에서 Record의 메타데이터를 가져올 수 있습니다. 

  • @Payload, @Header를 이용한 방법
  • ConsumerRecordMetadata 객체를 이용한 방법

@Payload, @Header를 이용한 방법

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topicPattern = "insert_member")
    fun listen(
        @Payload data: String,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) key: Int?,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) partition: Int,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) ts: Long
    ) {
        println("data:: $data key:: $key partition:: $partition topic:: $topic ts:: $ts")
    }
    
}


/**
 * result 
 * 1 CALL
 * Send Message = [ test message ] 
 *  - Offset = [ 118 ], Topic = [ insert_member ], Partition = [ 2 ]
 * data:: test message key:: null partition:: 2 topic:: insert_member ts:: 1629873469337
 *
 * 2 CALL
 * Send Message = [ test message ] 
 *  - Offset = [ 123 ], Topic = [ insert_member ], Partition = [ 1 ]
 * data:: test message key:: null partition:: 1 topic:: insert_member ts:: 1629873481904
 */

 

ConsumerRecordMetadata 객체를 이용한 방법

@Component
class MemberTestConsumer {    

    @KafkaListener(id = "mem", topicPattern = "insert_member")
    fun listen(data: String, meta: ConsumerRecordMetadata) {
        println("data:: $data partition:: ${meta.partition()} topic:: ${meta.topic()} ts:: ${meta.timestamp()}")
    }
    
}

 

6. Batch Listeners

Batch 설정을 적용한 Factory를 Bean에 등록합니다.

@Bean
fun batchFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

    factory.setConcurrency(3) // Consumer Process Thread Count
    factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
    factory.containerProperties.pollTimeout = 500
    factory.isBatchListener = true    // Batch 적용!!!!!!!!!

    return factory
}

 

BatchFactory를 이용한 KafkaListener를 사용한 Consumer

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topics = ["insert_member"], containerFactory = "batchFactory")
    fun listen(data: List<String>) {
        println("data:: $data")
    }

}


/**
 * Result
 * data:: [test message, test message, test message, test message, test message, test message]
 * data:: [test message, test message, test message, test message]
 * data:: [test message, test message]
 */

 

그 외의 Batch Listeners 활용법

@KafkaListener(id = "listenBatch1", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch1(data: List<String>) {
    println("batch1 data:: $data")
}

@KafkaListener(id = "listenBatch2", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch2(data: List<Message>?) {
    println("batch2 data:: $data")
}

@KafkaListener(id = "listenBatch3", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch3(data: List<Message>?, ack: Acknowledgment) {
    println("batch3 data:: $data")
}

@KafkaListener(id = "listenBatch4", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch4(data: List<Message>?, ack: Acknowledgment, consumer: Consumer<String, String>) {
    println("batch4 data:: $data")
}

@KafkaListener(id = "listenBatch5", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch5(data: List<ConsumerRecord<String, String>>) {
    data.forEach {
        println("batch5 data:: ${it.value()} topic=${it.topic()} partition=${it.partition()} offset=${it.offset()}")
    }

}

@KafkaListener(id = "listenBatch6", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch6(data: List<ConsumerRecord<String, String>>, ack: Acknowledgment) {
    data.forEach {
        println("batch6 data:: ${it.value()} topic=${it.topic()} partition=${it.partition()} offset=${it.offset()}")
    }
}

/**
 * Result
 * batch5 data:: test message topic=insert_member partition=2 offset=152
 * batch3 data:: [test message]
 * batch6 data:: test message topic=insert_member partition=2 offset=152
 * batch2 data:: [test message]
 * batch4 data:: [test message]
 * batch1 data:: [test message]
 */

 

GroupId

Consumer는 GroupId로 그룹화할 수 있습니다. 그룹화된 Consumer별로 데이터를 읽어옵니다. 반대로 같은 그룹에 2개 이상의 컨슈머가 있다면 1개의 컨슈머만 Read하고 나머지 컨슈머는 놀고 있는 상태가 됩니다.

 

코드로 살펴보겠습니다. 

2개의 컨슈머는 group1, 2개의 컨슈머는 group2로 설정
@KafkaListener(id = "listener1", topics = ["insert_member"], groupId = "group1")
fun listener1(data: String) {
    println("listener1 data:: $data")
}

@KafkaListener(id = "listener2", topics = ["insert_member"], groupId = "group1")
fun listener2(data: String) {
    println("listener2 data:: $data")
}

@KafkaListener(id = "listener3", topics = ["insert_member"], groupId = "group2")
fun listener3(data: String) {
    println("listener3 data:: $data")
}

@KafkaListener(id = "listener4", topics = ["insert_member"], groupId = "group2")
fun listener4(data: String) {
    println("listener4 data:: $data")
}

 

결과는 그룹별 하나의 컨슈머만 리스닝을 처리합니다.

// Send..
Send Message = [ test message ] 
  - Offset = [ 159 ], Topic = [ insert_member ], Partition = [ 0 ]
  
// Response  
listener3 data:: test message
listener2 data:: test message

 

Group별 Consumer 작동

반응형