Study/message-queue

Spring-Kafka Consumer Validation

에디개발자 2021. 10. 12. 22:52
반응형

나를 닮았다고 한다...

모든 코드는 Github에 올려두었습니다. 

 

들어가기 전에

Consumer는 Kafka에 Topic, Partition의 Record를 읽어오는 역할을 합니다. 하지만 Kafka에 잘못된 데이터가 들어오는 경우 Consumer에서는 데이터를 Validation 처리하여 올바른 데이터만 처리해야합니다. 

이번 글에서는 Spring-Kafka를 사용하는 Consumer에서 어떻게 Validation 처리를 하는지 작성해보겠습니다.

 

목차

  1. Validation 방식
  2. Validation 구성
  3. Consumer Validation 구현
  4. Validation 테스트
  5. @KafkaListener ErrorHandler
  6. 결론

Validation 방식

Spring-Kafka는 Version 2.2 부터 @KafkaListener의 @Payload 인자값을 쉽게 Validate하는 Validator 기능이 추가되었습니다. 

@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {

    override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
        registrar.setValidator(ConsumerValidator())  // 원하는 Validator를 설정할 수 있다.
    }
    
}

위와 같이 원하는 Validator를 생성하여 등록할 수 있지만 이 글에서는 Spring-Boot에서 지원하는 validation 기능을 사용해보겠습니다.

 

Validation 구성

먼저 build.gradle.kts 파일에 spring-boot-starter-validation 의존성을 주입합니다.

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-validation")
}

 

다음으로 LocalValidatorFactoryBean을 validator로 설정합니다.

@Configuration
@EnableKafka
class KafkaValidatorConfig(
    /** spring validation */
    private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
    
    // validation
    override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
        registrar.setValidator(this.validator)  // validator 설정
    }
}
LocalValidatorFactoryBean은 spring-boot-starter-validation 의존성 주입하면 자동으로 Bean에 등록됩니다.

 

다음으로 Consumer에서 JsonDeserializer방식을 이용하여 Member 객체 형태로 Read합니다. 

@Configuration
@EnableKafka
class KafkaValidatorConfig(
    /** spring validation */
    private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
    
    @Bean
    fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
        return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
            it.consumerFactory =
                DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
            it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        }
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        )
}

 

Member 객체는 아래를 참조해주세요.

data class Member(

   @field:NotNull
   val name: String? = null,

   @field:Min(1)
   @field:Max(20)
   val age: Int? = null
)

 


아래는 Validation을 사용하기 위한 Config 전체 코드입니다.

@Configuration
@EnableKafka
class KafkaValidatorConfig(
    /** spring validation */
    private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String
    
    // validation
    override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
        registrar.setValidator(this.validator)  // validator 설정
    }
    
    // factory
    @Bean
    fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
        return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
            it.consumerFactory =
                DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
            it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        }
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        )
}

 

Consumer Validation 구현

Consumer에서 Read하는 Member 객체 (@Payload) 에 @Valid를 선언하여 사용합니다.

Controller에서 @Valid 사용하는 방식과 동일하게 작성해주시면 됩니다.
@Component
class MemberValidationTestConsumer {

    @KafkaListener(
        id = "member_validate",
        topics = ["insert_member"],
        containerFactory = "memberFactory",
        groupId = "m_group"
    )
    fun memberListener(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
        println("memberListener data:: $member")
        println("memberListener offset:${meta.offset()} partition:${meta.partition()}")

        acknowledgment.acknowledge()
    }
    
}

 

Validation 테스트

Member 객체에 age 필드를 Max값을 초과하는 값으로 세팅 후 Kafka에 저장시켜 Consumer가 Read하도록 해보겠습니다.

{
    "name": "yong",
    "age": 100
}

 

결과는 아래와 같은 에러가 발생합니다.

2021-10-12 18:36:07.613 ERROR 10093 --- [ber_retry-0-C-1] o.s.k.l.SeekToCurrentErrorHandler        : Backoff none exhausted for insert_member-0@327

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment)]
Bean [me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer@1458548]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [100]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=100), headers={kafka_offset=327, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3df5cb5e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634031367458, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 327, CreateTime = 1634031367458, serialized key size = 4, serialized value size = 25, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=100)), kafka_groupId=m_group}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [100]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=100), headers={kafka_offset=327, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3df5cb5e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634031367458, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 327, CreateTime = 1634031367458, serialized key size = 4, serialized value size = 25, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=100)), kafka_groupId=m_group}]
// Exception Trace Log

에러를 요약하자면 age에서 Max값을 초과했다는 에러입니다. 

 

여기에 추가로 발생한 Error를 핸들링하고 싶다면 ErrorHandler을 사용할 수 있습니다.

@KafkaListener ErrorHandler

@KafkaListener(
    id = "member_retry",
    topics = ["insert_member"],
    containerFactory = "memberFactory",
    groupId = "m_group",
    errorHandler = "validationErrorHandler"  // errorHandler 추가
)
fun memberListener2(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
    println("memberListener data:: $member")
    println("memberListener offset:${meta.offset()} partition:${meta.partition()}")
    
    acknowledgment.acknowledge()
}

@Bean
fun validationErrorHandler(): KafkaListenerErrorHandler {
    return KafkaListenerErrorHandler { message, exception ->
        // error 처리
        message 
    }
}

ErrorHandler를 Bean에 등록 후 @KafkaListener에서 등록하여 사용할 수 있습니다. 

 

결론

Spring-Kafka는 이렇게 간단한 방법으로 Validation 처리를 할 수 있습니다. 이와 같은 처리방식으로 인해 발생한 Exception 종류에 따라 Retry 정책을 분기처리하여 관리할 수 있을 것 입니다. 

다음 글에서는 Consumer에서 실패 시 Spring-Kafka에서 지원하는 Reply, Retry 기능에 대해서 작성해보겠습니다.

반응형