반응형

토픽에 메시지 전송

 - 토픽, 키, 값

프로듀서를 통해서 메시지를 보내는 코드

Properties를 통해서 프로듀서가 사용할 속성을 지정(설정 정보)

이 설정 정보에는 브로커 목록이나, key, value를 직렬화할 때 사용할 serializer, 배치 사이즈 설정등을 Properties 이용해서 지정하게 된다.

이 Properties를 이용해서 kafka 프로듀서 객체를 생성한다.

kafka 프로듀서 객체는 send() 메소드를 제공한다.

send() 메소드에 프로듀서 레코드를 전달하고 바로 이 레코드가 카푸카 브로커에 전송할 메시지가 된다. 

 

Sender의 기본 동작

  • Sender는 별로 쓰레드로 동작한다.
  • 배치가 찼는지 여부에 상관없이 Sender는 차례대로 브로커에 전송한다.
  • send 메소드는 Sender가 메시지를 보내는 동안 배치에 계속 쌓이게 된다.

* send(), Sender 두개가 서로 다른 쓰레드로 동작한다.

* 메시지를 보내는 동안 배치가 쌓이지 않는다거나, 또는 배치가 쌓이는 동안 Sender가 메시지를 보내지 않는다던가 하는 일은 발생하지 않는다.

 

처리량 관련 주요 속성

batch.size - 배치 크기, 배치가 다 차면 바로 전송

linger.ms - 전송 대기 시간(기본값0)

 - 대기 시간이 없으면 배치가 덜 차도 브로커로 바로 전송

 - 대기 시간을 주면 그 시간 만큼 배치에 메시지 추가가 가능해서 한 번의 전송 요청에 더 많은 데이터 처리 가능

 

전송 결과 확인 안함

producer.send(new ProducerRecord<>("simple", "value"));

- 전송 실패를 알 수 없음

- 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용

 

전송 결과 확인함 : Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));

try{

   RecordMetadata meta = f.get(); //블로킹

} catch (ExecutionException ex) {

}

- 배치 효과 떨어짐 -> 처리량 저하

- 처리량이 낮아도 되는 경우에만 사용

- 하나의 메시지를 보내고 블로킹되고 또 하나 보내고 블로킹 된다. 배치에 메시지가 한 개씩만 들어간다.

 

전송 결과 확인함 : Callback 사용

producer.send(new ProducerRecord<>("simple", "value"),

  new Callback() {

     @Override

      public void onCompletion(RecordMetadata metadata, Exception ex) {

      }

});

- 처리량 저하 없음

 

전송 보장과 ack

 

ack = 0

 - 서버 응답을 기다리지 않음

 - 전송 보장도 없음

ack = 1

 - 파티션의 리더에 저장되면 응답 받음

 - 리더 장애시 메시지 유실 가능(팔로워에 저장되기 전)

ack = all (또는 -1)

 - 모든 리플리카에 저장되면 응답 받음

   - 브로커 min.insync.replicas 설정에 따라 달라짐

 

 

 

ack + min.insync.replicas

min.insync.replicas (브로커 옵션)

프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수

 

예1 :

  - 리플리카 개수 3, ack = all, min.insync.replicas = 2

  - 리더에 저장하고 팔로워 중 한 개에 저장하면 성공 응답

예2 :

 - 리플리카 개수 3, ack = all, min.insync.replicas = 1

 - 리더에 저장되면 성공 응답

 - ack = 1 동일 (리더 장애시 메시지 유실 가능) 

예3 :

 - 리플리카 개수 3, ack = all, min.insync.replicas = 3

 - 리더와 팔로워 2개에 저장되면 성공 응답

 - 팔로워 중 한 개라도 장애가 나면 리플리카 부족으로 저장에 실패함(리플리카 개수와 min.insync.replicas 개수 동일하면 안됨)

 

에러 유형

전송 과정에서 실패

 - 전송 타임 아웃(일시적인 네트워크 오류 등)

 - 리더 다운에 의한 새 리더 선출 진행 중

 - 브로커 설정 메시지 크기 한도 초과

 - 등등

 

전송 전에 실패

 - 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과

 - 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과

 - 등등

 

실패 대응 1 : 재시도

재시도 - 재시도 가능한 에러는 재시도 처리  - 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등재시도 위치 - 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도   - retries 속성 - send() 메서드에서 Exception 발생시 Exception 타입에 따라 send() 재호출 - 콜백 메서드에서  Exception 받으면 타입에 따라 send() 재호출

아주 아주 특별한 이유가 없다면 무한 재시도 X

 

실패 대응 2 : 기록

추후 처리 위해 기록

 - 별도 파일, DB 등을 이용해서 실패한 메시지 기록

 - 추후에 수동(또는 자동) 보정 작업 진행

기록 위치

 - send() 메서드에서 익셉션 발생시

 - send() 메서드에 전달한 콜백에서 Exception 받는 경우

 - send() 메서드가 리턴한 Future의 get() 메서드에서 Exception 발생시

 

재시도와 메시지 중복 전송 가능성

브로커 응답이 늦게 와서 재시도할 경우 중복 발생 가능

enable.idempotence 속성 이용하면 중복 전송 될 가능성을 줄일 수 있다.

 

재시도와 순서

max.in.flight.requests.per.connection

 - 블로킹없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수

 - 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음

  - 전송 순서가 중요하면 이 값을 1로 지정

 

 

참고

- kafka 조금 아는 척하기 2 (개발자용) - 프로듀서 (https://www.youtube.com/watch?v=geMtm17ofPY)

 

 

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초3 - 컨슈머  (0) 2021.07.24
Kafka 기초1  (0) 2021.07.21

+ Recent posts