Study/message-queue

[kafka] Message Listeners를 이용한 Consumer 구현

에디개발자 2021. 8. 23. 06:30
반응형

나를 닮았다고 한다...

Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다.

모든 소스는 Github에 올려두었습니다. 

 

목차

  1. Consumer
  2. Message Listeners 이용한 구현
    1. Listener 종류
    2. MessageListenerContainers 종류
    3. Committing Offset
    4. nack

1. Consumer

컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 Message Listeners 를 이용하여 구현한 내용을 정리해보겠습니다.

 

2. Message Listeners 이용한 구현

Message Listener Container를 사용할 때 Receive Data를 listener에서 제공해야만 합니다. 

 

2-1. Listener 종류

Listener When ConsumerRecord 처리 대상 비고
MessageListener - 자동 커밋 
- Container-Managed 중 하나
개별  
AcknowledgingMessageListener - 수동 커밋 중 하나 개별  
ConsumerAwareMessageListener - 자동 커밋 
- Container-Managed 중 하나
개별 Consumer 엑세스 가능
AcknowledgingConsumerAwareMessageListener - 수동 커밋 중 하나 개별 Consumer 엑세스 가능
BatchMessageListener - 자동 커밋 
- Container-Managed 중 하나
ALL AckMode.RECODE 지원 X
BatchAcknowledgingMessageListener - 수동 커밋 중 하나 ALL  
BatchConsumerAwareMessageListener - 자동 커밋 
- Container-Managed 중 하나
ALL Consumer 엑세스 가능
AckMode.RECODE 지원 X
BatchAcknowledgingConsumerAwareMessageListener - 수동 커밋 중 하나 ALL Consumer 엑세스 가능
 Consumer는 Thread-Safe 하지 않습니다. Listener를 호출하는 쓰레드에서만 호출해야합니다.

 

MessageListener

Partition에 Record를 1개씩 읽어와 처리합니다.

MessageListener 및 MessageListener를 상속받는 Listener 클래스의 종류입니다.

MessageListener<String, String>() { data ->
    // logic..
}

AcknowledgingMessageListener<String, String> { data, acknowledgment ->
    // logic..                
}

ConsumerAwareMessageListener<String, String> { data, consumer ->
    // logic..
}

AcknowledgingConsumerAwareMessageListener<String, String> {data, acknowledgment, consumer ->
    // logic..
}

 

BatchMessageListener

Partition에 모든 데이터를 읽어와 처리합니다.

BatchMessageListener 및 BatchMessgeListener를 상속받는 Listener 클래스 종류

BatchMessageListener<String, String> {data ->
    // logic..
}

BatchAcknowledgingMessageListener<String, String> {data, acknowledgment ->
    // logic..
}

BatchConsumerAwareMessageListener<String, String> {data, consumer ->
    // logic..
}

BatchAcknowledgingConsumerAwareMessageListener<String, String> {data, acknowledgment, consumer ->
    // logic..
}

 

2-2. MessageListenerContainers 종류

KafkaMessageListenerContainer, ConcurrentMessageListenerContainer로 두가지로 나뉩니다. 

  • KafkaMessageListenerContainer: Single Thread 처리
  • ConcurrentMessageListenerContainer: Multiple Thread 처리

 

KafkaMessageListenerContainer

모든 Topic, Partition에서 받는 메시지를 하나의 쓰레드로 처리합니다. 

프로듀서에서 카프카 클라이언트에 메세지를 저장하고 컨슈머의 단일 쓰레드가 읽는 플로우

 

단일 쓰레드로 처리하다보니 속도가 느리다.

 

KafkaMessageLlistenerContainer 생성자

// constructor
public KafkaMessageListenerContainer(
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties
)

 

간단하게 KafkaMessageListenerContainer를 설정하는 예시입니다.

@Bean
fun kafkaMessageListenerContainer(): KafkaMessageListenerContainer<String, String> {
    // properties
    val props = ContainerProperties("save_member")
    // Kafka에서 메시지를 읽어오는 Listener
    props.messageListener = MessageListener<String, String>() {
        log.info("Consumer Recode. Value=${it.value()} Offset=${it.offset()}")
    }
    props.commitLogLevel = LogIfLevelEnabled.Level.DEBUG
    props.setGroupId("test11")

    // consumeFactory
    val cf = DefaultKafkaConsumerFactory<String, String>(getConfig())
    
    return KafkaMessageListenerContainer(cf, props)
}

private fun getConfig(): Map<String, Any> =
    mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
    )

 

ConcurrentMessageListenerContainer

모든 Topic, Partition에서 받는 메시지를 병렬 처리합니다.

프로듀서에서 카프카 클라이언트에 메세지를 저장하고 컨슈머의 멀티 쓰레드가 읽는 플로우

 

concurrency 설정 중 container.setConcurrency(N) 를 사용하여 N개 KafkaMessageListenerContainer 인스턴스를 생성할 수 있습니다. 

setConcurrency(3)으로 설정한 경우

 

ConcurrentMessageListenerContainer 생성자

// constructor
public ConcurrentMessageListenerContainer(
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties
)

 

간단하게  ConcurrentKafkaListenerContainerFactory를 설정하는 예시입니다.

@Bean
fun saveMemberKafkaListener(): ConcurrentKafkaListenerContainerFactory<String, String> =
    kafkaListenerContainerFactory()

private fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

    factory.setConcurrency(3) // Consumer Process Thread Count
    factory.consumerFactory = getConfig()
    factory.containerProperties.pollTimeout = 500
    factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL

    return factory
}

private fun getConfig(): ConsumerFactory<String, String> {
    val config = mutableMapOf<String, Any>()

    config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVER
    config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"  // 마지막 읽은 부분부터 Read
    config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
    config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java

    return DefaultKafkaConsumerFactory(config)
}

 

3. Committing Offset

kafka의 설정 중 enable.auto.commit은  구성에 따라서 오프셋을 자동으로 커밋 여부를 결정합니다.

false라면 컨테이너는 몇가지 AckMode 설정은 지원합니다.

 

consumer poll() 메서드는 하나 이상의 ConsumerRecords를 반환하고 MessageListener는 각 레코드를 호출합니다. 그리고 다음 컨테이너가 각 AckMode에 대해 수행합니다. 아래는 AckMode의 종류입니다.

AckMode Description
RECORD 레코드 처리 후 수신기가 반환되면 오프셋 커밋
BATCH ( default ) poll() 에서 반환된 모든 레코드가 처리되면 오프셋 커밋
TIME 마지막 커밋 이후의 ackTime이 초과된 경우 poll()에서 반환된 모든 레코드가 처리되면 오프셋 커밋
COUNT 마지막 커밋 이후 ackCount 레코드가 수신된 경우 poll()에서 반환된 모든 레코드가 처리되면 오프셋 커밋
COUNT_TIME TIME, COUNT와 유사하지만 두 조건 중 하나가 true면 커밋
MANUAL 메시지 리스터는 Acknowlegment에서 acknowledge()를 호출할 책임이 있음. 그리고 후에는 BATCH와 동일
MANUAL_IMMEDIATE Acknowledgment.acknowledge() 호출되면 즉시 오프셋 커밋

 

알아야할 사항

  • transactions를 사용할 때 오프셋이 트랜잭션으로 전송되고 Listener 타입에 따라 RECODE OR BATCH로 작동한다. 
  • MANUALMANUAL_IMMEDIATE 일 경우 반드시 AcknowledgingMessageListener 혹은 BatchAcknowledgingMessageListener를 사용해야합니다.
  • listener container는 오프셋을 자동으로 커밋하는 메커니즘이 있기 때문에 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG를 false로 하는 것을 권장합니다.
  • Version 2.3 이후 부터는 consumer factory 혹은 컨테이너의 consumer property에 오버라이드하여 재설정하지 않는 한 false로 설정되어있습니다.
  • acknowledgment를 사용하면 오프셋이 커밋되는 시기를 제어할 수 있다.

 

4. nack

2.3 Version 이후부터는 acknowledgment interface는 두가지 추가적인 메서드가 있습니다. 

  • nack(long sleep): RECODE Listener와 함께 사용
  • nack(int index, long sleep): Batch Listener와 함께 사용
    • nack 메서드에 따른 다른 listener 호출 시 IllegalStateException 발생
    • 부분 배치를 커밋하길 원한다면 nack를 사용하여 트랜잭션을 사용할 때 AckMode를 Manual로 설정합니다. nack()을 호출하면 성공적으로 처리된 레코드의 오프셋이 트랜잭션으로 전송됩니다.
nack() 메서드는 listener를 invoke하는 consumer 스레드에서만 호출할 수 있습니다.

 

 

반응형