Kafka를 이용한 실패처리 삽질 기록
이번 글에서는 Kafka DeadLetterQueue 관련하여 삽질했던 내용을 기록하는 글 입니다.
요구조건
1. 로직을 수행 중 실패할 경우 Repository에 데이터를 적재
2. 특정 주기마다 실패한 데이터를 처리 ( 실패하면 재처리 )
결론
바쁘신 분들을 위해 결론부터 말씀드리자면 Kafka를 이용하여 특정 주기마다 데이터를 읽어와 처리하는 방법은 선택하지 않았습니다.
카프카는 실시간 데이터 처리 플랫폼
이유로는 카프카의 성격과는 맞지 않았습니다. 실시간 처리 플랫폼을 특정 주기마다 읽어온다는 것 자체가 어색하였습니다. 또한 다양한 시도를 통해 적합하지 않다고 생각하였습니다.
다양한 시도
RetryTemplate, ReplyTemplate, DeadLetterPublishingRecover
모두 비슷한 맥락의 시도였습니다. ( ReplyTemplate 사용방법 참조 )
하지만 실패한 경우 일정 시간을 두고 재시도 처리하는 경우 pending 처리되어 polling 모두 pending 되는 문제가 있었습니다.
예시)
- 재시도 주기: 5,000ms
- 100건 실패: 5,000 * 100 만큼 대기
polling 데이터가 많고 pending 시간이 길어진다면 코디네이터에서 제외당하거나 원하는 시간에 처리되지 못하고 pending 될 것입니다.
Polling 주기 설정
ConsumerFactory의 properties 중 idleBetweenPolls 를 사용하면 구현 자체는 가능했었습니다.
@Configuration
class KafkaConfig {
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
return kafkaListenerContainerFactory(
consumerFactory(serviceKafkaServers)
.apply {
updateConfigs(mapOf(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to Integer.MAX_VALUE
))
}
).also { it.containerProperties.idleBetweenPolls = 5 * 60 * 1_000 } // Polling 주기 설정
}
고려해야할 점
다만, 몇 가지 설정을 고려해야했습니다.
ConsumerConfig.MAX_POLL_RECOREDS_CONFIG
1번 Consume할 때 Topic에서 최대 몇 개의 데이터를 가져올 지에 대한 설정
실시간으로 가져오지 않다보니 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 설정을 높게 설정해야했습니다.
예시)
- 실패한 건 수 : 8,000
- MAX_POLL_RECOREDS_CONFIG: 5,000
5분이 지난 시점 5,000개의 데이터만 polling 할 것이며 나머지 3,000개는 다음 5분까지 대기해야하기 때문입니다.
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
해당 설정의 시간동안 poll() 이 호출되지 않으면 리벨런싱이 발생
idleBetweenPolls 설정보다 커야합니다.
예시)
- idleBetweenPolls: 30,000ms
- MAX_POLL_INTERVAL_MS_CONFIG: 25,000ms
25초가 지나면 리벨런싱이 일어나고 해당 Consumer는 polling 할 수 없습니다.
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG
SESSION_TIMEOUT_MS_CONFIG
- heartbeat가 없이 얼만큼 Consumer가 살아있는 것으로 판단하는 기준
HEARTBEAT_INTERVAL_MS_CONFIG
- 해당 시간마다 코디네이터에게 heartbeat 신호를 보냄
일정 주기동안 데이터를 읽지 않고 한 번에 읽어오다보니 SESSION_TIMEOUT_MS_CONFIG 설정을 넉넉하게 잡아야했습니다.
예시)
- SESSION_TIMEOUT_MS_CONFIG: 1000ms
- polling data 개수: 500개
- 1개 당 로직 처리 시간: 3ms
총 처리 시간은 1,500ms 시간이 되고 코디네이터에게 heartbeat 신호를 1,500ms 동안 보내지 못했으므로 해당 컨슈머는 장애로 판단하여 Consume 대상에서 제외됩니다.
그리고 또....
위 설정을 적합한 기준으로 맞춘 후에도 문제는 존재했습니다.
polling data의 개수가 비약적으로 많아진다면 SESSION_TIMEOUT_MS_CONFIG 또한 비약적으로 늘려야할텐데 이럴 경우 consumer에 실제 장애가 발생했을 경우 문제가 될 수 있습니다. 물론 Consumer에서 비동기를 이용하여 병렬로 처리한다면 어느정도 해소될 수 있지만....
과연 카프카를 카프카스럽게 쓰고있는 것인가?
동료 개발자의 리뷰 중 기억에 남는 말이 있었습니다.
포크로 머리를 빗는 것과 같다.
결론
특정 주기를 주어 재처리가 필요한 경우는 Kafka를 이용하는 건 바람직하지 않습니다. ( 주관적인 생각입니다. )
이번 요구사항은 Jenkins ( Scheduler ) + Batch로 풀었습니다.