반응형

인터넷 통신

 - 크게 클라이언트서버로 구성되어 있는데, 그 사이에 수많은 노드 서버들이 존재하여 노드들을 거쳐서 통신하게 된다.

IP

인터넷 프로톸콜 역할

 - 지정한 IP 주소(IP Address)에 데이터 전달

 - 패킷이라는 통신 단위로 데이터 전달

 

* IP 패킷 정보 데이터(출발지 IP, 목적지IP, 기타...)

IP 프로토콜의 한계

비연결성

 - 패킷을 받을 대상이 없거나 서비스 불능 상태여도 패킷 전송된다.

비신뢰성

 - 중간에 패킷이 사라질수 있다.

 - 패킷이 순서대로 도착 안 할 수 있다.

프로그램 구분

 - 같은 IP를 사용하는 서버에서 통신하는 애플리케이션이 둘 이상이면 어떻게 구분할 것인가?

 

 

* IP 프로토콜의 한계를 극복하고자 TCP UDP 프로토콜 등장

 

인터넷 프로토콜 스택의 4계층

  1. 애플리케이션 계층 - HTTP, FTP
  2. 전송 계층 - TCP, UDP
  3. 인터넷 계층 - IP
  4. 네트워크 인터페이스 계층

TCP 프로토콜

* TCP 세그먼트 정보(출발지 PROT, 목적지 PORT, 전송 제어, 순서, 검증 정보 ...)

 

TCP 특징

전송 제어 프로토콜(Transmission Control Protocol)

  • 연결 지향 - TCP 3 way handshake (가상 연결)
  • 데이터 전달 보증
  • 순서 보장

 - 신뢰할 수 있는 프로토콜

 - 현재는 대부분 TCP 사용

 

 

TCP 3 way handshake

 - IP 프로토콜 비연결성 해결

데이터 전달 보증

 - IP 프로토콜 비신뢰성 해결

순서 보장

 - IP 프로토콜 순서 비신뢰성 해결

 

UDP 특징

  • 기능이 거의 없음
  • 데이터 전달 및 순서가 보장되지 않지만, 단순하고 빠름
  • IP 프로토콜과 거의 같다 + PORT + 체크섬 정도 추가
  • 애플리케이션 추가 작업 필요
  • 요새 TCP의 SYN, ACK 과정들을 줄이기 위해서 다시 관심받고 있다.

 

PORT

 - 같은 IP 내에서 프로세스 구분

 - IP 프로토콜 프로그램 구분 해결

 - 예) IP가 아파트 한동이라면 PORT는 호수

 

* IP는 기억하기 어렵고, 바뀔 가능성이 있다. 그래서 DNS 등장

DNS

도메인 네임 시스템(Domain Name System)

- 도메인 명을 IP 주소로 변환

 

 

 

 

참고

inflean 강의 (모든 개발자를 위한 HTTP 웹 기본 지식, 김영한)

728x90
반응형

'Network' 카테고리의 다른 글

URI, URL, URN  (0) 2021.09.06
반응형

1:N인 관계를 양방향 연관관계로 구현해서 Postman으로 API 테스트를 진행하는데 놀랍게도 StackOverFlowError가 발생하였다.

그 유명한 StackOverFlow를 처음으로 맞이해서 매우 반갑긴 하였지만, 해결할 생각에 막막하기만 했다.

열심히 구글링한 결과 JPA 순환 참조라는 것을 알았다.

 

Employee.java

 

public class Employee {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "employee_id")
private Long id;

private String employeeName;

private String phoneNumber;

@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "shop_id")
private Shop shop;

}

 

 

Shop.java

public class Shop {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "shop_id")
private Long id;

private String shopName;

private String phoneNumber;

@OneToMany(mappedBy = "shop", cascade = CascadeType.ALL)
private List employees = new ArrayList<>();


}

원인은 Controller를 통해서 Shop Entity를 Response로 내보내고 브라우저에 json 형태로 뿌려주기 위해서는 Shop entity가 참조하고 있는 Employee Entity도 함께 불러오게 된다. 여기서 순환 참조가 발생한다!

Employee Entity도 Shop Entity를 참조하기 때문이다.

 

해결방법

1. @JsonManagedReference, @JsonBackReference 애노테이션 사용

  - 순환참조를 방어하기 위한 Annotation이다. 부모 클래스에 @JsonManagedReference 자식 클래스에 @JsonBackReference 애노테이션을 붙여준다.

  예) 1:N관계에서 1인 Entity에 @JsonManagedReference N인 Entity에 @JsonBackReference 애노테이션 붙인다.

   

Employee.java

public class Employee {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "employee_id")
private Long id;

private String employeeName;

private String phoneNumber;

@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "shop_id")
@JsonBackReference
private Shop shop;

}

 

Shop.java

public class Shop {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "shop_id")
private Long id;

private String shopName;

private String phoneNumber;

@OneToMany(mappedBy = "shop", cascade = CascadeType.ALL)
@JsonManagedReference
private List employees = new ArrayList<>();


}

 

2. @JsonIgnore 

  - json 데이터에 해당 프로퍼티는 null로 들어가게 된다. 해당 데이터를 아예 포함이 안되게 하는 것이다.

 

 

3. DTO 사용

 - 해당 문제는 Entity 자체를 리턴하는 데에서 문제가 발생한 것이다. Entity 자체를 리턴하는 것보다 ResponseDto를 생성해서 필요한 데이터만 리턴하는 게 더 좋은 설계라고 생각된다. Entity는 그 자체로 두고 최대한 건드리지 않는 것이 좋다. 많은 애노테이션(제약조건 애노테이션 등)을 Entity Class에 넣다 보니 Entity Class가 너무 복잡해진다. 이번 계기로 리턴할 경우 ResponseDto를 만들고 url 파라미터를 받는 경우에도 FormDataDto 등을 생성해서 Entity에는 최대한 DB 관련 애노테이션만 넣을 수 있도록 해야겠다.

 

4. 단방향, 양방향 매핑 고민

 - 아무 고민 없이 양방향 매핑으로 구현을 하였지만, 정말로 양방향으로 매핑이 필요한지 고민하고 단방향 매핑을 이용하는 것도 해당 문제를 해결하는 데 방법이다. 

728x90
반응형

'JPA' 카테고리의 다른 글

JPA 연관관계 매핑 기초  (0) 2021.06.17
JPA 엔티티 매핑  (0) 2021.06.08
JPA 영속성 관리  (0) 2021.06.06
JPA 소개  (0) 2021.06.04
반응형

토픽 파티션에서 레코드 조회

1. 서버 지정

2. group id 지정

3. 메시지를 역직렬화 

4. Kafka 컨슈머 객체 생성

5. consumer.subsribe(Collections.singleton("simple")); 구독할 토픽 목록 전달

6. 특정 조건을 충족하는 동안 루프를 돌면서 컨슈머에 폴 메소드를 호출

7. 폴 메소드는 일정 시간 동안 대기하다가 브로커로부터 컨슈머의 레코드 목록을 읽어온다.

8. 읽어 온 커슈머 레코드를 다시 또 루프를 돌면서 필요한 처리를 수행

9. close() 메소드로 종료처리

 

토픽 파티션은 그룹 단위 할당

컨슈머 그룹 단위로 파티션 할당

예)

파티션이 2개 컨슈머 그룹 1

컨슈머 한개가 두 파티션으로부터 데이터를 읽어 오게 된다.

파티션이 2개 컨슈머 그룹 2

각 컨슈머가 각 파티션으로부터 데이터를 읽어 오게 된다.

파티션이 2개 컨슈머 그룹 3

1개의 컨슈머는 놀게 된다. 컨슈머 개수가 파티션 개수보다 커지면 안 된다.

* 처리량이 떨어져서 컨슈머를 늘려야 한다면 파티션 개수도 같이 늘려야 한다.

 

커밋과 오프셋

각각 메시지가 저장된 위치를 오프셋(offset)이라고 하는데,

컨슈머의 폴 메소드는 이전에 커밋한 오프셋이 있으면 그 오프셋 이후에 레코드를 읽어온다.

그리고 읽어온 다음에 마지막 읽어온 레코드의 오프셋을 커밋을 한다. 

커밋된 오프셋이 없는 경우

1. 처음 접근이거나 커밋한 오프셋이 없는 경우

2. auto.offset.reset 설정 사용

 - earliest : 맨 처름 오프셋 사용

 - latest : 가장 마지막 오프셋 사용(기본값)

 - none : 컨슈머 그룹웨 대한 이전 커밋이 없으면 익셉션 발생 (익셥션 발생하기 때문에 보통 사용 안함)

 

컨슈머 설정

조회에 영향을 주는 주요 설정

fetch.min.bytes : 조회시 브로커가 전송할 최고 데이터 크기

 - 기본값 : 1

 - 이 값이 크면 대기 시간은 늘지만 처리량이 증가

fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간

 - 기본값 : 500

 - 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름

max.partition.fetch.bytest : 파티션 당 서버가 리턴할 수 있는 최대 크기

 - 기본값 : 1048576(1MB) 

 

자동 커밋/수동 커밋

enable.auto.commit 설정

 - true : 이정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값)

 - false : 수동으로 커밋 실행 

auto.commit.interval.ms : 자동 커밋 주기

poll(), close() 메서드 호출시 자동 커밋 실행

 

수동 커밋 : 동기/비동기 커밋

동기
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

for(ConsumerRecord<String, String> record : records) {
    ... 처리
}
try {
  consumer.commitSync();
} catch(Exception ex) {
  // 커밋 실패시 에러 발생
}
비동기
비동기이기 때문에 바로 성공 여부를 알 수 없다.
callback을 통해서 알 수 있다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

for(ConsumerRecord<String, String> record : records) {
  ...처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)

재처리와 순서

동일 메시지 조회 가능성

 - 일시적 커밋 실패, 리밸런스 등에 의해 발생

컨슈머는 멱동성(idempotence)을 고려해야 함

 - 예 : 아래 메시지를 재처리 할 경우

     - 조회수 1증가 -> 좋아요 1증가 -> 조회수 1증가

 - 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음

데이터 특성에 따라 타임스탬프, 일련번호 등을 활용을 해서 데이터를 중복해서 두 번 이상 처리해도 문제가 없도록 고려해야 함

 

세션 타임아웃, 하트비트, 최대 poll 간격

Kafka는 컨슈머 그룹을 알맞게 유지하기 위해서 설정을 사용한다.

1. 컨슈머는 하트비트를 전송해서 연결 유지

 - 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 - 관련 설정

    - session.timeout.ms : 세션 타임 아웃 시간 (기본값 10초)

    - heartbeat.interval.ms : 하트비트 전송 주기 (기본값 3초)

       - session.timeout.ms의 1/3 이하 추천

2. max.poll.interval.ms : poll() 메서드의 최대 호출 간격

 - 이 시간이 지나도록 poll()하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행

 

종료처리

다른 쓰레드에서 wakeup() 메서드 호출

 - poll() 메서드가 WakeupException 발생 -> close() 메서드로 종료 처리

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

consumer.subsribe(Collections.singleton("simple"));

try {

  while(true) {

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // wakeup() 호출시 Exception 발생

   ... records 처리

 try {

   consumer.commitAsync();

 } catch(Exception e) {

   e.printStackTrace();

 }

}

} catch (Exception ex) {

 ...

} finally {

  consumer.close();

}

주의 : 쓰레드 안전하지 않음

KafkaConsumer는 쓰레드에 안전하지 않음

 - 여러 쓰레드에서 동시에 사용하지 말 것

 - wakeup() 메서드는 예외

 

 

참고 

- kafka 조금 아는 척하기 3 (개발자용)-컨슈머 (https://www.youtube.com/watch?v=xqrIDHbGjOY)

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초2 - 프로듀서  (0) 2021.07.22
Kafka 기초1  (0) 2021.07.21
반응형

토픽에 메시지 전송

 - 토픽, 키, 값

프로듀서를 통해서 메시지를 보내는 코드

Properties를 통해서 프로듀서가 사용할 속성을 지정(설정 정보)

이 설정 정보에는 브로커 목록이나, key, value를 직렬화할 때 사용할 serializer, 배치 사이즈 설정등을 Properties 이용해서 지정하게 된다.

이 Properties를 이용해서 kafka 프로듀서 객체를 생성한다.

kafka 프로듀서 객체는 send() 메소드를 제공한다.

send() 메소드에 프로듀서 레코드를 전달하고 바로 이 레코드가 카푸카 브로커에 전송할 메시지가 된다. 

 

Sender의 기본 동작

  • Sender는 별로 쓰레드로 동작한다.
  • 배치가 찼는지 여부에 상관없이 Sender는 차례대로 브로커에 전송한다.
  • send 메소드는 Sender가 메시지를 보내는 동안 배치에 계속 쌓이게 된다.

* send(), Sender 두개가 서로 다른 쓰레드로 동작한다.

* 메시지를 보내는 동안 배치가 쌓이지 않는다거나, 또는 배치가 쌓이는 동안 Sender가 메시지를 보내지 않는다던가 하는 일은 발생하지 않는다.

 

처리량 관련 주요 속성

batch.size - 배치 크기, 배치가 다 차면 바로 전송

linger.ms - 전송 대기 시간(기본값0)

 - 대기 시간이 없으면 배치가 덜 차도 브로커로 바로 전송

 - 대기 시간을 주면 그 시간 만큼 배치에 메시지 추가가 가능해서 한 번의 전송 요청에 더 많은 데이터 처리 가능

 

전송 결과 확인 안함

producer.send(new ProducerRecord<>("simple", "value"));

- 전송 실패를 알 수 없음

- 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용

 

전송 결과 확인함 : Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));

try{

   RecordMetadata meta = f.get(); //블로킹

} catch (ExecutionException ex) {

}

- 배치 효과 떨어짐 -> 처리량 저하

- 처리량이 낮아도 되는 경우에만 사용

- 하나의 메시지를 보내고 블로킹되고 또 하나 보내고 블로킹 된다. 배치에 메시지가 한 개씩만 들어간다.

 

전송 결과 확인함 : Callback 사용

producer.send(new ProducerRecord<>("simple", "value"),

  new Callback() {

     @Override

      public void onCompletion(RecordMetadata metadata, Exception ex) {

      }

});

- 처리량 저하 없음

 

전송 보장과 ack

 

ack = 0

 - 서버 응답을 기다리지 않음

 - 전송 보장도 없음

ack = 1

 - 파티션의 리더에 저장되면 응답 받음

 - 리더 장애시 메시지 유실 가능(팔로워에 저장되기 전)

ack = all (또는 -1)

 - 모든 리플리카에 저장되면 응답 받음

   - 브로커 min.insync.replicas 설정에 따라 달라짐

 

 

 

ack + min.insync.replicas

min.insync.replicas (브로커 옵션)

프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수

 

예1 :

  - 리플리카 개수 3, ack = all, min.insync.replicas = 2

  - 리더에 저장하고 팔로워 중 한 개에 저장하면 성공 응답

예2 :

 - 리플리카 개수 3, ack = all, min.insync.replicas = 1

 - 리더에 저장되면 성공 응답

 - ack = 1 동일 (리더 장애시 메시지 유실 가능) 

예3 :

 - 리플리카 개수 3, ack = all, min.insync.replicas = 3

 - 리더와 팔로워 2개에 저장되면 성공 응답

 - 팔로워 중 한 개라도 장애가 나면 리플리카 부족으로 저장에 실패함(리플리카 개수와 min.insync.replicas 개수 동일하면 안됨)

 

에러 유형

전송 과정에서 실패

 - 전송 타임 아웃(일시적인 네트워크 오류 등)

 - 리더 다운에 의한 새 리더 선출 진행 중

 - 브로커 설정 메시지 크기 한도 초과

 - 등등

 

전송 전에 실패

 - 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과

 - 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과

 - 등등

 

실패 대응 1 : 재시도

재시도 - 재시도 가능한 에러는 재시도 처리  - 예: 브로커 응답 타임 아웃, 일시적인 리더 없음 등재시도 위치 - 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도   - retries 속성 - send() 메서드에서 Exception 발생시 Exception 타입에 따라 send() 재호출 - 콜백 메서드에서  Exception 받으면 타입에 따라 send() 재호출

아주 아주 특별한 이유가 없다면 무한 재시도 X

 

실패 대응 2 : 기록

추후 처리 위해 기록

 - 별도 파일, DB 등을 이용해서 실패한 메시지 기록

 - 추후에 수동(또는 자동) 보정 작업 진행

기록 위치

 - send() 메서드에서 익셉션 발생시

 - send() 메서드에 전달한 콜백에서 Exception 받는 경우

 - send() 메서드가 리턴한 Future의 get() 메서드에서 Exception 발생시

 

재시도와 메시지 중복 전송 가능성

브로커 응답이 늦게 와서 재시도할 경우 중복 발생 가능

enable.idempotence 속성 이용하면 중복 전송 될 가능성을 줄일 수 있다.

 

재시도와 순서

max.in.flight.requests.per.connection

 - 블로킹없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수

 - 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음

  - 전송 순서가 중요하면 이 값을 1로 지정

 

 

참고

- kafka 조금 아는 척하기 2 (개발자용) - 프로듀서 (https://www.youtube.com/watch?v=geMtm17ofPY)

 

 

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초3 - 컨슈머  (0) 2021.07.24
Kafka 기초1  (0) 2021.07.21
반응형

카프카란?

APACHE KAFKA

More than 80% of all Fortune 100 companies trust, and use Kafka.

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

 

Fortune 100대 기업 중 80% 이상이 Kafka를 신뢰하고 사용하고 있다.

Apache Kafa는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼이다.

- https://kafka.apache.org/

 

기본 구조

  • 카프카 클러스터 - 메시지를 저장하는 저장소
  • 브로커 - 각각의 서버(메시지를 나눠서 저장, 이중화 처리, 장애 대체)
  • 주키퍼 클러스터(앙상블) - 카프카 클러스터 관리 용도(카프카 클러스터에 대한 정보가 관리가 되고 기록이 됨)
  • 프로듀서 - 카프카 플러스터에 메시지를 보낸다. (데이터를 이동하는데 필요한 핵심 역할)
  • 컨슈머 - 카프카에서 메시지를 읽어낸다. (데이터를 이동하는데 필요한 핵심 역할)

토픽

- 카프카에서 메시지를 저장하는 단위

- 토픽은 메시지를 구분하는 용도로 사용

- 파일 시스템의 폴더나 메일함과 유사함

- 한 개의 토픽은 한 개 이상의 파티션으로 구성   * 파티션은 메시지를 저장하는 물리적인 파일을 의미

- 프로듀서와 컨슈머가 토픽을 기준으로 메시지를 주고받는다.

- 예) 뉴스용 토픽, 주문용 토픽

 

파티션

- 파티션은 추가만 가능한(append-only) 파일

- 각각 메시지가 저장된 위치를 오프셋(offset)이라고 한다.

- 프로듀서가 메시지를 저장을 하게되면  offset0, offset1 ... offset7 이런식으로 값을 가지게 된다.(파티션 맨 뒤에 추가된다.)

- 컨슈머는 오프셋 기준으로 순서대로 읽는다. (이후 메시지만 읽을 수 있다. 전에 메시지는 읽지 못한다.)

- 메시지는 삭제되지 않는다.

 

여러 파티션과 프로듀서

토픽은 여러 파티션으로 구성될 수 있는데 프로듀서는 어떤 파티션에 메시지를 저장을 할까? 

라운드로빈 또는 키를 이용해서 파티션을 선택한다. (같은 키에 대해서는 같은 파티션에 저장이 된다.  말인즉슨 같은 키는 순서가 유지된다.)

 

여러 파티션과 컨슈머

 - 컨슈머는 컨슈머그룹에 속하게 된다.

 - 한 개 파티션은 컨슈머그룹의 한 개 컨슈머만 연결이 가능하다.

     - 즉 컨슈머그룹에 속한 컨슈머들은 한 파티션을 공유할 수 없다.

     - 한 컨슈머그룹 기준으로 파티션의 메시지는 순서대로 처리된다.

     - 한개의 파티션을 서로 다른 그룹의 컨슈머는 공유할 수 있다.

성능이 좋은 이유

1. 파티션 파일을 OS 페이지캐시를 사용한다.

 - 파티션에 대한 파일 IO를 메모리에서 처리한다.

 - 서버에서 페이지캐시를 카프카만 사용해야 성능에 유리

2. Zero Copy

 - 디스크 버퍼에서 네트워크 버퍼로 직접 데이터 복사하기 때문에 빨라진다.

3. 브로커가 하는 일이 비교적 단순

 - 메시지 필터, 메시지 재전송과 같은 일은 브로커가 하지 않음(프로듀서, 컨슈머가 한다.)

 - 브로커는 컨슈머와 파티션 간 매핑 관리

4. 묶어서 보내고, 묶어서 받는다.

 - 프로듀서 : 일정 크기만큼 메시지를 모아서 전송 가능하다.

 - 컨슈머 : 최소 크기만큼 메시지를 모아서 조회 가능하다.

 - 낱개 처리보다 처리량 증가

5. 처리량 증대(확장)가 쉽다.

 - 1개 장비의 용량 한계 -> 브로커 추가, 파티션 추가

 - 컨슈머가 느림 -> 컨슈머 추가( +파티션 추가)

 

리플리카 - 복제

리플리카 : 파티션의 복제본

 - 복제수만큼 파티션의 복제본이 각 브로커에 생김

리더와 팔로워로 구성

 - 프로듀서와 컨슈머는 리더를 통해서만 메시지 처리

 - 팔로워는 리더로부터 복제

장애 대응

 - 리더가 속한 브로커 장애시 다른 팔로워가 리더가 됨

 

 

 

참고 

- kafka 조금 아는 척하기 1 (개발자용) (https://www.youtube.com/watch?v=0Ssx7jJJADI)

728x90
반응형

'Kafka' 카테고리의 다른 글

Kafka 기초3 - 컨슈머  (0) 2021.07.24
Kafka 기초2 - 프로듀서  (0) 2021.07.22
반응형

Counter라는 class는 처음에 생성이 되면 counter라는 변수가 0으로 초기화된다.

해당 class에는 increase라는 함수도 있다.

class Counter {

  constructor() {

   this.counter = 0;



  }

  increase() { // class에서 함수를 작성할 때는 function 생략 가능

    this.counter ++;

    if(this.counter % 5 === 0) {

      console.log("yo!");

    }

  }

}

===========================

클래스 다이어그램

const coolCounter = new Counter();

coolCounter.increase();

coolCounter.increase();

coolCounter.increase();

coolCounter.increase();

coolCounter.increase();

 

위와 같이 객체를 생성 후 increase() 함수를 5번 호출하게 되면 "yo"라는 문구가 console 창에 출력된다.

하지만 이렇게 사용하게 되면 if 내부 (console.log("yo!");) 를 컨트롤하는 것이 쉽지 않다.

대안으로는 callback 함수를 전달하는 방법이 있다.

 

개선1

Counter class

increase(runIf5Times) { 

    this.counter ++;

    if(this.counter % 5 === 0) {

      runIf5Times(this.counter); // 해당 counter를 출력하고 싶을 때 counter를 전달한다.

    }

  }

===========================

 

const coolCounter = new Counter();

function printSomething(num) {

  console.log(`yo! ${num}`);

}

coolCounter.increase(printSomething);

coolCounter.increase(printSomething);

coolCounter.increase(printSomething);

coolCounter.increase(printSomething);

coolCounter.increase(printSomething);

 

위와 같이 구성할 경우 장점은 함수를 자신의 요구 사항에 맞게 전달할 수 있다.

console 출력을 alert으로 함수 변경

 

function alertNum(num) {

  alert(`yo! ${num}`);

}



coolCounter.increase(alertNum);

coolCounter.increase(alertNum);

coolCounter.increase(alertNum);

coolCounter.increase(alertNum);

coolCounter.increase(alertNum);

 

개선2

increase 함수를 호출할 때마다 callback 함수를 전달하니 불편하다.

대안으로 constructor에서 callback 함수를 받아서 class 자체에서 기억하도록 한다.

Counter class를 만들 때 원하는 콜백 함수를 생성자에 전달해 준다. 

 

Counter class

class Counter {

  constructor(runEveryFiveTimes) {

   this.counter = 0;

   this.callback = runEveryFiveTimes;

  }

  increase() {

    this.counter ++;

    if(this.counter % 5 === 0) {

      this.callback(this.counter);

      if(this.callback) {        // 사용자가 callback 함수를 넘겨줄 때만 실행되도록 

        this.callback(this.counter);

      } => this.callback && this.callback(this.counter);     // 한줄로 개선 가능

    } 

  }

}

클래스 다이어그램

===========================

 

const consoleCounter = new Counter(printSomething); // console Counter class

const alertCounter = new Counter(alertNum); // alert Counter class

function printSomething(num) {

  console.log(`yo! ${num}`);

}

function alertNum(num) {

  alert(`yo! ${num}`);

}

consoleCounter.increase();

consoleCounter.increase();

consoleCounter.increase();

consoleCounter.increase();

consoleCounter.increase(); // console에 yo! 5 출력

 

참고

- 자바스크립트 기초4. 클래스 | 클래스 예제와 콜백 함수 최종 정리

 

728x90
반응형
반응형

스트림(Stream)

스트림(Stream)은 '데이터의 흐름’입니다. 배열 또는 컬렉션 인스턴스에 함수 여러 개를 조합해서 원하는 결과를 필터링하고 가공된 결과를 얻을 수 있습니다. 또한 람다를 이용해서 코드의 양을 줄이고 간결하게 표현할 수 있습니다. 즉, 배열과 컬렉션을 함수형으로 처리할 수 있습니다.

 

스트림은 자바8부터 추가된 컬렉션의 저장 요소를 하나씩 참조해서 람다식으로 처리할 수 있도록 해주는 반복자입니다. Iterator와 비슷한 역할을 하지만 람다식으로 요소 처리 코드를 제공하여 코드가 좀 더 간결하게 할 수 있다는 점과 내부 반복자를 사용하므로 병렬처리가 쉽다는 점에서 차이점이 있습니다. 

 

스트림에 대한 내용은 크게 세 가지로 나눌 수 있습니다.

  1. 생성하기 : 스트림 인스턴스 생성.
  2. 가공하기 : 필터링(filtering) 및 맵핑(mapping) 등 원하는 결과를 만들어가는 중간 작업(intermediate operations).
  3. 결과 만들기 : 최종적으로 결과를 만들어내는 작업(terminal operations).

1. 생성하기

배열 스트림

 

String[] arr = new String[]{"Bumblebee", "b", "c"};
Stream<String> stream = Arrays.stream(arr); // return Bumblebee, b, c
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3); // return b, c

 

컬렉션 스트림

 

List<String> list = Arrays.asList("a", "b", "c");
Stream<String> colStream = list.stream(); // return a, b, c
Stream<String> parallelStream = list.parallelStream();  // 병렬 처리 스트림

 

Stream.builder()

 

Stream<String> builderStream =
Stream.<String>builder()
                        .add("kakao").add("naver").add("google")
                        .build(); // return kakao, naver, google

 

Stream.generate()

생성되는 스트림은 크기가 정해져있지 않고 무한(infinite)하기 때문에 특정 사이즈로 최대 크기를 제한해야 합니다.

 

 

Stream<String> generatedStream =
Stream.generate(() -> "Bumblebee").limit(3); // return Bumblebee, Bumblebee, Bumblebee

 

Stream.iterate()

초기값 설정 후 해당 값을 람다를 통해서 스트림에 들어갈 요소를 만들 수 있다.

 

Stream<Integer> iteratedStream = Stream.iterate(10, n -> n + 2).limit(3); // return 10, 12, 14

 

기본 타입형 스트림

리스트나 배열을 이용해서 기본 타입(int, long, double) 스트림을 생성할 수 있습니다.

 

IntStream intStream = IntStream.range(1, 5); // return 1,2,3,4
LongStream longStream = LongStream.rangeClosed(1, 5); // return 1,2,3,4,5

 

Java 8 의 Random 클래스는 난수를 가지고 세 가지 타입의 스트림(IntStream, LongStream, DoubleStream)을 만들어낼 수 있습니다.

DoubleStream doubles = new Random().doubles(3);

 

병렬 스트림 Parallel Stream

스트림 생성 시 사용하는 stream 대신 parallelStream 메소드를 사용해서 병렬 스트림을 쉽게 생성할 수 있습니다.

내부적으로는 쓰레드를 처리하기 위해 자바 7부터 도입된 Fork/Join framework 를 사용합니다.

각 작업을 쓰레드를 이용해 병렬 처리됩니다.

 

Stream<Product> parallelStream = productList.parallelStream(); // 병렬 스트림 생성
boolean isParallel = parallelStream.isParallel(); // 병렬 여부 확인

 

다시 시퀀셜(sequential) 모드로 돌리고 싶다면 다음처럼 sequential 메소드를 사용합니다.

 

parallelStream.sequential(); // sequential
boolean isParallel = parallelStream .isParallel();

 

스트림 연결하기

 

Stream<String> stream1 = Stream.of("kakao", "naver");
Stream<String> stream2 = Stream.of("google");
Stream<String> concat = Stream.concat(stream1, stream2);
concat.forEach(a -> System.out.print(a+", ")); // kakao, naver, google

 

2. 가공하기

전체 요소 중에서 다음과 같은 API 를 이용해서 내가 원하는 것만 뽑아낼 수 있습니다.

이러한 가공 단계를 중간 작업(intermediate operations)이라고 하는데, 이러한 작업은 스트림을 리턴하기 때문에 여러 작업을 이어 붙여서(chaining) 작성할 수 있습니다.

 

Filtering

필터(filter)은 스트림 내 요소들을 하나씩 평가해서 걸러내는 작업입니다. 

 

List<String> itCompany = Arrays.asList("kakao", "naver", "google"); // 테스트 dataset

Stream<String> filterStream = itCompany.stream().filter(company -> company.contains("a")); // return kakao, naver

 

Mapping

맵(map)은 스트림 내 요소들을 하나씩 특정 값으로 변환해줍니다. 이 때 값을 변환하기 위한 람다를 인자로 받습니다.

 

Stream<String> mapStream = itCompany.stream().map(String::toUpperCase); // return KAKAO, NAVER, GOOGLE

 

Sorting

 

List<String> sortStream = itCompany.stream()
                                            .sorted()
                                            .collect(Collectors.toList()); //return google, kakao, naver,


List<String> sortReverseStream = itCompany.stream()
                                                  .sorted(Comparator.reverseOrder()) //역순
                                                  .collect(Collectors.toList()); // return naver, kakao, google

 

Comparator 의 compare 메소드는 두 인자를 비교해서 값을 리턴합니다.

 

List<String> compareSortStream = itCompany.stream()
                                                   .sorted(Comparator.comparingInt(String::length))
                                                   .collect(Collectors.toList()); // return kakao, naver, google


List<String> collect = itCompany.stream()
                                        .sorted((s1, s2) -> s2.length() - s1.length())
                                        .collect(Collectors.toList()); // reutrn google, kakao, naver,

 

peek

확인해본다는 단어 뜻처럼 특정 결과를 반환하지 않는 함수형 인터페이스 Consumer 를 인자로 받습니다.

 

IntStream.of(1, 3, 5, 7, 9)
                            .peek(System.out::println)
                            .sum(); //return 1, 3, 5, 7, 9

 

 

3. 결과 만들기

가공한 스트림을 가지고 내가 사용할 결과값으로 만들어내는 단계입니다. 따라서 스트림을 끝내는 최종 작업(terminal operations)입니다.

 

Calculating

스트림 API 는 다양한 종료 작업을 제공합니다. 최소, 최대, 합, 평균 등 기본형 타입으로 결과를 만들어낼 수 있습니다.

 

long count = IntStream.of(1, 2, 3, 4, 5).count(); //return 5
long sum = LongStream.of(1, 2, 3, 4, 5).sum();  //return 15

 

만약 스트림이 비어 있는 경우 count  sum 은 0을 출력하면 됩니다. 하지만 평균, 최소, 최대의 경우에는 표현할 수가 없기 때문에  Optional을 이용해 리턴합니다.

 

OptionalInt min = IntStream.of(1, 3, 5, 7, 9).min(); //return OptionalInt[1]
OptionalInt max = IntStream.of(1, 3, 5, 7, 9).max(); //return OptionalInt[9]

 

Reduction

스트림은 reduce라는 메소드를 이용해서 결과를 만들어냅니다.

 

reduce 메소드는 총 세 가지의 파라미터를 받을 수 있습니다.

  1. accumulator : 각 요소를 처리하는 계산 로직. 각 요소가 올 때마다 중간 결과를 생성하는 로직.
  2. identity : 계산을 위한 초기값으로 스트림이 비어서 계산할 내용이 없더라도 이 값은 리턴.
  3. combiner : 병렬(parallel) 스트림에서 나눠 계산한 결과를 하나로 합치는 동작하는 로직.

1. accumulator 

 

OptionalInt reduced =
IntStream.range(1, 4) //[1, 2, 3]
.reduce((a, b) -> {
    return Integer.sum(a, b);
});  // return OptionalInt[6]

 

2. identity

10은 초기값이고, 스트림 내 값을 더해서 결과는 16이 됩니다.

여기서 람다는 메소드 참조(method reference)를 이용해서 넘길 수 있습니다.

 

int reducedTwoParams =
IntStream.range(1, 4) // [1, 2, 3]
.reduce(10, Integer::sum); // return 16

 

3.combiner

Combiner 는 병렬 처리 시 각자 다른 쓰레드에서 실행한 결과를 마지막에 합치는 단계입니다.

따라서 병렬 스트림에서만 동작합니다.

 

Integer reducedParams = Stream.of(1, 2, 3)
.reduce(10, // identity
Integer::sum, // accumulator
(a, b) -> {
   System.out.println("combiner was called"); // 병렬 스트림이 아니어서 호출 안됨
   return a + b;
}); // return 16

 

결과는 다음과 같이 36이 나옵니다. 먼저 accumulator 는 총 세 번 동작합니다. 초기값 10에 각 스트림 값을 더한 세 개의 값(10 + 1 = 11, 10 + 2 = 12, 10 + 3 = 13)을 계산합니다. Combiner 는 identity 와 accumulator 를 가지고 여러 쓰레드에서 나눠 계산한 결과를 합치는 역할입니다. 12 + 13 = 25, 25 + 11 = 36 이렇게 두 번 호출됩니다.

 

Integer reducedParallel = Arrays.asList(1, 2, 3)
.parallelStream() // 병렬 스트림
.reduce(10,
Integer::sum,
(a, b) -> {
   System.out.println("combiner was called"); // 두번 호출 됨
   return a + b;
}); // return 36

 

Collecting

- Product.java

public class Product {
int count;
String name;

public Product(int count, String name) {
this.count = count;
this.name = name;
}

public String getName() {
return this.name;
}

public int getAmount() {
return this.count;
}
}

// 테스트 dataset

List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
                                                      new Product(14, "orange"),
                                                      new Product(13, "lemon"),
                                                      new Product(23, "bread"),
                                                      new Product(13, "sugar"));

 

Collectors.toList()

 

List<String> collectorCollection = productList.stream()
                                                      .map(Product::getName)
                                                   	.collect(Collectors.toList());

// return potatoes, orange, lemon, bread, sugar

 

Collectors.joining()

 

String listToString = productList.stream()
                                         .map(Product::getName)
                                         .collect(Collectors.joining()); // reutrn potatoesorangelemonbreadsugar

 

 

  • delimiter : 각 요소 중간에 들어가 요소를 구분시켜주는 구분자
  • prefix : 결과 맨 앞에 붙는 문자
  • suffix : 결과 맨 뒤에 붙는 문자
String listToString2 = productList.stream()
                                         .map(Product::getName)
                                         .collect(Collectors.joining(", ", "<", ">"));

// reutrn <potatoes, orange, lemon, bread, sugar>

 

Collectors.averageingInt()

 

Double averageAmount = productList.stream()
                                         .collect(Collectors.averagingInt(Product::getAmount)); // return 17.2

 

Collectors.summingInt()

 

Integer summingAmount = productList.stream()
                                          .collect(Collectors.summingInt(Product::getAmount)); // return 86

 

IntStream 으로 바꿔주는 mapToInt 메소드를 사용해서 좀 더 간단하게 표현할 수 있습니다.

 

Integer summingAmount = productList.stream()
                                           .mapToInt(Product::getAmount)
                                           .sum(); // return 86

 

Collectors.summarizingInt()

 

IntSummaryStatistics statistics =productList.stream()
                                                   .collect(Collectors.summarizingInt(Product::getAmount));

//return IntSummaryStatistics{count=5, sum=86, min=13, average=17.200000, max=23}

 

 

참고

- Java 스트림 Stream (1) 총정리(https://futurecreator.github.io/2018/08/26/java-8-streams/)

- [Java] 자바 스트림(Stream) 사용법 & 예제(https://coding-factory.tistory.com/574)

728x90
반응형

'JAVA' 카테고리의 다른 글

JAVA int, Integer 비교  (0) 2023.03.16
반응형

Intellij 2020.3 버전에서 node.js 프로젝트 생성하는 방법

1. Node.js 다운로드 (https://nodejs.org/ko/download/)

2. Intellij > File > New > project > JavaScript > Express

 

 

3. Options > View Engine : EJS

 

 

4. 프로젝트 실행(port - 3000)

 

728x90
반응형

+ Recent posts