Study/message-queue

[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer 사용)

에디개발자 2021. 8. 24. 06:00
반응형

나를 닮았다고 한다...

들어가기전에

카프카를 사용하면서 Consumer 설정하는 하던 중 ConcurrentMessageListenerContainer를 선택해서 사용하였으나 Concurrency의 설정 기준 잡기가 모호하여 관련정보를 스터디하며 정리한 글입니다. 

모든 소스는 Github에 올려두었습니다.

 

목차

  1. 토픽이 1개인 경우
    1. 파티션1, Concurrency1, Call3
    2. 파티션2, Concurrency2, Call3
    3. 파티션3, Concurrency6, Call6
    4. 결론
  2. 토픽이 3개인 경우

Concurrency 무조건 많다고 좋은게 아니다.

 

잘못된 생각

컨슈머에서 Concurrency가 무조건 많으면 많은 Message를 Concurrency 만큼 Listen할 수 있겠구나!

 

컨슈머에서 Concurrency, 즉 Thread가 할당되는 조건은 파티션 단위였습니다. 실제 테스트를 통해 컨슈머가 어떻게 처리하는 지 확인해보겠습니다. 동시성을 확인하기 위해 처리 로직에 Sleep을 길게 주었습니다.

@Component
class MemberConsumer: AcknowledgingMessageListener<String, String> {

    private val log = LogManager.getLogger()

    @KafkaListener(topics = ["save_member"], groupId = "test", containerFactory = "saveMemberKafkaListener")
    override fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
        try {
            log.info("Save Consumer Message:: [ $data ] value:: [ ${data.value()} ]")
            Thread.sleep(50000)  // sleep을 길게~~
            // ack 처리
            acknowledgment?.acknowledge()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

}

 

1. 토픽이 1개인 경우

1-1. 파티션 1, Concurrency 1, Call 3

Thread 하나가 1개의 Message를 처리

2개의 Message는 대기중

 

1-2. 파티션 1, Concurrency 2, Call 3

Thread 하나가 1개의 Message를 처리

2개의 Message는 대기중

위와 동일한 결과

여기서 생각할 수 있는 건 "하나의 Thread는 하나의 파티션을 전담하여 처리하고 있다" 입니다. 

좀 더 명확하게 하기 위해 테스트를 진행해보겠습니다.

 

1-3. 파티션 3, Concurrency 6, Call 6

Thread 3개가 3개의 Message를 처리

3개의 Message는 대기중

이 결과에선 offset이 2개가 0이 나오고 나머지 하나가 3이나왔습니다. 즉 2개는 새로 추가한 파티션이고 나머지 하나는 이전에 생성되어있던 파티션입니다. 

이 결과로 명확하게 하나의 Thread는 하나의 파티션을 전담하여 처리하는 것을 확인할 수 있습니다.

 

1-4. 결론

Concurrency 개수의 기준은 파티션의 개수와 맞추는 것이 가장 효율적입니다. 

Thread의 개수가 Partition보다 많으면 논다~~

2. 토픽이 3개인 경우

앞에서 테스트한 결과 파티션의 개수에 맞게 Concurrency를 설정해야하는 것을 알았습니다. 그렇다면 토픽이 3개고 파티션이 5개인 경우에는 Concurrency를 15개 주면 가장 이상적... 이지 않습니다. 

 

결과는 아래와 같이 5개만 활성화 상태이고 나머지 10개는 비활성화 상태로 나타납니다.

5개만 활설화되고 10개는 비활성화 상태

왜?

이유는 기본 Kafka PartitionAssignor가 RangeAssignor 방식이기 때문입니다.

 

이런 케이스에서는 파티션을 모든 컨슈머에 배포하는 RoundRobinAssignor 방식을 사용하는 것이 좋습니다. 

변경 방법은 partition.assignment.strategy 혹은 ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 설정값을 RoundRobinAssignor 방식으로 변경합니다.

 

수정

partition.assignment.strategy 혹은 ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 수정합니다.

 

partition.assignment.strategy

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

 

ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 

private fun getConfig(): ConsumerFactory<String, String> {
        val config = mutableMapOf<String, Any>()

        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVER
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"  // 마지막 읽은 부분부터 Read
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
        config[ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG] = listOf(RoundRobinAssignor::class.java)  // 적용
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java

        return DefaultKafkaConsumerFactory(config)
    }
반응형