Study/message-queue
Spring-Kafka Consumer Offset 관리
에디개발자
2021. 10. 14. 06:00
반응형
들어가기 전에
Consumer를 AutoCommit을 false로 설정하고 사용하고 있습니다. Consumer에서 로직을 완료하고 partition에 commit을 처리하도록 작성하였습니다. 그런데 로직을 처리하던 중 Exception이 발생했고 partition에 commit을 하지 못했습니다. 그렇다면 이 Consumer는 무한 루프에 빠져 데이터를 계속 읽고 Exception을 계속 발생시킬까요?
정답은 NO! 입니다. Consumer 내부에서 Partition Offset과는 별개로 Consumer만을 위한 Offset을 관리하고 있기 때문입니다.
Consumer Polling 방식
- Consumer는 KafkaMessageListenerContainer에서 새로운 Thread를 생성하여 while문을 이용해 무한 루프를 돌립니다. ( Consumer LifeCycle을 참조 )
- KafkaMessageLisenterContainer에서 Consumer 객체의 poll() 메서드를 호출합니다.
- Consumer는 Interface이고 구현체로는 KafkaConsumer를 Default로 사용하고 있습니다.
- KafkaConsumer에서 Fetcher 객체의 fetchedRecords() 메서드를 호출하고 있습니다.
- Fetcher 객체 내부에서 CompletedFetch 객체의 fetchedRecords() 메서드를 호출하여 Record를 조회합니다.
Consumer 내부 Offset 관리
Fetcher 클래스 내부의 CompletedFetch 클래스가 존재합니다. 이 CompletedFetch 클래스의 nextFetchOffset 변수를 통해 Offset을 관리하고 있습니다.
private class CompletedFetch {
private final TopicPartition partition;
private final Iterator<? extends RecordBatch> batches;
private final Set<Long> abortedProducerIds;
private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions;
private final FetchResponse.PartitionData<Records> partitionData;
private final FetchResponseMetricAggregator metricAggregator;
private final short responseVersion;
private int recordsRead;
private int bytesRead;
private RecordBatch currentBatch;
private Record lastRecord;
private CloseableIterator<Record> records;
private long nextFetchOffset; // offset 관리
private Optional<Integer> lastEpoch;
private boolean isConsumed = false;
private Exception cachedRecordException = null;
private boolean corruptLastRecord = false;
private boolean initialized = false;
// ....
}
Consumer는 먼저 Partition의 offset을 조회한 후 nextFetchOffset과 비교하여 같다면 데이터를 읽어들입니다.
결론
이러한 방법 덕분에 Consumer에서는 데이터를 조회한 후 Exception이 발생했더라도 같은 데이터를 다시 읽어오지 않습니다. 당연하지만 Exception 발생으로 Partition의 Offset Commit 명령을 날리지 않았기 때문에 Consumer를 재기동하면 처리하지 못한 데이터를 모두 읽어들입니다.
반응형