모든 코드는 Github에 올려두었습니다.
들어가기 전에
Consumer는 Kafka에 Topic, Partition의 Record를 읽어오는 역할을 합니다. 하지만 Kafka에 잘못된 데이터가 들어오는 경우 Consumer에서는 데이터를 Validation 처리하여 올바른 데이터만 처리해야합니다.
이번 글에서는 Spring-Kafka를 사용하는 Consumer에서 어떻게 Validation 처리를 하는지 작성해보겠습니다.
목차
- Validation 방식
- Validation 구성
- Consumer Validation 구현
- Validation 테스트
- @KafkaListener ErrorHandler
- 결론
Validation 방식
Spring-Kafka는 Version 2.2 부터 @KafkaListener의 @Payload 인자값을 쉽게 Validate하는 Validator 기능이 추가되었습니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {
override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
registrar.setValidator(ConsumerValidator()) // 원하는 Validator를 설정할 수 있다.
}
}
위와 같이 원하는 Validator를 생성하여 등록할 수 있지만 이 글에서는 Spring-Boot에서 지원하는 validation 기능을 사용해보겠습니다.
Validation 구성
먼저 build.gradle.kts 파일에 spring-boot-starter-validation 의존성을 주입합니다.
dependencies {
implementation("org.springframework.boot:spring-boot-starter-validation")
}
다음으로 LocalValidatorFactoryBean을 validator로 설정합니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig(
/** spring validation */
private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
// validation
override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
registrar.setValidator(this.validator) // validator 설정
}
}
LocalValidatorFactoryBean은 spring-boot-starter-validation 의존성 주입하면 자동으로 Bean에 등록됩니다.
다음으로 Consumer에서 JsonDeserializer방식을 이용하여 Member 객체 형태로 Read합니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig(
/** spring validation */
private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
@Bean
fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
it.consumerFactory =
DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
}
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // 마지막 읽은 부분부터 Read
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
)
}
Member 객체는 아래를 참조해주세요.
data class Member(
@field:NotNull
val name: String? = null,
@field:Min(1)
@field:Max(20)
val age: Int? = null
)
아래는 Validation을 사용하기 위한 Config 전체 코드입니다.
@Configuration
@EnableKafka
class KafkaValidatorConfig(
/** spring validation */
private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVER: String
// validation
override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
registrar.setValidator(this.validator) // validator 설정
}
// factory
@Bean
fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
it.consumerFactory =
DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
}
}
private fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // 마지막 읽은 부분부터 Read
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
)
}
Consumer Validation 구현
Consumer에서 Read하는 Member 객체 (@Payload) 에 @Valid를 선언하여 사용합니다.
Controller에서 @Valid 사용하는 방식과 동일하게 작성해주시면 됩니다.
@Component
class MemberValidationTestConsumer {
@KafkaListener(
id = "member_validate",
topics = ["insert_member"],
containerFactory = "memberFactory",
groupId = "m_group"
)
fun memberListener(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
println("memberListener data:: $member")
println("memberListener offset:${meta.offset()} partition:${meta.partition()}")
acknowledgment.acknowledge()
}
}
Validation 테스트
Member 객체에 age 필드를 Max값을 초과하는 값으로 세팅 후 Kafka에 저장시켜 Consumer가 Read하도록 해보겠습니다.
{
"name": "yong",
"age": 100
}
결과는 아래와 같은 에러가 발생합니다.
2021-10-12 18:36:07.613 ERROR 10093 --- [ber_retry-0-C-1] o.s.k.l.SeekToCurrentErrorHandler : Backoff none exhausted for insert_member-0@327
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment)]
Bean [me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer@1458548]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [100]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=100), headers={kafka_offset=327, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3df5cb5e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634031367458, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 327, CreateTime = 1634031367458, serialized key size = 4, serialized value size = 25, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=100)), kafka_groupId=m_group}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [100]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=100), headers={kafka_offset=327, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3df5cb5e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634031367458, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 327, CreateTime = 1634031367458, serialized key size = 4, serialized value size = 25, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=100)), kafka_groupId=m_group}]
// Exception Trace Log
에러를 요약하자면 age에서 Max값을 초과했다는 에러입니다.
여기에 추가로 발생한 Error를 핸들링하고 싶다면 ErrorHandler을 사용할 수 있습니다.
@KafkaListener ErrorHandler
@KafkaListener(
id = "member_retry",
topics = ["insert_member"],
containerFactory = "memberFactory",
groupId = "m_group",
errorHandler = "validationErrorHandler" // errorHandler 추가
)
fun memberListener2(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
println("memberListener data:: $member")
println("memberListener offset:${meta.offset()} partition:${meta.partition()}")
acknowledgment.acknowledge()
}
@Bean
fun validationErrorHandler(): KafkaListenerErrorHandler {
return KafkaListenerErrorHandler { message, exception ->
// error 처리
message
}
}
ErrorHandler를 Bean에 등록 후 @KafkaListener에서 등록하여 사용할 수 있습니다.
결론
Spring-Kafka는 이렇게 간단한 방법으로 Validation 처리를 할 수 있습니다. 이와 같은 처리방식으로 인해 발생한 Exception 종류에 따라 Retry 정책을 분기처리하여 관리할 수 있을 것 입니다.
다음 글에서는 Consumer에서 실패 시 Spring-Kafka에서 지원하는 Reply, Retry 기능에 대해서 작성해보겠습니다.
'Study > message-queue' 카테고리의 다른 글
Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo ) (0) | 2021.10.15 |
---|---|
Spring-Kafka Consumer Offset 관리 (2) | 2021.10.14 |
Spring-Kafka Lifecycle (1) | 2021.09.04 |
[kafka] @KafkaListener를 이용한 Consumer 구현 (0) | 2021.08.26 |
[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer 사용) (2) | 2021.08.24 |