카프카 컨슈머 정리
카프카 컨슈머
1. 카프카 컨슈머란?
- 카프카의 토픽에 존재하는 메시지를 가져와서 소비(consume)하는 애플리케이션, 서버 등등을 지칭하는 개념.
- 파티션 리더에게 메세지를 요청하고 메세지를 가지고 오게한다. 각 요청은 offset을 기준으로 이루어지기 때문에 이미 전송 받은 메세지도 다시 전송 받을 수 있다.(offset만 제대로 설정하면)
2. 명령어를 이용한 컨슈머 제작
consumer-console.sh 를 이용해서 컨슈머를 만들 수 있다.
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap [브로커 ip주소:포트] --topic [토픽 이름] --group [컨슈머 그룹 이름] --from beginning
이렇게 하면 해당 토픽에 해당하는 메세지를 가져올 수 있다. 이때 컨슈머 그룹 이름 또한 지정해서 컨슈머를 만들었다.
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap [브로커 ip주소:포트] --list
이렇게 하면 컨슈머 리스트를 확인 할 수 있다.
3. 자바를 이용한 컨슈머
[2. 명령어를 이용한 컨슈머] 처럼 명령어를 이용해서 컨슈머를 만들 수 있지만 이렇게 자바를 이용해서 컨슈머를 만들 수 있다.
gradle build 하는 방법은 동주형이 작성 해준 문서(클릭하세요)에 잘 나와 있다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaBookConsumer1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092"); // 브로커 리스트를 정의합니다. 여기서 ip주소를 etc/hosts에 설정을 해놔야 하더라구요.
props.put("group.id", "peter-consumer"); // 그룹 아이디는 뒤에서 더 설명합니다. 이거 설정 해줘야해요.
props.put("enable.auto.commit", "true");
// 오프셋값을 _consumer_offsets라는 토픽에 커밋해준다. 근데 이게 수동 커밋을 안해주면 중복이 생길 수도 있다. 그거 알고는 있어야함. auto.commit.intervals.ms 로 커밋 주기 설정 가능함.
props.put("auto.offset.reset", "latest"); // offset이 존재하지 않거나(초기) 현재 오프셋이 더이상 존재하지 않는경우 (데이터가 삭제된 경우) 여기서 지정된 옵션으로 리셋됩니다. latest: 가장 마지막의 offset, earliest: 가장 처음의 offset, none: 이전 offset을 찾지 못하면 에러를 뱉음.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//문자열 시리얼라이즈 해줬기 때문에 deserializer써주기.
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("namsick-topic"));// 여기에 구독할 토픽 이름을 적어줘야 합니다.
try {
while (true) {//이게 킬포임. 계속 무한 루프 돌아야함.
ConsumerRecords<String, String> records = consumer.poll(100); //poll(100)은 버퍼에 데이터가 없다면 얼마동안 오랫동안 블록 할지를 알려줍니다. 0으로 하면 블록 없고 바로 데이터를 가져옵니다.
for (ConsumerRecord<String, String> record : records)
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
} finally {
consumer.close();
}
}
}
4. 파티션 그리고 메세지 순서
-
파티션이 여러개인 경우 컨슈머가 메세지를 가져오는 순서는 입력 순이 아닐 수 있다.
### 4.1 파티션이 여러개인 경우 메세지 순서
프로듀서가 a,b,c,d,e,1,2,3,4,5 를 순서대로 입력했을 때 파티션이 3개이면 어떤 방식으로 저장 될까?
파티션이 여러개인 경우 파티션 내에 데이터가 어떻게 들어가는지 다음 그림을 통해 알 수 있다.
현재 해당 토픽의 파티션은 위 그림처럼 저장되어 있는 상태이다. 컨슈머는 결국 offset을 기준으로 데이터를 가져온다. 따라서 각 파티션 내의 offset을 기준으로 데이터가 읽히고 한 파티션이 읽히면 다음 파티션의 데이터를 읽는 형식으로 진행이 된다. 결국 데이터가 출력되는 순서는 ad14be25c3이다.
### 4.2 파티션이 1개인 경우 메세지 순서
이 경우는 말할 것도 없다. 파티션이 1개이기 때문에 offset기준으로 봐도 입력 순서대로 나간다. 일종의 Queue형태가 된다. FIFO 형태를 만족한다.
5. 컨슈머 그룹
컨슈머 그룹은 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해서 메세지를 가져올 수 있게 해준다. 이게 생긴 이유는 하나의 컨슈머가 메세지를 가져가서 offset을 늘려버리면 지나간 메세지가 삭제되기 때문이다(설정에 따라 다르지만…) 컨슈머 그룹으로 만들어 놓으면 하나의 데이터를 다양한 용도로 사용할 수 있게 된다. 또한 컨슈머의 처리 속도를 늘릴 수 있다.
그림을 통해 알아 보자. 기존에 컨슈머 그룹에 컨슈머가 1개만 있는 상태이다.
이때 프로듀서에서 메세지 생성이 더 빨라져서 메세지가 점점 쌓이게 된다.
그럼 컨슈머 그룹에 컨슈머를 추가해 주자
동일한 그룹 내에서 컨슈머가 추가되면 파티션의 소유권이 재조정 된다. 이걸 리밸런스라고 한다. 이런 리밸런스를 통해 가용성, 확장성 확보가 가능하다. 근데 리밸런스 하는 도중에는 컨슈머가 메세지를 가져올 수 없다. (그래서 auto commit으로 인한 중복 문제가 발생한다.)
단순히 컨슈머 그룹에 컨슈머를 추가해서 해결 되는게 아닌 경우도 있다. 컨슈머는 파티션과 1:1 대응이 원칙이다. 그렇기 때문에 파티션을 늘리는 경우가 필요하기도 하다.
이런 경우에는 파티션을 늘려서 처리 속도를 높여주면 된다.
만약 컨슈머가 갑자기 다운 되었을 때 어떻게 될까?
바로 그림처럼 다른 컨슈머 하나가 2개의 파티션을 담당하게 된다. 다만 이는 불완전한 상황이기 때문에 얼른 컨슈머 하나를 더 추가해주는 리밸런스 작업을 해줘야한다.
하나의 토픽에 2가지 컨슈머 그룹이 메세지를 가져가는 것이 가능하다. 이것이 다른 메세지 큐 솔류션과 카프카의 차이점이자 카프카의 장점이다. 메세지를 가져가도 삭제되지 않고 offset을 기준으로 메세지가 컨슈머에게 가기 때문이다.
6. 오프셋 커밋
컨슈머는 poll()을 통해 카프카 파티션에서 아직 가져오지 않은 메세지를 가져온다. 이때 메세지의 위치 기록은 offset에 의해 기록된다. 그리고 이러한 offset을 업데이트하는 동작을 commit이라고 한다.
카프카는 카프카 내의 토픽 __consumer_offsets에 각 파티션 별 offset을 저장한다.
만약 컨슈머가 리밸런스가 일어나면 각각의 컨슈머는 새로운 파티션에 배정받게 된다. 이때 가장 최근 커밋된 offset을 기준으로 메세지를 읽기 시작한다. 이래서 중복 or 누락이 발생하는 것이다.
6.1 자동 커밋
그냥 enable.auto.commit=true로 설정하면 기본값인 5초 마다 가장 마지막으로 poll한 오프셋을 커밋합니다. (5초는 auto.commit.interval.ms옵션으로 조정 가능)
그런데 주의해야할 점이 있습니다. 만약 오프셋 커밋하는 주기 중간에 (5초가 주기라면 3초 정도에) 리밸런스가 일어나는 경우 중복이 일어날 수 있습니다. 주기를 줄여서 중복의 크기를 줄일 수는 있지만 중복을 완전히 없앨수는 없습니다.
6.2 수동 커밋
메세지 손실을 방지하기 위해서 수동으로 커밋을 진행하기 합니다. 이러한 방법은 데이터베이스(또는 어플리케이션에)에 완전히 저장 된 후에 커밋을 진행해야 하는 경우입니다.
public class KafkaBookConsumerMO {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092");
props.put("group.id", "peter-manual");
props.put("enable.auto.commit", "false");// 여기서 auto.commit을 false로 해줘야 합니다.
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("peter-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}// consumer가 토픽에서 메세지를 다 가져온 후에
//아래 처럼 이렇게 명시적으로 commitsync() 함수를 써줍니다.
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.printf("commit failed", e);
}
}
}
}
이렇게 수동 커밋을 할 수 있습니다. 하지만 이렇게 수동 커밋을 해도 중복이 발생할 수 있습니다. 메세지를 저장하는 순간 (몇 밀리세컨드) 리밸런스가 일어나거나 시스템 fail이 일어나는 경우 마지막으로 저장된 offset부터 시작하기 때문에 저장하던 메세지가 중복되어서 전달 될 수 있습니다. 그렇지만 매번 메세지를 컨슈머에 전달할 때 커밋이 일어나기 때문에 손실은 존재하지 않습니다.
6.3 특정 파티션 할당
카프카가 알아서 파티션을 컨슈머에 할당하는게 아니라 특정 파티션을 특정 컨슈머에 할당하려면 다음과 같이 하면 됩니다.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class KafkaBookConsumerPart {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092");
props.put("group.id", "peter-partition");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "peter-topic"; //토픽 이름을 String으로 설정
TopicPartition partition0 = new TopicPartition(topic, 0);// 토픽 이름과 파티션을 객체로 지정
TopicPartition partition1 = new TopicPartition(topic, 1);//토픽 이름과 파티션을 객체로 지정
consumer.assign(Arrays.asList(partition0, partition1));// Array로 consumer에 assign한다.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.printf("commit failed", e);
}
}
}
}
이렇게 하면 수동으로 파티션을 할당해 메시지를 가져올 수 있습니다. 이 때 컨슈머 인스턴스들은 모두 서로 다른 컨슈머 그룹에 속해 있어야 합니다. 컨슈머 그룹은 동일한 파티션 offset을 공유하기 때문에 만약 컨슈머 그룹 내의 한 컨슈머가 종료가 될 경우 다른 컨슈머가 메세지를 이어 받아 메세지를 가져오게 됩니다. 그 결과 오프셋을 이어 받고 결국 원치 않은 형태로 동작할 수 있습니다.
6.4 특정 오프셋 지정해서 메세지 가져오기
consumer.assign(Arrays.asList(partition0, partition1));// Array로 consumer에 assign한다.
//추가 코드
consumer.seek(partition0,2);
consumer.seek(partition1,2);
이 코드가 추가 될 경우 수동으로 오프셋을 지정해서 어디서 부터 메세지를 읽을 지 지정해 줄 수 있다. 여기서는 파티션에게 offset 2부터 메세지를 읽으라고 지시한 것이다. 여기서 seek에 지정한 offset부터 poll()을 시작한다.
출처: 카프카, 데이터 플랫폼의 최강자, apache kafka
Leave a comment