Study/message-queue

Spring-Kafka Lifecycle

에디개발자 2021. 9. 4. 07:00
반응형

나를 닮았다고 한다...

이번 글에서는 Spring-Kafka의 Lifecycle에 대해서 작성해보겠습니다.

목차

  1. Lifecycle
  2. Lifecycle Management
  3. 주의사항

1. Lifecycle

@KafkaListener는 Application Context 안에 Bean이 아닙니다. @KafkaListenerKafkaListenerEndpointRegistry에서 Bean으로 등록이 됩니다. 등록된 Bean은 framework에 의해 자동으로 선언되고 Container의 Lifecycle을 관리합니다. 

KafkaListenerEndpointRegistry.java 메서드 중빨간 블럭처리 되어있는 코드에서 container를 등록합니다.

등록된 Bean은 설정값 autoStartup이 true인 것을 자동으로 시작합니다. 

Listener Container들은 SmartLifeCycleImplement하고 있습니다. 그리고 autoStartup 기본 설정값은 true입니다. 

@KafkaListener(id = "listener1", topics = ["insert_member"])
fun listener1(data: String) {
    println("listener1 data:: $data")
}

Annotation KafkaListener를 선언한 메서드는 Listener Container로 등록되어 사용됩니다.

 

 

2. Lifecycle Management

우리는 KafkaListenerEndpointRegistry를 이용해서 LifeCycle을 프로그래밍 방식으로 관리할 수 있습니다. 먼저 KafkaListener Annotation의 속성값중 autoStartup을 false로 설정합니다. 

@KafkaListener(id = "listener1", topics = ["insert_member"], groupId = "group1", autoStartup = "false")
fun listener1(data: String) {
    println("listener1 data:: $data")
}

Producer를 통해 데이터를 write하여도 listener1 메서드는 데이터를 읽어오지 않습니다. 

 

KafkaListenerEndpointRegistry를 이용하여 listener1 메서드를 작동하도록 설정합니다. 

@RestController
@RequestMapping("/api/member")
class MemberController(
    val registry: KafkaListenerEndpointRegistry
) {

    @PatchMapping("/start")
    fun consumerStart() {
        val listener1 = registry.getListenerContainer("listener1")
        listener1?.start()
    }

    @PatchMapping("/stop")
    fun consumerStop() {
        val listener1 = registry.getListenerContainer("listener1")
        listener1?.stop()
    }
}

 

kafkaListenerEndpointRegistry에서 제어하고자 하는 Bean의 ID를 파라미터로 넘겨 ListenerContainer를 받습니다. 그리고 ListenerContainer를 start(), stop() 메서드로 제어할 수 있습니다. 

 

3. 주의 사항

intellij에서 registry 빈을 찾을 수 없다고 나오지만 무시하셔도 됩니다. 정상 작동합니다.

kafkaListenerEndpointRegistry를 가져와 사용하려니 위와 같은 에러가 발생하였습니다. 그래서 아래와 같이 KafkaListenerEndpointRegistry를 생성하여 Bean에 등록했습니다. 하지만 새로운 객체를 생성하여 Bean에 등록하면 @KafkaListener를 선언한 Bean의 Lifecycle을 제어할 수 없습니다. 

// 정상작동하지 않음
@Bean
fun registry(): KafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

 

위 Warning은 무시하고 kafkaListenerEndpointRegistry를 사용하고자 하는 클래스에서 의존성을 주입받아 사용하시면 됩니다.

StackOverFlow 참조

반응형