실전 카프카 개발부터 운영까지 - 카프카 기본 개념과 구조

들어가며


  • 해당 포스팅은 실전 카프카 개발부터 운영까지 도서를 학습하며 정리한 글입니다.
실전 카프카 개발부터 운영까지
실전 카프카 개발부터 운영까지
실전 카프카 개발부터 운영까지 작품소개: [도서 개요]아파치 카프카의 공동 창시자 준 라오(Jun Rao)가 추천한 책!국내 최초이자 유일한 컨플루언트 공인 아파치 카프카 강사(Confluent Certified Trainer for Apache Kafka)와 공인 관리자 자격(Confluent Certified Administrator for Apache Kafka)을 보유한 『카프카, 데이터 ...

0. 카프카의 핵심 개념


분산 시스템


  • 하나의 브로커에 장애가 발생 할 때 다른 서버 또는 노드가 대신 처리하므로 장애 대응이 탁월합니다.
  • 부하가 높은 경우에는 시스템 확장이 용이합니다. (브로커 확장, 추가)

페이지 캐시


  • 운영체제의 성능을 높이기 위해 페이지 캐시를 활용합니다.
  • 직접 디스크에 I/O를 하는 것 대신 물리 메모리 중 애플리케이션이 사용하지 않는 일부 잔여 메모리를 활용합니다.
  • 카프카가 직접 디스크에 I/O를 하지 않고 페이지 캐시를 통해 읽고 쓰기를 합니다.

배치 전송 처리


  • 통신을 묶어서 처리해서 오버헤드를 줄이고 빠르고 효율적으로 처리 가능합니다.

압축 전송


  • gzip, snappy, lz4, zstd등 압축으로 네트워크 대역폭이나 회선 비용을 줄입니다.
  • 배치 전송과 결합해서 사용하면 높은 효과를 얻게 됩니다.

1. 카프카의 기본 구성


  • 카프카는 데이터를 받아서 전달하는 데이터 버스의 역할을 합니다.
  • 프로듀서: 카프카에 데이터(메세지)를 만들어서 주는 쪽 입니다.
  • 컨슈머: 카프카에서 데이터(메세지)를 빼내서 소비하는 쪽 입니다.
  • 주키퍼: 카프카의 정상 동작을 보장하기 위해 메타데이터를 관리하는 코디네이터 입니다.

image.png

그외의 구성 요소


  • 브로커: 카프카 애플리케이션이 설치된 서버 또는 노드를 말합니다.
  • 토픽: 카프카는 메세지 피드들을 토픽으로 구분하고, 각 토픽의 이름은 카프카 내에서 고유합니다.
  • 파티션: 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러개로 나눈 것을 말합니다.
  • 세그먼트: 프로듀서가 전송한 실제 메세지가 브로커의 로컬 디스크에 저장되는 파일을 말합니다.
  • 메세지 / 레코드: 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각입니다.

2. 리플리케이션


  • 리플리케이션이란 각 메세지들을 여러개로 복제해서 카프카 클러스터 내 브로커들에 분산시키는 동작을 의미합니다.
  • 하나의 브로커가 종료 되더라도 카프카는 안정성을 유지할 수 있습니다.
  • 토픽은 원본을 포함해 총 N개가 존재합니다.
    • 토픽이 리플리케이션 되는 것이 아니라 토픽의 파티션이 리플리케이션 되는 것입니다.
      • 원본은 리더 / 복사본은 팔로워 라고 부릅니다.
      • 리플리케이션 팩터 수 2 → 리더 1 / 팔로워 1
      • 리플리케이션 팩터 수 3 → 리더 1 / 팔로워 2
  • 리플리케이션 팩터 수가 커지면 안정성은 높아지지만 그만큼 브로커 리소스를 많이 사용합니다.
    • 따라서 브로커를 효율적으로 사용하기 위해 최적의 팩터 수를 사용해야 합니다.
  • 토픽 생성시 아래와 같은 기준을 세워두고 리플리케이션 팩터 수를 설정해 사용하기를 권장합니다.
    • 테스트나 개발환경: 리플리케이션 팩터 수를 1로 설정
    • 운영환경(로그성 메세지로 유실 허용): 리플리케이션 팩터 수를 2로 설정
    • 운영 환경(유실 허용하지 않음): 리플리케이션 팩터 수를 3으로 설정

3. 파티션


  • 토픽이 한번에 처리하는 한계를 높이기 위해 토픽 하나를 여러개로 나눠 병렬 처리가 가능하게 만든 것을 파티션이라고 합니다.
  • 이렇게 하나를 여러개로 나누면 분산처리가 가능합니다.
  • 나뉜 파티션 수 만큼 컨슈머를 연결할 수 있습니다.
  • 파티션 수에 대한 결정은 매우 모호하다.
    • 메세지 크기나 초당 메세지 건수 등에 따라 달라집니다.
    • 생성 후 언제든지 늘릴 수 있지만 감소할 수는 없습니다.
    • 초기에 파티션 수를 작게 (2~4)로 생성한 후 메세지 처리량이나 컨슈머의 LAG(프로듀서가 보낸 메세지수(카프카에 남아 있는 메세지 수) - 컨슈머가 가져간 메세지 수)를 모니터링 하면서 늘려가기를 추천합니다.

4. 세그먼트


  • 프로듀서가 전송하고 컨슈머가 저장한 메세지는 토픽의 파티션에 저장됩니다.
  • 각 메세지는 세그먼트라는 로그 파일의 형태로 브로커의 로컬 디스크에 저장됩니다.

image.png

예시

# 카프카 브로커 인스턴스
$ cd /data/kafka-logs/
$ ls
  • 토픽-파티션 이름으로 되어있는 디렉토리에 저장됩니다.
  • 프로듀서는 카프카의 A 토픽으로 메세지를 전송합니다.
  • A 토픽이 파티션이 하나라고 할때 프로듀서로 부터 받은 메세지를 파티션 0의 세그먼트 로그 파일에 저장합니다.
  • 브로커의 세그먼트 로그 파일에 저장된 메세지를 컨슈머가 읽어갑니다.

5. 프로듀서 디자인


image.png

  • ProducerRecord, 카프카로 전송하기 위한 실제 데이터이며 토픽 / 파티션 / 키 / 밸류로 구성됩니다.
  • 프로듀서가 카프카로 레코드를 전송할 때, 카프카의 특정 토픽으로 메세지를 전송합니다.
    • 토픽 / 밸류는 필수값이며 파티션 / 키는 옵셔널합니다.
  • 프로듀서의 send() 메서드를 통해 시리얼라이저, 파티셔너를 거치게 됩니다.
  • 파티션을 지정하였다면 파티셔너는 동작하지 않고 지정된 파티션으로 레코드를 전달하지만, 지정하지 않은 경우에는 키를 가지고 파티션을 선택해 레코드를 전달합니다. (라운드 로빈)
  • 프로듀서 내부에서는 send() 이후에 레코드들을 파티션별로 모아두는데, 배치 전송을 하기 위함입니다.
  • 전송이 실패하면 재시도 동작이 이뤄지고 최종 실패와 성공을 리턴합니다.

6. 컨슈머의 기본 동작


  • 컨슈머는 토픽에 저장되어 있는 메세지를 가져오는 역할을 담당합니다.
  • 단순하게 카프카로부터 메세지만 가져오는 것 같지만, 내부적으로는 컨슈머 그룹, 리밸런싱 등 여러 동작을 수행합니다.
  • 프로듀서가 아무리 빠르게 카프카로 메세지를 전송하더라도 카프카로부터 빠르게 메세지를 읽어오지 못한다면 결국 지연이 발생합니다.

동작


  • 프로듀서가 카프카의 토픽으로 메세지를 전송한다면 해당 메시지들은 브로커들의 로컬 디스크에 저장됩니다.
  • 컨슈머를 이용해 토픽에 저장된 메세지를 가져올 수 있습니다.
  • 컨슈머 그룹은 하나 이상의 컨슈머들이 모여있는 그룹을 의미, 컨슈머는 반드시 컨슈머 그룹에 속하게 됩니다.
  • 컨슈머 그룹은 각 파티션 리더에게 카프카 토픽이 저장된 메세지를 가져오기 위한 요청을 보냅니다.
  • 이때 파티션 수와 컨슈머 수는 일대일로 매핑되는게 이상적입니다.

컨슈머 그룹의 이해


  • 컨슈머는 컨슈머 그룹 안에 속한 것이 일반적인 구조입니다.
  • 하나의 컨슈머 그룹 안에 여러 개의 컨슈머가 구성될 수 있습니다.
  • 컨슈머들은 토픽의 파티션과 일대일로 매핑되어 메세지를 가져옵니다.
  • 컨슈머 그룹 안의 컨슈머들은 서로 정보를 공유합니다.
    • 만약에 그룹 안의 하나의 컨슈머가 실패하면 대신 다른 컨슈머가 하던 일을 대신해 해당 컨슈머가 담당하던 토픽의 파티션을 대신 소비해줍니다.

image.png

7. 예시 (java)


7.1 프로듀서 예시


동기 전송


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class ProducerSync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); //메시지 키와 벨류에 문자열을 사용하므로 내장된 StringSerializer를 지정.
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props); //Properties 오브젝트를 전달해 새 프로듀서를 생성.

        try {
            for (int i = 0; i < 3; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distributed streaming platform - " + i); //ProducerRecord 오브젝트를 생성.
                RecordMetadata metadata = producer.send(record).get(); //get() 메소드를 이용해 카프카의 응답을 기다립니다. 메시지가 성공적으로 전송되지 않으면 예외가 발생하고, 에러가 없다면 RecordMetadata를 얻음.
                System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition()
                        , metadata.offset(), record.key(), record.value());
            }
        } catch (Exception e){
            e.printStackTrace(); //카프카로 메시지를 보내기 전과 보내는 동안 에러가 발생하면 예외가 발생함.
        } finally {
            producer.close(); // 프로듀서 종료
        }
    }
}
  • ProducerRecord 전송이 성공하고 나면 Record 메타데이터를 읽어서 파티션과 오프셋 정보를 확인하고 성공 여부를 알 수 있습니다.

비동기 전송


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PeterProducerCallback implements Callback { //콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback를 구현하는 클래스가 필요함.
    private ProducerRecord<String, String> record;

    public PeterProducerCallback(ProducerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace(); //카프카가 오류를 리턴하면 onCompletion()은 예외를 갖게 되며, 실제 운영환경에서는 추가적인 예외처리가 필요함.
        } else {
            System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition()
                    , metadata.offset(), record.key(), record.value());
        }
    }
}
  • 우선 위와 같이 콜백을 작성합니다.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerAsync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작합니다.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); //메시지 키와 벨류에 문자열을 지정하므로 내장된 StringSerializer를 지정함.
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props); //Properties 오브젝트를 전달해 새 프로듀서를 생성.

        try {
            for (int i = 0; i < 3; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distributed streaming platform - " + i); //ProducerRecord 오브젝트를 생성.
                producer.send(record, new PeterProducerCallback(record)); //프로듀서에서 레코드를 보낼 때 콜백 오브젝트를 같이 보냄.
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            producer.close(); // 프로듀서 종료
        }
    }
}
  • 프로듀서는 send() 메서도으와 콜백을 함께 호출합니다.
  • 동기 전송과 같이 프로듀서가 보낸 모든 메세지에 대해 응답을 기다리면 많은 시간을 소비하게 되므로 빠른 전송을 할 수 없습니다.
  • 비동기 방식으로 전송하면 빠른 전송이 가능하고 메세지 전송이 실패하더라도 예외처리를 할 수 있습니다.

7.2 컨슈머 예시


오토 커밋


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerAuto {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의.
        props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의.
        props.put("enable.auto.commit", "true"); //오토 커밋을 사용.
        props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화 합니다. 가장 최근부터 메시지를 가져옴.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성.
        consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정.

        try {
            while (true) { //무한 루프 시작. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 함.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정.해당 시간만큼 블럭.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료.
        }
    }
}
  • 기본값으로 가장 많이 사용되고 있는 오토 커밋입니다.
  • 오토 커밋은 오프셋을 주기적으로 커밋하므로, 관리자가 오프셋을 따로 관리하지 않아도 된다는 장점이 있는 반면, 컨슈머 종료 등이 빈번히 일어나면 메세지를 가져오지 못하거나 중복으로 가져오기도 합니다.

동기 가져오기


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의.
        props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의.
        props.put("enable.auto.commit", "false"); //오토 커밋을 사용하지 않음.
        props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화 합니다. 가장 최근부터 메시지를 가져옴.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성.
        consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정.

        try {
            while (true) { //무한 루프 시작. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 함.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정.해당 시간만큼 블럭함.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리함.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); //현재 배치를 통해 읽은 모든 메시지들을 처리한 후, 추가 메시지를 폴링하기 전 현재의 오프셋을 동기 커밋.
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료.
        }
    }
}
  • 오토 커밋과 다르게 poll()을 이용해서 메세지를 가져온 후 처리까지 완료하고 현재의 오프셋을 기록합니다.
  • 속도는 느리지만 실제로 토픽에는 메세지가 존재하지만 잘못된 오프셋 커밋으로 인한 위치 변경으로 컨슈머가 메세지를 가져오지 못하는 메세지 손실은 거의 발생하지 않습니다.

비동기 가져오기


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerAsync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의.
        props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의.
        props.put("enable.auto.commit", "false"); //오토 커밋을 사용하지 않음.
        props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화. 가장 최근부터 메시지를 가져옴.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성.
        consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정.

        try {
            while (true) { //무한 루프 시작. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 함.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정.해당 시간만큼 블럭함.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync(); //현재 배치를 통해 읽은 모든 메시지들을 처리한 후, 추가 메시지를 폴링하기 전 현재의 오프셋을 비동기 커밋합니다.
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료.
        }
    }
}
  • commitAsnyc()는 동기처리인 commitSync()와 달리 오프셋 커밋을 실패하더라도 재시도 하지 않습니다.
  • 비동기 방식은 재시도를 하지 않습니다.
    • 아래와 같은 상황을 가정합니다.
      • 1번 오프셋의 메세지를 읽은 뒤 1번 오프셋을 비동기 커밋 (현재 마지막 오프셋 1)
      • 2번 오프셋의 메세지를 읽은 뒤 2번 오프셋을 비동기 커밋하지만 실패(현재 마지막 오프셋 1)
      • 3번 오프셋의 메세지를 읽은 뒤 3번 오프셋을 비동기 커밋하지만 실패(현재 마지막 오프셋 1)
      • 4번 오프셋의 메세지를 읽은 뒤 4번 오프셋을 비동기 커밋(현재 마지막 오프셋 4)
    • 이때 비동기 커밋의 재시도로 인해 2번 오프셋의 비동기 커밋이 성공하면 마지막 오프셋은 2로 변경됩니다.
    • 그리고 또 다른 컨슈머가 이어서 작업을 진행하게 되면 오프셋을 보고 3번부터 다시 시작하게 됩니다. 이렇게 되면 계속 연쇄적으로 메세지의 중복이 일어날 것입니다.

마치며

  • 카프카의 기본 개념과 구조에 대해 알아보았습니다.
  • 다음 글에서는 각 요소들 별로 내부 구현과 원리에 대해서 깊게 알아보려고 합니다.