Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다.
모든 소스는 Github에 올려두었습니다.
목차
- Consumer
- Message Listeners 이용한 구현
- Listener 종류
- MessageListenerContainers 종류
- Committing Offset
- nack
1. Consumer
컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 Message Listeners 를 이용하여 구현한 내용을 정리해보겠습니다.
2. Message Listeners 이용한 구현
Message Listener Container를 사용할 때 Receive Data를 listener에서 제공해야만 합니다.
2-1. Listener 종류
Listener | When | ConsumerRecord 처리 대상 | 비고 |
MessageListener | - 자동 커밋 - Container-Managed 중 하나 |
개별 | |
AcknowledgingMessageListener | - 수동 커밋 중 하나 | 개별 | |
ConsumerAwareMessageListener | - 자동 커밋 - Container-Managed 중 하나 |
개별 | Consumer 엑세스 가능 |
AcknowledgingConsumerAwareMessageListener | - 수동 커밋 중 하나 | 개별 | Consumer 엑세스 가능 |
BatchMessageListener | - 자동 커밋 - Container-Managed 중 하나 |
ALL | AckMode.RECODE 지원 X |
BatchAcknowledgingMessageListener | - 수동 커밋 중 하나 | ALL | |
BatchConsumerAwareMessageListener | - 자동 커밋 - Container-Managed 중 하나 |
ALL | Consumer 엑세스 가능 AckMode.RECODE 지원 X |
BatchAcknowledgingConsumerAwareMessageListener | - 수동 커밋 중 하나 | ALL | Consumer 엑세스 가능 |
Consumer는 Thread-Safe 하지 않습니다. Listener를 호출하는 쓰레드에서만 호출해야합니다.
MessageListener
Partition에 Record를 1개씩 읽어와 처리합니다.
MessageListener 및 MessageListener를 상속받는 Listener 클래스의 종류입니다.
MessageListener<String, String>() { data ->
// logic..
}
AcknowledgingMessageListener<String, String> { data, acknowledgment ->
// logic..
}
ConsumerAwareMessageListener<String, String> { data, consumer ->
// logic..
}
AcknowledgingConsumerAwareMessageListener<String, String> {data, acknowledgment, consumer ->
// logic..
}
BatchMessageListener
Partition에 모든 데이터를 읽어와 처리합니다.
BatchMessageListener 및 BatchMessgeListener를 상속받는 Listener 클래스 종류
BatchMessageListener<String, String> {data ->
// logic..
}
BatchAcknowledgingMessageListener<String, String> {data, acknowledgment ->
// logic..
}
BatchConsumerAwareMessageListener<String, String> {data, consumer ->
// logic..
}
BatchAcknowledgingConsumerAwareMessageListener<String, String> {data, acknowledgment, consumer ->
// logic..
}
2-2. MessageListenerContainers 종류
KafkaMessageListenerContainer, ConcurrentMessageListenerContainer로 두가지로 나뉩니다.
- KafkaMessageListenerContainer: Single Thread 처리
- ConcurrentMessageListenerContainer: Multiple Thread 처리
KafkaMessageListenerContainer
모든 Topic, Partition에서 받는 메시지를 하나의 쓰레드로 처리합니다.
KafkaMessageLlistenerContainer 생성자
// constructor
public KafkaMessageListenerContainer(
ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties
)
간단하게 KafkaMessageListenerContainer를 설정하는 예시입니다.
@Bean
fun kafkaMessageListenerContainer(): KafkaMessageListenerContainer<String, String> {
// properties
val props = ContainerProperties("save_member")
// Kafka에서 메시지를 읽어오는 Listener
props.messageListener = MessageListener<String, String>() {
log.info("Consumer Recode. Value=${it.value()} Offset=${it.offset()}")
}
props.commitLogLevel = LogIfLevelEnabled.Level.DEBUG
props.setGroupId("test11")
// consumeFactory
val cf = DefaultKafkaConsumerFactory<String, String>(getConfig())
return KafkaMessageListenerContainer(cf, props)
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
)
ConcurrentMessageListenerContainer
모든 Topic, Partition에서 받는 메시지를 병렬 처리합니다.
concurrency 설정 중 container.setConcurrency(N) 를 사용하여 N개 KafkaMessageListenerContainer 인스턴스를 생성할 수 있습니다.
ConcurrentMessageListenerContainer 생성자
// constructor
public ConcurrentMessageListenerContainer(
ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties
)
간단하게 ConcurrentKafkaListenerContainerFactory를 설정하는 예시입니다.
@Bean
fun saveMemberKafkaListener(): ConcurrentKafkaListenerContainerFactory<String, String> =
kafkaListenerContainerFactory()
private fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.setConcurrency(3) // Consumer Process Thread Count
factory.consumerFactory = getConfig()
factory.containerProperties.pollTimeout = 500
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
return factory
}
private fun getConfig(): ConsumerFactory<String, String> {
val config = mutableMapOf<String, Any>()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVER
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" // 마지막 읽은 부분부터 Read
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(config)
}
3. Committing Offset
kafka의 설정 중 enable.auto.commit은 구성에 따라서 오프셋을 자동으로 커밋 여부를 결정합니다.
false라면 컨테이너는 몇가지 AckMode 설정은 지원합니다.
consumer poll() 메서드는 하나 이상의 ConsumerRecords를 반환하고 MessageListener는 각 레코드를 호출합니다. 그리고 다음 컨테이너가 각 AckMode에 대해 수행합니다. 아래는 AckMode의 종류입니다.
AckMode | Description |
RECORD | 레코드 처리 후 수신기가 반환되면 오프셋 커밋 |
BATCH ( default ) | poll() 에서 반환된 모든 레코드가 처리되면 오프셋 커밋 |
TIME | 마지막 커밋 이후의 ackTime이 초과된 경우 poll()에서 반환된 모든 레코드가 처리되면 오프셋 커밋 |
COUNT | 마지막 커밋 이후 ackCount 레코드가 수신된 경우 poll()에서 반환된 모든 레코드가 처리되면 오프셋 커밋 |
COUNT_TIME | TIME, COUNT와 유사하지만 두 조건 중 하나가 true면 커밋 |
MANUAL | 메시지 리스터는 Acknowlegment에서 acknowledge()를 호출할 책임이 있음. 그리고 후에는 BATCH와 동일 |
MANUAL_IMMEDIATE | Acknowledgment.acknowledge() 호출되면 즉시 오프셋 커밋 |
알아야할 사항
- transactions를 사용할 때 오프셋이 트랜잭션으로 전송되고 Listener 타입에 따라 RECODE OR BATCH로 작동한다.
- MANUAL, MANUAL_IMMEDIATE 일 경우 반드시 AcknowledgingMessageListener 혹은 BatchAcknowledgingMessageListener를 사용해야합니다.
- listener container는 오프셋을 자동으로 커밋하는 메커니즘이 있기 때문에 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG를 false로 하는 것을 권장합니다.
- Version 2.3 이후 부터는 consumer factory 혹은 컨테이너의 consumer property에 오버라이드하여 재설정하지 않는 한 false로 설정되어있습니다.
- acknowledgment를 사용하면 오프셋이 커밋되는 시기를 제어할 수 있다.
4. nack
2.3 Version 이후부터는 acknowledgment interface는 두가지 추가적인 메서드가 있습니다.
- nack(long sleep): RECODE Listener와 함께 사용
- nack(int index, long sleep): Batch Listener와 함께 사용
- nack 메서드에 따른 다른 listener 호출 시 IllegalStateException 발생
- 부분 배치를 커밋하길 원한다면 nack를 사용하여 트랜잭션을 사용할 때 AckMode를 Manual로 설정합니다. nack()을 호출하면 성공적으로 처리된 레코드의 오프셋이 트랜잭션으로 전송됩니다.
nack() 메서드는 listener를 invoke하는 consumer 스레드에서만 호출할 수 있습니다.
'Study > message-queue' 카테고리의 다른 글
Spring-Kafka Lifecycle (1) | 2021.09.04 |
---|---|
[kafka] @KafkaListener를 이용한 Consumer 구현 (0) | 2021.08.26 |
[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer 사용) (2) | 2021.08.24 |
[kafka] KafkaTemplate을 이용한 Producer 구현 (1) | 2021.08.20 |
그림으로 이해하는 카프카 (2) | 2021.08.05 |