Study/message-queue

Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo )

에디개발자 2021. 10. 15. 06:00
반응형

나를 닮았다고 한다...

들어가기 전에

Consumer를 통해 카프카 클러스터에서 데이터를 Read할 때 모두 정상처리된다면 좋겠지만 그렇치 못한 경우도 있습니다. 그럴 경우 Spring-Kafka에서 재처리를 편리하게 할 수 있도록 Reply(@SendTo), Retry 기능을 제공합니다. 이번 글에서는 재처리 하는 방법에대해서 정리해보겠습니다.

 

목차

  1. 재처리를 해야하는 이유
  2. ReplyTemplate 구성
  3. @SendTo를 사용한 ReplyTemplate 구현
  4. Reply 처리 플로우

1. 재처리를 해야하는 이유

Consumer를 이용해 카프카 클러스터에서 Read 한 후 Repository에 저장할 수도 있고 메일 발송을 할 수도 있고 다양하게 사용할 수 있습니다. 하지만 Repository 서버가 다운된다거나 메일 발송 중 메일 서버가 다운될 수 있습니다. 이럴 경우 Consumer는 맡은 역할을 수행하지 못하고 Record를 읽기만 합니다. 

 

Record를 읽은 Consumer는 자체적으로 Offset을 관리하여 Kafka 클러스터에게 Offset Commit 명령을 날리지 않아도 다시 읽어오지 않습니다. ( Consumer를 재기동하면 다시 읽어옵니다. Spring-Kafka Consumer Offset 관리

 

결론적으로 실패한 케이스에 대해서는 재처리를 진행해야하며 재처리 몇회 재시도, 실패 시 어떻게 대응 등 정책을 구상해야합니다.

이번 글에서 다룰 재처리 방식은 ReplyTemplate, RetryTemplate에 대해서 정리해보겠습니다.

 

2. ReplyTemplate 구성

ReplyTemplate은 @KafkaListener에서 Exception이 발생했을 경우 재시도를 1회 처리할 수 있는 기능입니다. 사용방법으로는 @SendToReplyTemplate 구성을 하여 사용할 수 있습니다. 코드로 살펴보겠습니다.

 

@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {

    /**
     * reply kafka template
     * @return KafkaTemplate<String, Member>
     */
    fun replyTemplate(): KafkaTemplate<String, Member> {
        return KafkaTemplate(factory())
    }

    /**
     * reply kafka template factory
     * @return DefaultKafkaProducerFactory<String, Member>
     */
    fun factory(): DefaultKafkaProducerFactory<String, Member> {
        return DefaultKafkaProducerFactory<String, Member>(memberProducerConfig())
    }

    /**
     * reply kafka configuration
     * @return Map<String, Serializable>
     */
    private fun memberProducerConfig(): Map<String, Any> =
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
        )

}

ReplyTemplate은 KafkaTemplate을 사용하여 Reply를 구현할 수 있습니다. 기존의 KafkaTemplate을 생성하고 구성하는 방식과 동일하게 작성합니다. 

 

다음으로 생성한 ReplyTemplate을 KafkaListenerContainerFactory에 설정합니다.

@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
        return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
            it.consumerFactory =
                DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
            it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL

            // set reply
            it.setReplyTemplate(replyTemplate())
        }
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        )
}

 

KafkaTemplate, ReplyTemplate을 모두 적용한 코드는 아래와 같습니다.

@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
        return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
            it.consumerFactory =
                DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
            it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL

            // set reply
            it.setReplyTemplate(replyTemplate())
        }
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        )

    /**
     * reply kafka template
     * @return KafkaTemplate<String, Member>
     */
    fun replyTemplate(): KafkaTemplate<String, Member> {
        return KafkaTemplate(factory())
    }

    /**
     * reply kafka template factory
     * @return DefaultKafkaProducerFactory<String, Member>
     */
    fun factory(): DefaultKafkaProducerFactory<String, Member> {
        return DefaultKafkaProducerFactory<String, Member>(memberProducerConfig())
    }

    /**
     * reply kafka configuration
     * @return Map<String, Serializable>
     */
    private fun memberProducerConfig(): Map<String, Any> =
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
        )

}

 

3. @SendTo를 사용한 ReplyTemplate 구현

위에서도 언급한 내용으로 간단하게 @SendTo 를 사용하여 구현할 수 있습니다. @SendTo 의 인자값으로 Reply할 Topic 명을 작성하여 사용합니다. 코드로 살펴보겠습니다.

@Component
class MemberValidationTestConsumer {

    @KafkaListener(
        id = "member_listener",
        topics = ["insert_member"],
        containerFactory = "memberFactory",
        groupId = "m_group",
        errorHandler = "validationErrorHandler"
    )
    @SendTo(value = ["reply_member"])    // 토픽명을 작성한 SendTo 사용
    fun memberListener(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
        println("memberListener data:: $member")
        println("memberListener offset:${meta.offset()} partition:${meta.partition()}")

        acknowledgment.acknowledge()
    }

    @Bean
    fun validationErrorHandler(): KafkaListenerErrorHandler {
        return KafkaListenerErrorHandler { message, exception ->
            println("validation error handler message=${message.payload} exception groupId=${exception.groupId} exception cause=${exception.rootCause}")
            message    // 리턴되는 값으로 토픽에 저장됨
        }
    }
}

 

위 코드에서 주의하여 살펴볼 곳은 validationErrorHandler입니다. KafkaListenerErrorHandler에서 반환되는 message를 @SendTo에서 설정한 토픽으로 Write합니다. 이 점을 주의해주시기 바랍니다!!

 

다음으로 SendTo에서 설정한 토픽을 Listen하는 코드를 작성해보겠습니다.

@Component
class MemberValidationTestConsumer {

    @KafkaListener(
        id = "reply_member_listener",
        topics = ["reply_member"],
        containerFactory = "replyMemberFactory",
        groupId = "m_group",
        errorHandler = "replyErrorHandler"
    )
    fun replyMemberListener(member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
        println("replyMemberListener data:: $member")
        println("replyMemberListener offset:${meta.offset()} partition:${meta.partition()}")
        
        saveFailData(member)  // save repository
        acknowledgment.acknowledge()
    }

    @Bean
    fun replyErrorHandler(): KafkaListenerErrorHandler {
        return KafkaListenerErrorHandler { message, exception ->
            println("validation error handler message=${message.payload} exception groupId=${exception.groupId} exception cause=${exception.rootCause}")
            message
        }
    }
}

reply 처리된 데이터를 Listen하고 실패한 케이스에 대해 저장을 처리하는 로직입니다. 이 로적도 마찬가지로 SendTo를 이용해서 재시도를 할 수 있고 동일하게 ErrorHandler를 등록할 수 도 있습니다. 

 

4. Reply 처리 플로우

실제 Exception을 발생시켜보면 아래와 같은 로그로 어떻게 작동하는 지 확인할 수 있습니다.

validation error handler message=Member(name=yong, age=1000) exception groupId=null exception cause=org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener1(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [1000]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=1000), headers={kafka_offset=374, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@764e6c18, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634113553803, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 374, CreateTime = 1634113553803, serialized key size = 4, serialized value size = 26, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=1000)), kafka_groupId=m_group}]
replyMemberListener data:: Member(name=yong, age=1000)
replyMemberListener offset:1 partition:0

Exception이 발생했을 경우 아래와 같은 흐름으로 동작합니다.

  1. Consumer( member_listener )에서 Read
  2. Consumer에서 처리하던 중 Exception 발생!!
  3. Consumer에서 등록한 ErrorHandler( validationErrorHandler ) 호출 후 message 전달
  4. SendTo에 작성한 토픽으로 message Write
  5. Consumer( reply_member_listener )에서 Read

Consumer 로직 처리 후 repository에 실패한 message 저장

 

 

반응형