Kafka APIs
Kafka APIs
카프카 클러스터에 데이터를 전달, 데이터를 poll, 설정/옵션을 확인 또는 변경하기 위해 kafka를 제어할 수 있는 kafka-client 라이브러리가 있습니다. 이 라이브러리를 활용해 kafka를 활용하는 어플리케이션들을 만들 수 있습니다. 크게 producer에서 사용되는 api, consumer에서 사용되는 api 마지막으로 admin용으로 사용되는 api들이 라이브러리에 메소드로 정의되어 있습니다.
Producer API
우선 코드를 작성하기에 앞서 카프카 클러스터에 토픽 설정이 되어있어야 합니다. 우선 토픽을 만들어 줍니다. Producer는 java에서 kafka-clients의 producer 클래스틀 import해서 코드를 작성합니다.
프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티션 설정, 배치 생성 단계를 거쳐 데이터를 카프카 클러스터로 보냅니다. 처음 ProducerRecord 클래스틀 통해 데이터를 생성한 후 데이터를 보낼 토픽을 설정해 줍니다. 그 다음 ProducerRecord로 만들어진 인스턴스가 send 메소드를 입력 받아 partitioner로 전달됩니다. 여기서 어떤 파티션으로 전달될지 결정이 된 후 데이터 전송 직전에 accumulator라는 버퍼에 대기하여 브로커로 보내질 준비를 합니다. 여기서 배치 형태로 데이터가 묶여 전송되는 방식으로 처리량을 늘립니다. 그 다음 sender 스레드가 이렇게 쌓인 배치 데이터를 카프카 브로커로 전송합니다. 각 단계에서 프로듀서는 config.put코드를 활용해 각 단계에서의 작업을 프로그래밍 할 수 있습니다. 파티션을 지정하는 것, 배치 주기 등등을 프로그래밍 할 수 있습니다.
Consumer API
컨슈머는 프로듀서 처럼 자바 어플리케이션을 통해 구현할 수 있습니다. 프로듀서와 똑같이 kafka-client의 consuemr 클래스를 import 하면 됩니다.
컨슈머는 레코드를 가여로 토픽, 브로커 ip, 자신의 consumer group id를 설정에서 지정한 후 보내주면 됩니다. 레코드를 받아올 때 프로듀서에서 진행한 serialzation의 자료형과 동일한 자료형의 deserialization을 해주는 것을 잊지 말아야합니다. 그 다음 무한 루프를 이용한 poll 메소드로 데이터를 처리합니다.
이러한 컨슈머 운영에서 중요한 점은 각 컨슈머들은 토픽의 1개 이상의 파티션에 할당되어 데이터를 가져갈 수 있다는 점입니다. 컨슈머 그룹에서 데이터를 가져갈 때 1개 파티션은 최대 1개의 컨슈머에 할당 가능합니다. 그리고 1개의 컨슈머는 여러 파티션에 연결될 수 있습니다. 즉 병렬 처리를 하여 데이터 처리 속도를 높이기 위해서는 파티션 개수를 늘림과 동시에 그만큼 컨슈머의 개수를 늘려서 처리량을 늘릴 수 있습니다. 다만 1개의 파티션은 여러개의 컨슈머에 연결 될 수 없습니다. 즉 파티션의 개수보다 컨슈머의 개수가 더 크면 의미가 없는 컨슈머 스레드가 됩니다. 또한 이러한 컨슈머, 파티션의 균형은 만약 한쪽의 상태가 변할 경우(컨슈머에서 장애가 나는 경우) 리밸런싱을 통해 지속적인 처리가 가능하도록 해줍니다.
또한 컨슈머 그룹을 여러개 두어 각 컨슈머 그룹마다 다른 데이터 처리를 수행할 수 있습니다. 이게 가능한 것은 컨슈머가 데이터를 가져간 이후에도 브로커가 레코드를 삭제하지 않고 또 다른 컨슈머가 다시 오프셋 값을 통해 참조할 수 있기 때문입니다.
컨슈머 그룹은 토픽에서 데이터를 가져오면서 처리가 완료된 offset을 __consuemr_offset에 커밋합니다. 이러한 커밋의 주기는 매 작동시마다 또는 일정한 시간마다로 설정이 가능합니다. 비동기 요청 또한 가능합니다.
Admin API
카프카 브로커에 직접 접속해 카프카 내부 옵션 설정, 확인을 하는 작업은 일회성에 그치기 때문에 지속적인 카프카 클러스터 상태 트래킹이 쉽지 않다. 이러한 이유로 kafka는 Admin API를 제공하여 지속적인 내부 옵션 설정, 조회를 자동화를 가능하게 해준다.
어드민 API는 KafkaAdminClient 클래스에 구현이 되어 있는데 대표적인 메서드는 브로커의 정보를 조회하는 describeCluster, 토픽 리스트를 조회하는 listTopics, 컨슈머 그룹을 조회하는 listConsumerGroups, 신규 토픽을 생성하는 createTopics, 파티션 개수를 변경하는 createPartitions, 접근 제어 규칙을 생성하는 createAcls가 있습니다.
출처
- 아파치카프카 애플리케이션 프로그래밍 with자바
- 카프카 데이터플랫폼의 최강자
Leave a comment