Study/message-queue

Spring-Kafka Consumer Offset 관리

JessYT 에디개발자 2021. 10. 14. 06:00
반응형

나를 닮았다고 한다...

들어가기 전에

Consumer를 AutoCommit을 false로 설정하고 사용하고 있습니다. Consumer에서 로직을 완료하고 partition에 commit을 처리하도록 작성하였습니다. 그런데 로직을 처리하던 중 Exception이 발생했고 partition에 commit을 하지 못했습니다. 그렇다면 이 Consumer는 무한 루프에 빠져 데이터를 계속 읽고 Exception을 계속 발생시킬까요?

 

정답은 NO! 입니다. Consumer 내부에서 Partition Offset과는 별개로 Consumer만을 위한 Offset을 관리하고 있기 때문입니다.


Consumer Polling 방식

  1. Consumer는 KafkaMessageListenerContainer에서 새로운 Thread를 생성하여 while문을 이용해 무한 루프를 돌립니다. ( Consumer LifeCycle을 참조 )
  2. KafkaMessageLisenterContainer에서 Consumer 객체의 poll() 메서드를 호출합니다.
  3. Consumer는 Interface이고 구현체로는 KafkaConsumer를 Default로 사용하고 있습니다.
  4. KafkaConsumer에서 Fetcher 객체의 fetchedRecords() 메서드를 호출하고 있습니다.
  5. Fetcher 객체 내부에서 CompletedFetch 객체의 fetchedRecords() 메서드를 호출하여 Record를 조회합니다.

 

Consumer 내부 Offset 관리

Fetcher 클래스 내부의 CompletedFetch 클래스가 존재합니다. 이 CompletedFetch 클래스의 nextFetchOffset 변수를 통해 Offset을 관리하고 있습니다. 

private class CompletedFetch {
    private final TopicPartition partition;
    private final Iterator<? extends RecordBatch> batches;
    private final Set<Long> abortedProducerIds;
    private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions;
    private final FetchResponse.PartitionData<Records> partitionData;
    private final FetchResponseMetricAggregator metricAggregator;
    private final short responseVersion;

    private int recordsRead;
    private int bytesRead;
    private RecordBatch currentBatch;
    private Record lastRecord;
    private CloseableIterator<Record> records;
    private long nextFetchOffset;   // offset 관리
    private Optional<Integer> lastEpoch;
    private boolean isConsumed = false;
    private Exception cachedRecordException = null;
    private boolean corruptLastRecord = false;
    private boolean initialized = false;
    
    // .... 
}

Consumer는 먼저 Partition의 offset을 조회한 후 nextFetchOffset과 비교하여 같다면 데이터를 읽어들입니다.

 

결론

이러한 방법 덕분에 Consumer에서는 데이터를 조회한 후 Exception이 발생했더라도 같은 데이터를 다시 읽어오지 않습니다. 당연하지만 Exception 발생으로 Partition의 Offset Commit 명령을 날리지 않았기 때문에 Consumer를 재기동하면 처리하지 못한 데이터를 모두 읽어들입니다.

반응형