반응형

spring-kafka 6

Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo )

들어가기 전에 Consumer를 통해 카프카 클러스터에서 데이터를 Read할 때 모두 정상처리된다면 좋겠지만 그렇치 못한 경우도 있습니다. 그럴 경우 Spring-Kafka에서 재처리를 편리하게 할 수 있도록 Reply(@SendTo), Retry 기능을 제공합니다. 이번 글에서는 재처리 하는 방법에대해서 정리해보겠습니다. 목차 재처리를 해야하는 이유 ReplyTemplate 구성 @SendTo를 사용한 ReplyTemplate 구현 Reply 처리 플로우 1. 재처리를 해야하는 이유 Consumer를 이용해 카프카 클러스터에서 Read 한 후 Repository에 저장할 수도 있고 메일 발송을 할 수도 있고 다양하게 사용할 수 있습니다. 하지만 Repository 서버가 다운된다거나 메일 발송 중 메일..

Study/message-queue 2021.10.15

Spring-Kafka Consumer Offset 관리

들어가기 전에 Consumer를 AutoCommit을 false로 설정하고 사용하고 있습니다. Consumer에서 로직을 완료하고 partition에 commit을 처리하도록 작성하였습니다. 그런데 로직을 처리하던 중 Exception이 발생했고 partition에 commit을 하지 못했습니다. 그렇다면 이 Consumer는 무한 루프에 빠져 데이터를 계속 읽고 Exception을 계속 발생시킬까요? 정답은 NO! 입니다. Consumer 내부에서 Partition Offset과는 별개로 Consumer만을 위한 Offset을 관리하고 있기 때문입니다. Consumer Polling 방식 Consumer는 KafkaMessageListenerContainer에서 새로운 Thread를 생성하여 while..

Study/message-queue 2021.10.14

Spring-Kafka Consumer Validation

모든 코드는 Github에 올려두었습니다. 들어가기 전에 Consumer는 Kafka에 Topic, Partition의 Record를 읽어오는 역할을 합니다. 하지만 Kafka에 잘못된 데이터가 들어오는 경우 Consumer에서는 데이터를 Validation 처리하여 올바른 데이터만 처리해야합니다. 이번 글에서는 Spring-Kafka를 사용하는 Consumer에서 어떻게 Validation 처리를 하는지 작성해보겠습니다. 목차 Validation 방식 Validation 구성 Consumer Validation 구현 Validation 테스트 @KafkaListener ErrorHandler 결론 Validation 방식 Spring-Kafka는 Version 2.2 부터 @KafkaListener의 ..

Study/message-queue 2021.10.12

Spring-Kafka Lifecycle

이번 글에서는 Spring-Kafka의 Lifecycle에 대해서 작성해보겠습니다. 목차 Lifecycle Lifecycle Management 주의사항 1. Lifecycle @KafkaListener는 Application Context 안에 Bean이 아닙니다. @KafkaListener는 KafkaListenerEndpointRegistry에서 Bean으로 등록이 됩니다. 등록된 Bean은 framework에 의해 자동으로 선언되고 Container의 Lifecycle을 관리합니다. 등록된 Bean은 설정값 autoStartup이 true인 것을 자동으로 시작합니다. Listener Container들은 SmartLifeCycle을 Implement하고 있습니다. 그리고 autoStartup 기본..

Study/message-queue 2021.09.04

[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer 사용)

들어가기전에 카프카를 사용하면서 Consumer 설정하는 하던 중 ConcurrentMessageListenerContainer를 선택해서 사용하였으나 Concurrency의 설정 기준 잡기가 모호하여 관련정보를 스터디하며 정리한 글입니다. 모든 소스는 Github에 올려두었습니다. 목차 토픽이 1개인 경우 파티션1, Concurrency1, Call3 파티션2, Concurrency2, Call3 파티션3, Concurrency6, Call6 결론 토픽이 3개인 경우 Concurrency 무조건 많다고 좋은게 아니다. 잘못된 생각 컨슈머에서 Concurrency가 무조건 많으면 많은 Message를 Concurrency 만큼 Listen할 수 있겠구나! 컨슈머에서 Concurrency, 즉 Thread..

Study/message-queue 2021.08.24

[kafka] KafkaTemplate을 이용한 Producer 구현

Kotlin으로 Spring-Kafka 의존성을 주입받아 Producer를 구현하는 과정에 대해 정리한 글입니다. 모든 소스는 Github에 올려두었습니다. 목차 KafkaTemplate 구성 KafkaTemplate 사용하여 Producer 구현 KafkaTemplate 원초적으로 Producer를 통해 Kafka에 Message를 Send하려면 KafkaProducer 인스턴스를 사용하여 send() 메서드를 호출해야합니다. KafkaTemplate은 KafkaProducer 감싸고 있는 인스턴스라고 생각하시면 이해하시기 편합니다. KafkaTemplate.send() 는 내부에서 결국 KafkaProducer 인스턴스의 send()를 호출하고 있습니다. 1. KafkaTemplate 구성 Kafka..

Study/message-queue 2021.08.20
728x90
반응형