Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo )
들어가기 전에
Consumer를 통해 카프카 클러스터에서 데이터를 Read할 때 모두 정상처리된다면 좋겠지만 그렇치 못한 경우도 있습니다. 그럴 경우 Spring-Kafka에서 재처리를 편리하게 할 수 있도록 Reply(@SendTo), Retry 기능을 제공합니다. 이번 글에서는 재처리 하는 방법에대해서 정리해보겠습니다.
목차
- 재처리를 해야하는 이유
- ReplyTemplate 구성
- @SendTo를 사용한 ReplyTemplate 구현
- Reply 처리 플로우
1. 재처리를 해야하는 이유
Consumer를 이용해 카프카 클러스터에서 Read 한 후 Repository에 저장할 수도 있고 메일 발송을 할 수도 있고 다양하게 사용할 수 있습니다. 하지만 Repository 서버가 다운된다거나 메일 발송 중 메일 서버가 다운될 수 있습니다. 이럴 경우 Consumer는 맡은 역할을 수행하지 못하고 Record를 읽기만 합니다.
Record를 읽은 Consumer는 자체적으로 Offset을 관리하여 Kafka 클러스터에게 Offset Commit 명령을 날리지 않아도 다시 읽어오지 않습니다. ( Consumer를 재기동하면 다시 읽어옵니다. Spring-Kafka Consumer Offset 관리 )
결론적으로 실패한 케이스에 대해서는 재처리를 진행해야하며 재처리 몇회 재시도, 실패 시 어떻게 대응 등 정책을 구상해야합니다.
이번 글에서 다룰 재처리 방식은 ReplyTemplate, RetryTemplate에 대해서 정리해보겠습니다.
2. ReplyTemplate 구성
ReplyTemplate은 @KafkaListener에서 Exception이 발생했을 경우 재시도를 1회 처리할 수 있는 기능입니다. 사용방법으로는 @SendTo와 ReplyTemplate 구성을 하여 사용할 수 있습니다. 코드로 살펴보겠습니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {
/**
* reply kafka template
* @return KafkaTemplate<String, Member>
*/
fun replyTemplate(): KafkaTemplate<String, Member> {
return KafkaTemplate(factory())
}
/**
* reply kafka template factory
* @return DefaultKafkaProducerFactory<String, Member>
*/
fun factory(): DefaultKafkaProducerFactory<String, Member> {
return DefaultKafkaProducerFactory<String, Member>(memberProducerConfig())
}
/**
* reply kafka configuration
* @return Map<String, Serializable>
*/
private fun memberProducerConfig(): Map<String, Any> =
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
)
}
ReplyTemplate은 KafkaTemplate을 사용하여 Reply를 구현할 수 있습니다. 기존의 KafkaTemplate을 생성하고 구성하는 방식과 동일하게 작성합니다.
다음으로 생성한 ReplyTemplate을 KafkaListenerContainerFactory에 설정합니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVER: String
@Bean
fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
it.consumerFactory =
DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
// set reply
it.setReplyTemplate(replyTemplate())
}
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // 마지막 읽은 부분부터 Read
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
)
}
KafkaTemplate, ReplyTemplate을 모두 적용한 코드는 아래와 같습니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVER: String
@Bean
fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
it.consumerFactory =
DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
// set reply
it.setReplyTemplate(replyTemplate())
}
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // 마지막 읽은 부분부터 Read
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
)
/**
* reply kafka template
* @return KafkaTemplate<String, Member>
*/
fun replyTemplate(): KafkaTemplate<String, Member> {
return KafkaTemplate(factory())
}
/**
* reply kafka template factory
* @return DefaultKafkaProducerFactory<String, Member>
*/
fun factory(): DefaultKafkaProducerFactory<String, Member> {
return DefaultKafkaProducerFactory<String, Member>(memberProducerConfig())
}
/**
* reply kafka configuration
* @return Map<String, Serializable>
*/
private fun memberProducerConfig(): Map<String, Any> =
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
)
}
3. @SendTo를 사용한 ReplyTemplate 구현
위에서도 언급한 내용으로 간단하게 @SendTo 를 사용하여 구현할 수 있습니다. @SendTo 의 인자값으로 Reply할 Topic 명을 작성하여 사용합니다. 코드로 살펴보겠습니다.
@Component
class MemberValidationTestConsumer {
@KafkaListener(
id = "member_listener",
topics = ["insert_member"],
containerFactory = "memberFactory",
groupId = "m_group",
errorHandler = "validationErrorHandler"
)
@SendTo(value = ["reply_member"]) // 토픽명을 작성한 SendTo 사용
fun memberListener(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
println("memberListener data:: $member")
println("memberListener offset:${meta.offset()} partition:${meta.partition()}")
acknowledgment.acknowledge()
}
@Bean
fun validationErrorHandler(): KafkaListenerErrorHandler {
return KafkaListenerErrorHandler { message, exception ->
println("validation error handler message=${message.payload} exception groupId=${exception.groupId} exception cause=${exception.rootCause}")
message // 리턴되는 값으로 토픽에 저장됨
}
}
}
위 코드에서 주의하여 살펴볼 곳은 validationErrorHandler입니다. KafkaListenerErrorHandler에서 반환되는 message를 @SendTo에서 설정한 토픽으로 Write합니다. 이 점을 주의해주시기 바랍니다!!
다음으로 SendTo에서 설정한 토픽을 Listen하는 코드를 작성해보겠습니다.
@Component
class MemberValidationTestConsumer {
@KafkaListener(
id = "reply_member_listener",
topics = ["reply_member"],
containerFactory = "replyMemberFactory",
groupId = "m_group",
errorHandler = "replyErrorHandler"
)
fun replyMemberListener(member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
println("replyMemberListener data:: $member")
println("replyMemberListener offset:${meta.offset()} partition:${meta.partition()}")
saveFailData(member) // save repository
acknowledgment.acknowledge()
}
@Bean
fun replyErrorHandler(): KafkaListenerErrorHandler {
return KafkaListenerErrorHandler { message, exception ->
println("validation error handler message=${message.payload} exception groupId=${exception.groupId} exception cause=${exception.rootCause}")
message
}
}
}
reply 처리된 데이터를 Listen하고 실패한 케이스에 대해 저장을 처리하는 로직입니다. 이 로적도 마찬가지로 SendTo를 이용해서 재시도를 할 수 있고 동일하게 ErrorHandler를 등록할 수 도 있습니다.
4. Reply 처리 플로우
실제 Exception을 발생시켜보면 아래와 같은 로그로 어떻게 작동하는 지 확인할 수 있습니다.
validation error handler message=Member(name=yong, age=1000) exception groupId=null exception cause=org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener1(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [1000]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=1000), headers={kafka_offset=374, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@764e6c18, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634113553803, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 374, CreateTime = 1634113553803, serialized key size = 4, serialized value size = 26, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=1000)), kafka_groupId=m_group}]
replyMemberListener data:: Member(name=yong, age=1000)
replyMemberListener offset:1 partition:0
Exception이 발생했을 경우 아래와 같은 흐름으로 동작합니다.
- Consumer( member_listener )에서 Read
- Consumer에서 처리하던 중 Exception 발생!!
- Consumer에서 등록한 ErrorHandler( validationErrorHandler ) 호출 후 message 전달
- SendTo에 작성한 토픽으로 message Write
- Consumer( reply_member_listener )에서 Read
Consumer 로직 처리 후 repository에 실패한 message 저장