[Kafka] @KafkaListener 컨슈머 설정 정리: concurrency, ackMode, containerFactory, 역직렬화
![[Kafka] @KafkaListener 컨슈머 설정 정리: concurrency, ackMode, containerFactory, 역직렬화](https://blog.kakaocdn.net/dna/bQ9e8O/dJMcaar2Jte/AAAAAAAAAAAAAAAAAAAAAB_SH_IPQY-Iig5RQlNEg4V0jaaVa2XEitZDER4GY1t5/img.png?credential=yqXZFxpELC7KVnFOS48ylbz2pIh7yKj8&expires=1782831599&allow_ip=&allow_referer=&signature=fhOM1K8FYYH8D0DIsi6Rn56260w%3D)

[Kafka] @KafkaListener 컨슈머 설정 정리: concurrency, ackMode, containerFactory, 역직렬화
스프링에서 카프카 컨슈머는 @KafkaListener 한 줄이면 떠요. 그래서 처음엔 쉽게 느껴지는데, 막상 운영에 올리면 "동시성을 올렸는데 처리량이 안 늘어요", "컨슈머가 자꾸 멈췄다 다시 시작해요", "메시지가 중복 처리돼요" 같은 문제를 만나요. 대부분은 컨슈머가 내부에서 어떻게 도는지를 모른 채 기본값으로 쓰다 생기는 일이에요.
@KafkaListener가 실제로 하는 일에서 출발해, 파라미터 바인딩과 역직렬화, 동시성(concurrency), containerFactory 커스터마이징, ackMode, 꼭 알아야 할 컨슈머 설정값 순서로 따라가 볼게요. 끝에는 자주 만나는 문제 세 가지의 원인과 진단법도 붙였어요.
01. @KafkaListener가 실제로 하는 일
먼저 가장 작은 컨슈머예요. 토픽과 컨슈머 그룹만 주면 바로 폴링이 시작돼요.
애너테이션 없이 컨테이너를 직접 구성하는 기초 구현이 궁금하면 2021년 기록 Message Listeners를 이용한 Consumer 구현과 @KafkaListener를 이용한 Consumer 구현도 있어요.
@KafkaListener(topics = "orders", groupId = "order-service")
public void consume(String message) {
log.info("received: {}", message);
}
이 한 줄 뒤에서 스프링은 꽤 많은 일을 해요.

@KafkaListener를 발견하면 KafkaListenerContainerFactory가 메시지 리스너 컨테이너를 하나 만들어요.- 그 컨테이너는 내부에서 카프카
Consumer객체를 만들고, 별도 스레드에서poll()루프를 돌려요. poll()로 받은 레코드를 하나씩(또는 배치로) 우리 메서드에 넘겨 호출해요.- 처리가 끝나면 ackMode 정책에 따라 오프셋을 커밋해요.
핵심은 우리가 직접 while(true) { consumer.poll() } 루프를 짜지 않는다는 거예요. 그 폴링 루프, 스레드 관리, 커밋, 리밸런싱 대응을 전부 컨테이너가 대신해줘요. 대신 우리는 그 컨테이너의 동작을 설정으로 제어해요. 그래서 "설정을 안다 = 컨슈머를 안다"가 돼요.
02. 리스너 메서드 파라미터, 무엇까지 받을 수 있나
리스너 메서드는 메시지 값(payload)만 받는 게 아니에요. 필요한 만큼 파라미터로 선언하면 스프링이 알아서 바인딩해줘요.
@KafkaListener(topics = "orders", groupId = "order-service")
public void consume(
@Payload Order order, // 역직렬화된 값
@Header(KafkaHeaders.RECEIVED_KEY) String key, // 메시지 키
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) { // 수동 커밋용
// ...
ack.acknowledge();
}
값과 메타데이터를 통째로 보고 싶으면 ConsumerRecord를 그대로 받을 수도 있어요.
@KafkaListener(topics = "orders", groupId = "order-service")
public void consume(ConsumerRecord<String, Order> record) {
log.info("key={} partition={} offset={} value={}",
record.key(), record.partition(), record.offset(), record.value());
}
실무에서는 키와 파티션, 오프셋을 로그에 남겨두면 나중에 문제 추적이 훨씬 쉬워요. 어떤 파티션의 몇 번 오프셋에서 터졌는지가 바로 보이거든요.
03. 역직렬화, 가장 많이 터지는 지점
카프카는 메시지를 바이트로 주고받아요. 그래서 컨슈머는 그 바이트를 객체로 바꾸는 Deserializer가 필요해요. key와 value 각각 지정해요.
# application.yml
spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example.order"
JSON을 객체로 받을 때는 JsonDeserializer를 써요. 이때 spring.json.trusted.packages를 지정하지 않으면 역직렬화 대상 패키지를 신뢰하지 않아 예외가 나요. 보안 때문에 기본이 막혀 있는 거예요.
역직렬화 실패는 평범하게 안 잡혀요

여기가 함정이에요. 역직렬화는 메시지가 우리 리스너 메서드에 도달하기 전에 일어나요. 그래서 깨진 메시지가 오면 메서드 안에서 try-catch를 아무리 잘 해도 안 잡혀요. 게다가 기본 상태에서는 그 깨진 메시지에서 컨슈머가 계속 같은 예외를 내며 멈춰버려요.
그래서 value-deserializer를 ErrorHandlingDeserializer로 감싸는 게 사실상 기본 설정이에요. 감싸두면 역직렬화 실패를 정상 흐름으로 잡아서 건너뛰거나 DLQ로 보낼 수 있어요.
spring:
kafka:
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
04. concurrency, 동시성의 상한은 파티션 수예요
concurrency는 이 리스너가 띄울 컨슈머 스레드 개수예요. 각 스레드가 독립된 Consumer를 갖고, 토픽의 파티션이 그 스레드들에게 나눠 할당돼요.
concurrency와 파티션 수의 관계를 직접 실험한 2021년 기록이 Concurrency 설정 기준 글에 있어요. 결론은 그때나 지금이나 같아요.
@KafkaListener(topics = "orders", groupId = "order-service", concurrency = "3")
public void consume(Order order) { ... }
여기서 가장 흔한 오해가 있어요. concurrency를 올린다고 무한정 빨라지지 않아요. 동시에 돌 수 있는 컨슈머 수의 상한은 결국 파티션 수예요.

파티션이 3개인데 concurrency = 5로 주면, 파티션을 못 받은 2개 스레드는 그냥 놀아요(idle). 한 파티션은 한 번에 한 컨슈머만 소비할 수 있거든요. 반대로 파티션이 6개인데 concurrency = 2면, 스레드 하나가 파티션 3개씩 맡아요.
그래서 처리량을 늘리는 순서는 이래요. 먼저 토픽 파티션 수를 충분히 두고, 그 다음 컨슈머 인스턴스와 concurrency를 파티션 수에 맞춰 올려요. 파티션 수가 전체 병렬도의 천장이라는 걸 기억하면 돼요.
한 가지 더, 한 파티션 안에서는 메시지 순서가 보장돼요. 그래서 순서가 중요한 데이터는 같은 키로 같은 파티션에 들어가게 설계하고, 그 파티션을 한 스레드가 순서대로 처리하게 둬요. 키 설계는 파티셔닝과 메시지 키 편에서 자세히 다뤄요.
05. containerFactory, 세부 설정을 잡는 곳
애너테이션 속성만으로는 역직렬화, 배치 처리, 에러 핸들러 같은 걸 다 못 정해요. 이런 건 ConcurrentKafkaListenerContainerFactory 빈에서 설정해요.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order>
kafkaListenerContainerFactory(ConsumerFactory<String, Order> cf) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, Order>();
factory.setConsumerFactory(cf);
factory.setConcurrency(3); // 동시성
factory.getContainerProperties().setAckMode(AckMode.MANUAL); // 커밋 모드
factory.setCommonErrorHandler(new DefaultErrorHandler()); // 에러 핸들러
return factory;
}
빈 이름이 kafkaListenerContainerFactory면 @KafkaListener가 기본으로 이 팩토리를 써요. 토픽마다 다른 설정이 필요하면 팩토리를 여러 개 만들어서 @KafkaListener(containerFactory = "otherFactory")로 골라 쓰면 돼요.
배치 리스너
한 건씩 말고 poll()로 받은 묶음을 한꺼번에 처리하고 싶으면 배치 리스너를 켜요. DB에 벌크로 적재하는 경우처럼 묶어서 처리하는 게 유리할 때 써요.
factory.setBatchListener(true);
@KafkaListener(topics = "orders", groupId = "order-service")
public void consume(List<Order> orders) { // 묶음으로 받음
orderRepository.saveAll(orders);
}
06. ackMode, 커밋 시점을 고르는 일
컨슈머가 "여기까지 처리했다"고 카프카에 알리는 게 오프셋 커밋이에요. 스프링 카프카는 enable.auto.commit=false를 기본으로 두고, ackMode로 커밋 시점을 관리해요. 종류가 여러 개예요.
RECORD— 레코드 하나 처리할 때마다 커밋BATCH(기본) — 한 번의poll()로 받은 묶음을 다 처리한 뒤 커밋TIME— 지정한 시간 간격마다 커밋COUNT— 지정한 레코드 수마다 커밋COUNT_TIME— 시간 또는 개수 중 먼저 도달하는 쪽MANUAL—Acknowledgment.acknowledge()호출 시 (다음 poll에 커밋)MANUAL_IMMEDIATE—acknowledge()호출 즉시 커밋
기본은 BATCH예요. 한 번 poll로 받은 메시지를 다 처리한 뒤 한꺼번에 커밋해요. 그래서 중간에 한 건이 실패하고 컨슈머가 죽으면, 이미 처리한 앞쪽 메시지까지 다시 받게 돼요. 이게 중복 처리의 출발점이에요.
커밋을 직접 제어하고 싶으면 MANUAL로 두고 리스너에서 직접 ack 해요. 처리를 끝낸 뒤 ack 하면 "최소 한 번(at-least-once)" 보장이 돼요. 수동 커밋의 중복 문제와 멱등성 처리는 분량이 커서 수동 커밋과 멱등성 글에서 따로 다뤘어요.
07. 꼭 알아야 할 컨슈머 설정값
컨슈머를 운영에 올리기 전에 적어도 이 값들의 의미는 알아야 해요. 기본값으로 두면 운영 중에 예상 못 한 동작을 만나요.
group.id— 컨슈머 그룹 이름. 같은 그룹끼리 파티션을 나눠 갖고, 그룹이 다르면 같은 메시지를 각자 받아요.auto.offset.reset— 커밋된 오프셋이 없을 때(새 그룹 등) 어디서부터 읽을지.latest(기본)는 새 메시지부터,earliest는 맨 처음부터예요.max.poll.records— 한 번poll()로 가져올 최대 레코드 수(기본 500). 처리 시간이 길면 줄여요.max.poll.interval.ms— 다음poll()까지 허용되는 최대 간격(기본 5분). 이 안에 못 돌아오면 컨슈머가 죽은 걸로 간주돼 그룹에서 추방되고 리밸런싱이 일어나요.session.timeout.ms/heartbeat.interval.ms— heartbeat로 살아있음을 알리는 주기와, 그게 끊겼다고 판단하는 한도예요. heartbeat는 session timeout의 1/3 이하로 둬요.
두 가지 함정을 짚어둘게요. 첫째, 새 컨슈머 그룹을auto.offset.reset=latest(기본값)로 띄우면 그 전에 쌓여 있던 메시지를 통째로 건너뛰어요. "배포했는데 옛 메시지가 처리가 안 됐어요"의 단골 원인이에요. 둘째,max.poll.records와max.poll.interval.ms는 한 쌍이에요. 한 번에 많이 가져오는데 건당 처리가 느리면, 그 묶음을 처리하는 동안 다음 poll로 못 돌아와 추방돼요. 처리가 느린 컨슈머는 records를 줄이는 게 첫 번째 처방이에요.
08. 자주 만나는 문제 세 가지
동시성을 올렸는데 처리량이 그대로예요
대부분 파티션 수가 천장이에요. concurrency가 파티션 수보다 크면 초과 스레드는 놀아요. 파티션 수부터 확인하고, 모자라면 토픽 파티션을 늘려요.
컨슈머가 자꾸 멈췄다 다시 시작해요
리밸런싱이 반복되는 거예요. 처리가 느려 max.poll.interval.ms를 넘겨 추방되거나, 배포 때마다 인스턴스가 들고 나면서 생겨요. max.poll.records를 줄이거나 처리 로직을 가볍게 하고, 리밸런싱 자체를 줄이는 방법은 리밸런싱과 파티션 할당 전략 글에서 다뤘어요.
메시지가 중복 처리돼요
at-least-once에서는 구조적으로 생겨요. 처리 후 커밋 사이에 죽거나, 추방돼 재할당되면 같은 메시지를 다시 받아요. 그래서 같은 메시지를 두 번 처리해도 결과가 같게 만드는 멱등성이 필요해요. 이건 수동 커밋과 멱등성 글에서 자세히 풀었어요.
정리
@KafkaListener 컨슈머는 결국 세 축으로 좁혀져요. 병렬도의 상한을 정하는 concurrency(상한은 파티션 수), 역직렬화·배치·에러 핸들러를 잡는 containerFactory, 커밋 시점을 고르는 ackMode. 여기에 역직렬화를 ErrorHandlingDeserializer로 감싸고 max.poll 계열을 처리 속도에 맞추면, 운영에서 만나는 문제 대부분이 풀려요.
여기가 카프카 컨슈머 이야기의 출발점이에요. 받은 메시지를 안전하게 끊어 커밋하는 수동 커밋과 멱등성, 컨슈머가 들고 날 때의 리밸런싱과 파티션 할당, 실패 메시지를 빼내는 에러 핸들링과 DLQ로 이어집니다.
출처: Spring for Apache Kafka — Message Listener Containers · @KafkaListener Annotation · Serialization & ErrorHandlingDeserializer · Kafka — Consumer Configs
'백엔드' 카테고리의 다른 글
📚 같이 보면 좋은
"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 일정액의 수수료를 제공받습니다."