반응형

토픽 파티션에서 레코드 조회

1. 서버 지정

2. group id 지정

3. 메시지를 역직렬화 

4. Kafka 컨슈머 객체 생성

5. consumer.subsribe(Collections.singleton("simple")); 구독할 토픽 목록 전달

6. 특정 조건을 충족하는 동안 루프를 돌면서 컨슈머에 폴 메소드를 호출

7. 폴 메소드는 일정 시간 동안 대기하다가 브로커로부터 컨슈머의 레코드 목록을 읽어온다.

8. 읽어 온 커슈머 레코드를 다시 또 루프를 돌면서 필요한 처리를 수행

9. close() 메소드로 종료처리

 

토픽 파티션은 그룹 단위 할당

컨슈머 그룹 단위로 파티션 할당

예)

파티션이 2개 컨슈머 그룹 1

컨슈머 한개가 두 파티션으로부터 데이터를 읽어 오게 된다.

파티션이 2개 컨슈머 그룹 2

각 컨슈머가 각 파티션으로부터 데이터를 읽어 오게 된다.

파티션이 2개 컨슈머 그룹 3

1개의 컨슈머는 놀게 된다. 컨슈머 개수가 파티션 개수보다 커지면 안 된다.

* 처리량이 떨어져서 컨슈머를 늘려야 한다면 파티션 개수도 같이 늘려야 한다.

 

커밋과 오프셋

각각 메시지가 저장된 위치를 오프셋(offset)이라고 하는데,

컨슈머의 폴 메소드는 이전에 커밋한 오프셋이 있으면 그 오프셋 이후에 레코드를 읽어온다.

그리고 읽어온 다음에 마지막 읽어온 레코드의 오프셋을 커밋을 한다. 

커밋된 오프셋이 없는 경우

1. 처음 접근이거나 커밋한 오프셋이 없는 경우

2. auto.offset.reset 설정 사용

 - earliest : 맨 처름 오프셋 사용

 - latest : 가장 마지막 오프셋 사용(기본값)

 - none : 컨슈머 그룹웨 대한 이전 커밋이 없으면 익셉션 발생 (익셥션 발생하기 때문에 보통 사용 안함)

 

컨슈머 설정

조회에 영향을 주는 주요 설정

fetch.min.bytes : 조회시 브로커가 전송할 최고 데이터 크기

 - 기본값 : 1

 - 이 값이 크면 대기 시간은 늘지만 처리량이 증가

fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간

 - 기본값 : 500

 - 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름

max.partition.fetch.bytest : 파티션 당 서버가 리턴할 수 있는 최대 크기

 - 기본값 : 1048576(1MB) 

 

자동 커밋/수동 커밋

enable.auto.commit 설정

 - true : 이정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값)

 - false : 수동으로 커밋 실행 

auto.commit.interval.ms : 자동 커밋 주기

poll(), close() 메서드 호출시 자동 커밋 실행

 

수동 커밋 : 동기/비동기 커밋

동기
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

for(ConsumerRecord<String, String> record : records) {
    ... 처리
}
try {
  consumer.commitSync();
} catch(Exception ex) {
  // 커밋 실패시 에러 발생
}
비동기
비동기이기 때문에 바로 성공 여부를 알 수 없다.
callback을 통해서 알 수 있다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

for(ConsumerRecord<String, String> record : records) {
  ...처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)

재처리와 순서

동일 메시지 조회 가능성

 - 일시적 커밋 실패, 리밸런스 등에 의해 발생

컨슈머는 멱동성(idempotence)을 고려해야 함

 - 예 : 아래 메시지를 재처리 할 경우

     - 조회수 1증가 -> 좋아요 1증가 -> 조회수 1증가

 - 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음

데이터 특성에 따라 타임스탬프, 일련번호 등을 활용을 해서 데이터를 중복해서 두 번 이상 처리해도 문제가 없도록 고려해야 함

 

세션 타임아웃, 하트비트, 최대 poll 간격

Kafka는 컨슈머 그룹을 알맞게 유지하기 위해서 설정을 사용한다.

1. 컨슈머는 하트비트를 전송해서 연결 유지

 - 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 - 관련 설정

    - session.timeout.ms : 세션 타임 아웃 시간 (기본값 10초)

    - heartbeat.interval.ms : 하트비트 전송 주기 (기본값 3초)

       - session.timeout.ms의 1/3 이하 추천

2. max.poll.interval.ms : poll() 메서드의 최대 호출 간격

 - 이 시간이 지나도록 poll()하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 

종료처리

다른 쓰레드에서 wakeup() 메서드 호출

 - poll() 메서드가 WakeupException 발생 -> close() 메서드로 종료 처리

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

consumer.subsribe(Collections.singleton("simple"));

try {

  while(true) {

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // wakeup() 호출시 Exception 발생

   ... records 처리

 try {

   consumer.commitAsync();

 } catch(Exception e) {

   e.printStackTrace();

 }

}

} catch (Exception ex) {

 ...

} finally {

  consumer.close();

}

주의 : 쓰레드 안전하지 않음

KafkaConsumer는 쓰레드에 안전하지 않음

 - 여러 쓰레드에서 동시에 사용하지 말 것

 - wakeup() 메서드는 예외

 

 

참고 

- kafka 조금 아는 척하기 3 (개발자용)-컨슈머 (https://www.youtube.com/watch?v=xqrIDHbGjOY)

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초2 - 프로듀서  (0) 2021.07.22
Kafka 기초1  (0) 2021.07.21
반응형

토픽에 메시지 전송

 - 토픽, 키, 값

프로듀서를 통해서 메시지를 보내는 코드

Properties를 통해서 프로듀서가 사용할 속성을 지정(설정 정보)

이 설정 정보에는 브로커 목록이나, key, value를 직렬화할 때 사용할 serializer, 배치 사이즈 설정등을 Properties 이용해서 지정하게 된다.

이 Properties를 이용해서 kafka 프로듀서 객체를 생성한다.

kafka 프로듀서 객체는 send() 메소드를 제공한다.

send() 메소드에 프로듀서 레코드를 전달하고 바로 이 레코드가 카푸카 브로커에 전송할 메시지가 된다. 

 

Sender의 기본 동작

  • Sender는 별로 쓰레드로 동작한다.
  • 배치가 찼는지 여부에 상관없이 Sender는 차례대로 브로커에 전송한다.
  • send 메소드는 Sender가 메시지를 보내는 동안 배치에 계속 쌓이게 된다.

* send(), Sender 두개가 서로 다른 쓰레드로 동작한다.

* 메시지를 보내는 동안 배치가 쌓이지 않는다거나, 또는 배치가 쌓이는 동안 Sender가 메시지를 보내지 않는다던가 하는 일은 발생하지 않는다.

 

처리량 관련 주요 속성

batch.size - 배치 크기, 배치가 다 차면 바로 전송

linger.ms - 전송 대기 시간(기본값0)

 - 대기 시간이 없으면 배치가 덜 차도 브로커로 바로 전송

 - 대기 시간을 주면 그 시간 만큼 배치에 메시지 추가가 가능해서 한 번의 전송 요청에 더 많은 데이터 처리 가능

 

전송 결과 확인 안함

producer.send(new ProducerRecord<>("simple", "value"));

- 전송 실패를 알 수 없음

- 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용

 

전송 결과 확인함 : Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));

try{

   RecordMetadata meta = f.get(); //블로킹

} catch (ExecutionException ex) {

}

- 배치 효과 떨어짐 -> 처리량 저하

- 처리량이 낮아도 되는 경우에만 사용

- 하나의 메시지를 보내고 블로킹되고 또 하나 보내고 블로킹 된다. 배치에 메시지가 한 개씩만 들어간다.

 

전송 결과 확인함 : Callback 사용

producer.send(new ProducerRecord<>("simple", "value"),

  new Callback() {

     @Override

      public void onCompletion(RecordMetadata metadata, Exception ex) {

      }

});

- 처리량 저하 없음

 

전송 보장과 ack

 

ack = 0

 - 서버 응답을 기다리지 않음

 - 전송 보장도 없음

ack = 1

 - 파티션의 리더에 저장되면 응답 받음

 - 리더 장애시 메시지 유실 가능(팔로워에 저장되기 전)

ack = all (또는 -1)

 - 모든 리플리카에 저장되면 응답 받음

   - 브로커 min.insync.replicas 설정에 따라 달라짐

 

 

 

ack + min.insync.replicas

min.insync.replicas (브로커 옵션)

프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수

 

예1 :

  - 리플리카 개수 3, ack = all, min.insync.replicas = 2

  - 리더에 저장하고 팔로워 중 한 개에 저장하면 성공 응답

예2 :

 - 리플리카 개수 3, ack = all, min.insync.replicas = 1

 - 리더에 저장되면 성공 응답

 - ack = 1 동일 (리더 장애시 메시지 유실 가능) 

예3 :

 - 리플리카 개수 3, ack = all, min.insync.replicas = 3

 - 리더와 팔로워 2개에 저장되면 성공 응답

 - 팔로워 중 한 개라도 장애가 나면 리플리카 부족으로 저장에 실패함(리플리카 개수와 min.insync.replicas 개수 동일하면 안됨)

 

에러 유형

전송 과정에서 실패

 - 전송 타임 아웃(일시적인 네트워크 오류 등)

 - 리더 다운에 의한 새 리더 선출 진행 중

 - 브로커 설정 메시지 크기 한도 초과

 - 등등

 

전송 전에 실패

 - 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과

 - 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과

 - 등등

 

실패 대응 1 : 재시도

재시도 - 재시도 가능한 에러는 재시도 처리  - 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등재시도 위치 - 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도   - retries 속성 - send() 메서드에서 Exception 발생시 Exception 타입에 따라 send() 재호출 - 콜백 메서드에서  Exception 받으면 타입에 따라 send() 재호출

아주 아주 특별한 이유가 없다면 무한 재시도 X

 

실패 대응 2 : 기록

추후 처리 위해 기록

 - 별도 파일, DB 등을 이용해서 실패한 메시지 기록

 - 추후에 수동(또는 자동) 보정 작업 진행

기록 위치

 - send() 메서드에서 익셉션 발생시

 - send() 메서드에 전달한 콜백에서 Exception 받는 경우

 - send() 메서드가 리턴한 Future의 get() 메서드에서 Exception 발생시

 

재시도와 메시지 중복 전송 가능성

브로커 응답이 늦게 와서 재시도할 경우 중복 발생 가능

enable.idempotence 속성 이용하면 중복 전송 될 가능성을 줄일 수 있다.

 

재시도와 순서

max.in.flight.requests.per.connection

 - 블로킹없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수

 - 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음

  - 전송 순서가 중요하면 이 값을 1로 지정

 

 

참고

- kafka 조금 아는 척하기 2 (개발자용) - 프로듀서 (https://www.youtube.com/watch?v=geMtm17ofPY)

 

 

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초3 - 컨슈머  (0) 2021.07.24
Kafka 기초1  (0) 2021.07.21
반응형

카프카란?

APACHE KAFKA

More than 80% of all Fortune 100 companies trust, and use Kafka.

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

 

Fortune 100대 기업 중 80% 이상이 Kafka를 신뢰하고 사용하고 있다.

Apache Kafa는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼이다.

- https://kafka.apache.org/

 

기본 구조

  • 카프카 클러스터 - 메시지를 저장하는 저장소
  • 브로커 - 각각의 서버(메시지를 나눠서 저장, 이중화 처리, 장애 대체)
  • 주키퍼 클러스터(앙상블) - 카프카 클러스터 관리 용도(카프카 클러스터에 대한 정보가 관리가 되고 기록이 됨)
  • 프로듀서 - 카프카 플러스터에 메시지를 보낸다. (데이터를 이동하는데 필요한 핵심 역할)
  • 컨슈머 - 카프카에서 메시지를 읽어낸다. (데이터를 이동하는데 필요한 핵심 역할)

토픽

- 카프카에서 메시지를 저장하는 단위

- 토픽은 메시지를 구분하는 용도로 사용

- 파일 시스템의 폴더나 메일함과 유사함

- 한 개의 토픽은 한 개 이상의 파티션으로 구성   * 파티션은 메시지를 저장하는 물리적인 파일을 의미

- 프로듀서와 컨슈머가 토픽을 기준으로 메시지를 주고받는다.

- 예) 뉴스용 토픽, 주문용 토픽

 

파티션

- 파티션은 추가만 가능한(append-only) 파일

- 각각 메시지가 저장된 위치를 오프셋(offset)이라고 한다.

- 프로듀서가 메시지를 저장을 하게되면  offset0, offset1 ... offset7 이런식으로 값을 가지게 된다.(파티션 맨 뒤에 추가된다.)

- 컨슈머는 오프셋 기준으로 순서대로 읽는다. (이후 메시지만 읽을 수 있다. 전에 메시지는 읽지 못한다.)

- 메시지는 삭제되지 않는다.

 

여러 파티션과 프로듀서

토픽은 여러 파티션으로 구성될 수 있는데 프로듀서는 어떤 파티션에 메시지를 저장을 할까? 

라운드로빈 또는 키를 이용해서 파티션을 선택한다. (같은 키에 대해서는 같은 파티션에 저장이 된다.  말인즉슨 같은 키는 순서가 유지된다.)

 

여러 파티션과 컨슈머

 - 컨슈머는 컨슈머그룹에 속하게 된다.

 - 한 개 파티션은 컨슈머그룹의 한 개 컨슈머만 연결이 가능하다.

     - 즉 컨슈머그룹에 속한 컨슈머들은 한 파티션을 공유할 수 없다.

     - 한 컨슈머그룹 기준으로 파티션의 메시지는 순서대로 처리된다.

     - 한개의 파티션을 서로 다른 그룹의 컨슈머는 공유할 수 있다.

성능이 좋은 이유

1. 파티션 파일을 OS 페이지캐시를 사용한다.

 - 파티션에 대한 파일 IO를 메모리에서 처리한다.

 - 서버에서 페이지캐시를 카프카만 사용해야 성능에 유리

2. Zero Copy

 - 디스크 버퍼에서 네트워크 버퍼로 직접 데이터 복사하기 때문에 빨라진다.

3. 브로커가 하는 일이 비교적 단순

 - 메시지 필터, 메시지 재전송과 같은 일은 브로커가 하지 않음(프로듀서, 컨슈머가 한다.)

 - 브로커는 컨슈머와 파티션 간 매핑 관리

4. 묶어서 보내고, 묶어서 받는다.

 - 프로듀서 : 일정 크기만큼 메시지를 모아서 전송 가능하다.

 - 컨슈머 : 최소 크기만큼 메시지를 모아서 조회 가능하다.

 - 낱개 처리보다 처리량 증가

5. 처리량 증대(확장)가 쉽다.

 - 1개 장비의 용량 한계 -> 브로커 추가, 파티션 추가

 - 컨슈머가 느림 -> 컨슈머 추가( +파티션 추가)

 

리플리카 - 복제

리플리카 : 파티션의 복제본

 - 복제수만큼 파티션의 복제본이 각 브로커에 생김

리더와 팔로워로 구성

 - 프로듀서와 컨슈머는 리더를 통해서만 메시지 처리

 - 팔로워는 리더로부터 복제

장애 대응

 - 리더가 속한 브로커 장애시 다른 팔로워가 리더가 됨

 

 

 

참고 

- kafka 조금 아는 척하기 1 (개발자용) (https://www.youtube.com/watch?v=0Ssx7jJJADI)

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초3 - 컨슈머  (0) 2021.07.24
Kafka 기초2 - 프로듀서  (0) 2021.07.22

+ Recent posts