Spring-Kafka Consumer Offset 관리

2021. 10. 14. 06:00·아카이브/message-queue
반응형

나를 닮았다고 한다...

들어가기 전에

Consumer를 AutoCommit을 false로 설정하고 사용하고 있습니다. Consumer에서 로직을 완료하고 partition에 commit을 처리하도록 작성하였습니다. 그런데 로직을 처리하던 중 Exception이 발생했고 partition에 commit을 하지 못했습니다. 그렇다면 이 Consumer는 무한 루프에 빠져 데이터를 계속 읽고 Exception을 계속 발생시킬까요?

 

정답은 NO! 입니다. Consumer 내부에서 Partition Offset과는 별개로 Consumer만을 위한 Offset을 관리하고 있기 때문입니다.


Consumer Polling 방식

  1. Consumer는 KafkaMessageListenerContainer에서 새로운 Thread를 생성하여 while문을 이용해 무한 루프를 돌립니다. ( Consumer LifeCycle을 참조 )
  2. KafkaMessageLisenterContainer에서 Consumer 객체의 poll() 메서드를 호출합니다.
  3. Consumer는 Interface이고 구현체로는 KafkaConsumer를 Default로 사용하고 있습니다.
  4. KafkaConsumer에서 Fetcher 객체의 fetchedRecords() 메서드를 호출하고 있습니다.
  5. Fetcher 객체 내부에서 CompletedFetch 객체의 fetchedRecords() 메서드를 호출하여 Record를 조회합니다.

 

Consumer 내부 Offset 관리

Fetcher 클래스 내부의 CompletedFetch 클래스가 존재합니다. 이 CompletedFetch 클래스의 nextFetchOffset 변수를 통해 Offset을 관리하고 있습니다. 

private class CompletedFetch {
    private final TopicPartition partition;
    private final Iterator<? extends RecordBatch> batches;
    private final Set<Long> abortedProducerIds;
    private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions;
    private final FetchResponse.PartitionData<Records> partitionData;
    private final FetchResponseMetricAggregator metricAggregator;
    private final short responseVersion;

    private int recordsRead;
    private int bytesRead;
    private RecordBatch currentBatch;
    private Record lastRecord;
    private CloseableIterator<Record> records;
    private long nextFetchOffset;   // offset 관리
    private Optional<Integer> lastEpoch;
    private boolean isConsumed = false;
    private Exception cachedRecordException = null;
    private boolean corruptLastRecord = false;
    private boolean initialized = false;
    
    // .... 
}

Consumer는 먼저 Partition의 offset을 조회한 후 nextFetchOffset과 비교하여 같다면 데이터를 읽어들입니다.

 

결론

이러한 방법 덕분에 Consumer에서는 데이터를 조회한 후 Exception이 발생했더라도 같은 데이터를 다시 읽어오지 않습니다. 당연하지만 Exception 발생으로 Partition의 Offset Commit 명령을 날리지 않았기 때문에 Consumer를 재기동하면 처리하지 못한 데이터를 모두 읽어들입니다.

반응형

'아카이브 > message-queue' 카테고리의 다른 글

Kafka를 이용한 실패처리 삽질 기록  (0) 2022.06.09
Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo )  (0) 2021.10.15
Spring-Kafka Consumer Validation  (0) 2021.10.12
Spring-Kafka Lifecycle  (1) 2021.09.04
[kafka] @KafkaListener를 이용한 Consumer 구현  (0) 2021.08.26
'아카이브/message-queue' 카테고리의 다른 글
  • Kafka를 이용한 실패처리 삽질 기록
  • Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo )
  • Spring-Kafka Consumer Validation
  • Spring-Kafka Lifecycle
에디개발자
에디개발자
------ 한발자국씩 성장하자 ------ Github: https://github.com/yongtaelim LinkedIn: https://www.linkedin.com/in/%EC%9A%A9%ED%83%9C-%EC%9E%84-622b69218/
    250x250
  • 에디개발자
    에디블로그
    에디개발자
    • 분류 전체보기 (307) N
      • AI (71) N
        • Claude (29)
        • ChatGPT (12)
        • Cursor (13)
        • Gemini (11) N
        • 트랜드 (5)
        • 오늘의 AI 업데이트 (1)
      • 개발 트렌드 (47) N
        • 데일리 픽 (33) N
        • 툴 리뷰 (2) N
      • 개발자 도구 (13) N
        • 생산성 툴 (4)
        • 노트 & 지식관리 (2)
        • 협업 & 이슈 (0)
        • 터미널 & 환경 (1)
        • API & DB (4)
        • 에디터 & IDE (2) N
      • 실험실 (2)
        • AI 자동화 (0)
        • 자동매매 (1)
        • 블로그 자동화 (1)
      • 아카이브 (168)
        • MindControl (12)
        • TroubleShooting (16)
        • kotlin ( 아카이브 ) (16)
        • java ( 아카이브 ) (15)
        • spring (6)
        • test (4)
        • message-queue (10)
        • object (22)
        • spring-data (28)
        • spring-batch (7)
        • devops (5)
        • java (5)
        • kotlin (3)
        • database (2)
        • Study (3)
        • Develop (1)
        • Develop Tool (1)
        • Daily Develop (7)
        • Book (5)
  • 인기 글

  • 태그

    Ai
    cursor
    AI 업데이트
    프로그래밍
    claude
    AI 활용
    코틀린
    ai 정보
    OpenAI
    개발자 도구
    JPA
    anthropic
    클로드
    queryDSL
    ai 코딩
    스터디
    개발자
    에디
    Gemini
    ChatGPT
  • 최근 글

  • hELLO· Designed By정상우.v4.10.6
에디개발자
Spring-Kafka Consumer Offset 관리
상단으로

티스토리툴바