토픽 파티션에서 레코드 조회
1. 서버 지정
2. group id 지정
3. 메시지를 역직렬화
4. Kafka 컨슈머 객체 생성
5. consumer.subsribe(Collections.singleton("simple")); 구독할 토픽 목록 전달
6. 특정 조건을 충족하는 동안 루프를 돌면서 컨슈머에 폴 메소드를 호출
7. 폴 메소드는 일정 시간 동안 대기하다가 브로커로부터 컨슈머의 레코드 목록을 읽어온다.
8. 읽어 온 커슈머 레코드를 다시 또 루프를 돌면서 필요한 처리를 수행
9. close() 메소드로 종료처리
토픽 파티션은 그룹 단위 할당
컨슈머 그룹 단위로 파티션 할당
예)
파티션이 2개 컨슈머 그룹 1개
컨슈머 한개가 두 파티션으로부터 데이터를 읽어 오게 된다.
파티션이 2개 컨슈머 그룹 2개
각 컨슈머가 각 파티션으로부터 데이터를 읽어 오게 된다.
파티션이 2개 컨슈머 그룹 3개
1개의 컨슈머는 놀게 된다. 컨슈머 개수가 파티션 개수보다 커지면 안 된다.
* 처리량이 떨어져서 컨슈머를 늘려야 한다면 파티션 개수도 같이 늘려야 한다.
커밋과 오프셋
각각 메시지가 저장된 위치를 오프셋(offset)이라고 하는데,
컨슈머의 폴 메소드는 이전에 커밋한 오프셋이 있으면 그 오프셋 이후에 레코드를 읽어온다.
그리고 읽어온 다음에 마지막 읽어온 레코드의 오프셋을 커밋을 한다.
커밋된 오프셋이 없는 경우
1. 처음 접근이거나 커밋한 오프셋이 없는 경우
2. auto.offset.reset 설정 사용
- earliest : 맨 처름 오프셋 사용
- latest : 가장 마지막 오프셋 사용(기본값)
- none : 컨슈머 그룹웨 대한 이전 커밋이 없으면 익셉션 발생 (익셥션 발생하기 때문에 보통 사용 안함)
컨슈머 설정
조회에 영향을 주는 주요 설정
fetch.min.bytes : 조회시 브로커가 전송할 최고 데이터 크기
- 기본값 : 1
- 이 값이 크면 대기 시간은 늘지만 처리량이 증가
fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간
- 기본값 : 500
- 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름
max.partition.fetch.bytest : 파티션 당 서버가 리턴할 수 있는 최대 크기
- 기본값 : 1048576(1MB)
자동 커밋/수동 커밋
enable.auto.commit 설정
- true : 이정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값)
- false : 수동으로 커밋 실행
auto.commit.interval.ms : 자동 커밋 주기
poll(), close() 메서드 호출시 자동 커밋 실행
수동 커밋 : 동기/비동기 커밋
동기 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String, String> record : records) { ... 처리 } try { consumer.commitSync(); } catch(Exception ex) { // 커밋 실패시 에러 발생 } |
비동기 비동기이기 때문에 바로 성공 여부를 알 수 없다. callback을 통해서 알 수 있다. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String, String> record : records) { ...처리 } consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback) |
재처리와 순서
동일 메시지 조회 가능성
- 일시적 커밋 실패, 리밸런스 등에 의해 발생
컨슈머는 멱동성(idempotence)을 고려해야 함
- 예 : 아래 메시지를 재처리 할 경우
- 조회수 1증가 -> 좋아요 1증가 -> 조회수 1증가
- 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음
데이터 특성에 따라 타임스탬프, 일련번호 등을 활용을 해서 데이터를 중복해서 두 번 이상 처리해도 문제가 없도록 고려해야 함
세션 타임아웃, 하트비트, 최대 poll 간격
Kafka는 컨슈머 그룹을 알맞게 유지하기 위해서 설정을 사용한다.
1. 컨슈머는 하트비트를 전송해서 연결 유지
- 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행
- 관련 설정
- session.timeout.ms : 세션 타임 아웃 시간 (기본값 10초)
- heartbeat.interval.ms : 하트비트 전송 주기 (기본값 3초)
- session.timeout.ms의 1/3 이하 추천
2. max.poll.interval.ms : poll() 메서드의 최대 호출 간격
- 이 시간이 지나도록 poll()하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행
종료처리
다른 쓰레드에서 wakeup() 메서드 호출
- poll() 메서드가 WakeupException 발생 -> close() 메서드로 종료 처리
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subsribe(Collections.singleton("simple"));
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // wakeup() 호출시 Exception 발생
... records 처리
try {
consumer.commitAsync();
} catch(Exception e) {
e.printStackTrace();
}
}
} catch (Exception ex) {
...
} finally {
consumer.close();
}
주의 : 쓰레드 안전하지 않음
KafkaConsumer는 쓰레드에 안전하지 않음
- 여러 쓰레드에서 동시에 사용하지 말 것
- wakeup() 메서드는 예외
참고
- kafka 조금 아는 척하기 3 (개발자용)-컨슈머 (https://www.youtube.com/watch?v=xqrIDHbGjOY)
'Kafka' 카테고리의 다른 글
Kafka 기초2 - 프로듀서 (0) | 2021.07.22 |
---|---|
Kafka 기초1 (0) | 2021.07.21 |