Spring-Kafka Lifecycle

2021. 9. 4. 07:00·Study/message-queue
반응형

나를 닮았다고 한다...

이번 글에서는 Spring-Kafka의 Lifecycle에 대해서 작성해보겠습니다.

목차

  1. Lifecycle
  2. Lifecycle Management
  3. 주의사항

1. Lifecycle

@KafkaListener는 Application Context 안에 Bean이 아닙니다. @KafkaListener는 KafkaListenerEndpointRegistry에서 Bean으로 등록이 됩니다. 등록된 Bean은 framework에 의해 자동으로 선언되고 Container의 Lifecycle을 관리합니다. 

KafkaListenerEndpointRegistry.java 메서드 중빨간 블럭처리 되어있는 코드에서 container를 등록합니다.

등록된 Bean은 설정값 autoStartup이 true인 것을 자동으로 시작합니다. 

Listener Container들은 SmartLifeCycle을 Implement하고 있습니다. 그리고 autoStartup 기본 설정값은 true입니다. 

@KafkaListener(id = "listener1", topics = ["insert_member"])
fun listener1(data: String) {
    println("listener1 data:: $data")
}

Annotation KafkaListener를 선언한 메서드는 Listener Container로 등록되어 사용됩니다.

 

 

2. Lifecycle Management

우리는 KafkaListenerEndpointRegistry를 이용해서 LifeCycle을 프로그래밍 방식으로 관리할 수 있습니다. 먼저 KafkaListener Annotation의 속성값중 autoStartup을 false로 설정합니다. 

@KafkaListener(id = "listener1", topics = ["insert_member"], groupId = "group1", autoStartup = "false")
fun listener1(data: String) {
    println("listener1 data:: $data")
}

Producer를 통해 데이터를 write하여도 listener1 메서드는 데이터를 읽어오지 않습니다. 

 

KafkaListenerEndpointRegistry를 이용하여 listener1 메서드를 작동하도록 설정합니다. 

@RestController
@RequestMapping("/api/member")
class MemberController(
    val registry: KafkaListenerEndpointRegistry
) {

    @PatchMapping("/start")
    fun consumerStart() {
        val listener1 = registry.getListenerContainer("listener1")
        listener1?.start()
    }

    @PatchMapping("/stop")
    fun consumerStop() {
        val listener1 = registry.getListenerContainer("listener1")
        listener1?.stop()
    }
}

 

kafkaListenerEndpointRegistry에서 제어하고자 하는 Bean의 ID를 파라미터로 넘겨 ListenerContainer를 받습니다. 그리고 ListenerContainer를 start(), stop() 메서드로 제어할 수 있습니다. 

 

3. 주의 사항

intellij에서 registry 빈을 찾을 수 없다고 나오지만 무시하셔도 됩니다. 정상 작동합니다.

kafkaListenerEndpointRegistry를 가져와 사용하려니 위와 같은 에러가 발생하였습니다. 그래서 아래와 같이 KafkaListenerEndpointRegistry를 생성하여 Bean에 등록했습니다. 하지만 새로운 객체를 생성하여 Bean에 등록하면 @KafkaListener를 선언한 Bean의 Lifecycle을 제어할 수 없습니다. 

// 정상작동하지 않음
@Bean
fun registry(): KafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

 

위 Warning은 무시하고 kafkaListenerEndpointRegistry를 사용하고자 하는 클래스에서 의존성을 주입받아 사용하시면 됩니다.

StackOverFlow 참조

반응형

'Study > message-queue' 카테고리의 다른 글

Spring-Kafka Consumer Offset 관리  (2) 2021.10.14
Spring-Kafka Consumer Validation  (0) 2021.10.12
[kafka] @KafkaListener를 이용한 Consumer 구현  (0) 2021.08.26
[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer 사용)  (2) 2021.08.24
[kafka] Message Listeners를 이용한 Consumer 구현  (0) 2021.08.23
'Study/message-queue' 카테고리의 다른 글
  • Spring-Kafka Consumer Offset 관리
  • Spring-Kafka Consumer Validation
  • [kafka] @KafkaListener를 이용한 Consumer 구현
  • [kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer 사용)
에디개발자
에디개발자
------ 한발자국씩 성장하자 ------ Github: https://github.com/yongtaelim LinkedIn: https://www.linkedin.com/in/%EC%9A%A9%ED%83%9C-%EC%9E%84-622b69218/
    250x250
  • 에디개발자
    에디블로그
    에디개발자
    • 분류 전체보기 (201) N
      • Develop (51)
        • spring-data (28)
        • spring-batch (7)
        • devops (5)
        • java (5)
        • kotlin (3)
        • database (2)
      • MindControl (12)
      • TroubleShooting (16)
      • Study (76)
        • kotlin (16)
        • java (15)
        • spring (6)
        • test (4)
        • message-queue (10)
        • object (22)
      • Develop Tool (1)
      • Daily Develop (7)
      • Book (5)
      • AI (22) N
        • Claude (10) N
        • ChatGPT (5) N
        • Cursor (2)
        • Gemini (3) N
        • 트랜드 (2)
      • 개발 트렌드 (8) N
        • 데일리 픽 (8) N
  • 인기 글

  • 태그

    에디
    Ai
    개발자 도구
    ai 정보
    claude
    queryDSL
    객체지향
    JPA
    프로그래밍
    java
    코틀린
    자바
    AI 활용
    클로드
    JPQL
    스터디
    anthropic
    백기선
    kotlin
    엘레강트
  • 최근 글

  • hELLO· Designed By정상우.v4.10.6
에디개발자
Spring-Kafka Lifecycle
상단으로

티스토리툴바