지난 포스팅을 통해 메시지 큐의 기본 개념, 동작 원리에 대해 학습하였다. 메시지 큐는 Producer와 Consumer 간의 중간 매개체로서 데이터 교환을 도와주는 역할을 한다. 오늘은 메시지 큐와 같은 역할을 하는 오픈소스 중 하나인 Kafka에 대해 알아보고자 한다.
유튜브 데브원영 DVWY님의 아파치 카프카 강의를 보고 정리하였습니다.
카프카란?
Kafka는 분산 데이터 스트리밍 플랫폼으로, 실시간으로 쌓이는 대량의 데이터 스트림을 안정적으로 처리하고 저장할 수 있도록 설계된 오픈소스 솔루션이다. 주로 대규모의 실시간 데이터 피드를 수집하고 다양한 소스와 대상 시스템 간에 데이터를 안정적으로 전달하는데 사용된다.
Kafka는 링크드인에서 개발되어 2011년 오픈소스로 공개되었다. 이전에는 RabbitMQ, Redis 등으로 데이터 파이프라인을 구축하여 사용했지만, 이러한 애플리케이션들의 특징이 뚜렷하다 보니 데이터 처리에 있어 파편화도 심각했다. 파편화는 결국 유지보수의 어려움으로 직결되었고, 유지보수의 어려움은 비즈니스를 성장시키는데 막대한 영향을 미치는 큰 걸림돌이 되었다.
- High throughput message capacity : 짧은 시간 내에 엄청난 양의 데이터를 컨슈머까지 전달할 수 있음. 파티션을 통한 분산처리가 가능하기 때문에 데이터의 양이 많아질수록 컨슈머 개수를 늘려서 병렬처리가 가능하고, 이를 통해 데이터 처리를 더욱 빠르게 할 수 있다.
- Scalability와 Fault tolerant : 카프카는 확장성이 뛰어나다. 이미 사용되고 있는 카프카 브로커가 있다고 하더라도, 신규 브로커 서버를 추가해서 수평확장이 가능하다. 또한 이렇게 늘어난 브로커 중 몇 대가 죽더라도 이미 replica로 복제된 데이터는 안전하게 보관되어 있으므로 복구하여 처리할 수 있음
- Undeleted log : 다른 플랫폼과 달리 카프카 토픽에 들어간 데이터는 컨슈머가 데이터를 가지고 가더라도 데이터가 사라지지 않는다. 사라지지 않기에 컨슈머의 그룹만 다르다면 동일한 데이터도 각각 다른 형태로 처리할 수 있음.
카프카가 생기기 이전에는
위의 그림은 Kafka를 도입하기 이전 LinkedIn의 데이터 flow를 보여주는 그림이다. 시스템 간의 연결이 End-to-End 연결 방식으로, 시스템 간의 데이터 교환은 직접적인 통신을 통해 이루어졌다. 일반적으로 데이터를 보내는 쪽은 Source Application, 데이터를 처리하는 쪽은 Target Application이라고 하는데, 이들 간의 관계가 복잡해지고, 개수가 늘어남에 따라 데이터 전송라인도 많아지게 되었다. 따라서 배포와 장애에 쉽게 대응하지 못했다.
- End-to-End 연결방식의 아키텍처
- 데이터 연동의 복잡성 증가(하드웨어, 운영체제, 장애 등)
- 각기 다른 데이터 파이프라인 연결구조(데이터 포맷, 처리방식이 파이프라인마다 다름)
- 유지보수가 어렵고, 확장에 엄청난 노력 필요
위의 구조로는 대용량의 데이터를 실시간으로 처리하지 못할뿐더러, 데이터량이 증가했을 때 유연성과 안정성을 보장할 수 없었다. 따라서 각기 다른 시스템에서 발생하는 데이터를 한 곳에서 관리하고, 빠른 데이터 처리량에 이점이 있는 카프카를 개발하게 되었다.
카프카가 도입된 현재
카프카를 도입함으로써 여러 개였던 파이프라인이 하나의 파이프라인으로 모두 처리가 가능해졌다. 이로써 매우 적은 오버헤드로 많은 양의 데이터를 유지할 수 있게 되었고, 데이터 처리 속도가 향상되었다. 종합적으로 보면 카프카는 높은 신뢰성, 확장성, 처리량을 제공하여 대규모 데이터 스트리밍 환경에서 안정적이고 효과적이다.
▷ Producer와 Consumer 분리
카프카는 Producer와 Consumer 간의 역할이 명확히 분리되어 있다. 분리된 구조는 시스템의 유연성을 높이고, 각각의 역할에 특화된 최적화가 가능하다.
▷ 메시지 데이터를 여러 Consumer에게 허용
카프카는 데이터를 여러 Consumer에게 동시에 전송할 수 있다. 이것은 Publish/Subscribe 패턴을 따르기 때문인데, 한 번의 메시지 생성으로 여러 Consumer가 동일한 메시지를 동시에 처리할 수 있다. 이로써 데이터의 병렬처리와 다양한 시스템 간의 연결이 가능하다.
▷ 높은 처리량을 위한 메시지 최적화
카프카는 대량의 데이터를 효율적으로 처리하기 위한 다양한 최적화 기능을 제공한다.
- Batch 처리 : 특정 시간 간격이나 메시지 개수에 도달하면 메시지를 일괄적으로 묶어 처리. 이를 통해 메시지 수를 줄이고, 네트워크 대역폭을 효과적으로 활용
- 메시지 압축 : 압축 알고리즘을 사용하여 데이터 크기를 최소화
- 인덱스 및 오프셋 활용 : 데이터 순차적 읽기 및 쓰기 가능
- 자체 스토리지 포맷 사용
▷ 스케일 아웃 가능
카프카는 분산 아키텍처를 기반으로 하기 때문에, 시스템의 요구에 따라 수평적 확장이 가능하다. 새로운 브로커를 추가하거나 노드를 확장하여 시스템의 성능을 증가시킬 수 있다.
▷ 관련 생태계 제공
카프카를 기반으로 하는 다양한 도구, 라이브러리, 모듈을 개발하고 있어, 데이터 파이프라인, 이벤트 스트리밍, 모니터링, 관리 등 다양한 영역에서의 솔루션을 지원하고 있다.
카프카의 구성요소
1. 주키퍼와 브로커
주키퍼
카프카의 분산 시스템에서 사용되는 조정 및 관리도구. 메타데이터(브로커 id, 컨트롤러 id 등)를 가지고 있으며, 브로커들 간의 조율, 구성 관리를 담당한다.
카프카 클러스터
보통 3대 이상의 브로커로 클러스터를 구성하여 데이터의 안정성과 가용성을 제공한다. n개의 브로커 중 1대는 컨트롤러 기능을 수행한다.
컨트롤러 : 각 브로커에게 담당 파티션 할당 수행. 브로커 정상 동작 모니터링 관리. 특정 브로커가 중단되었을 경우 리더 파티션을 탈락시키고, 다른 팔로워 파티션들 중 하나를 리더로 다시 선택(컨트롤러에 대한 정보는 주키퍼가 가지고 있음 -> 컨트롤러에서 문제 발생 시 주키퍼가 관여)
브로커
카프카 시스템의 중심역할을 하는 서버로, 데이터의 저장, 분배, 관리를 담당한다. 주로 분산환경에서 클러스터를 형성하여 동작하며 여러 브로커가 협력하여 안정성과 가용성을 제공한다.
2. 토픽과 파티션
토픽
카프카에서 토픽은 데이터 스트림을 구분하는 주체로, 다양한 데이터들이 들어갈 수 있는데, 데이터들 중 특정 주제나 카테고리에 속하는 데이터를 그룹단위로 관리한다. 예를 들어 로그 데이터, 주문 정보, 센서 데이터 등 다양한 주제의 데이터를 다룰 수 있다. 하나의 토픽은 n개의 파티션에 할당 가능하다.
- 토픽은 이름을 가질 수 있어 명확히 명시하면 추후 유지보수 시에 편리하게 관리할 수 있다.
- 토픽에 프로듀서가 데이터를 넣고, 컨슈머가 데이터를 소비한다.
- 컨슈머가 데이터를 가져가더라도 데이터는 삭제되지 않고, 파티션에 그대로 남아있다.
- 새로운 컨슈머가 생성되었을 때 다시 offset 0번부터 가져가서 사용할 수 있다.(단 컨슈머 그룹이 달라야 하고, auto.offset.reset=earliest여야 함)
- 위의 그림처럼 동일 데이터에 대해 두 번 처리할 수 있다.
파티션
각 토픽은 여러 개의 파티션으로 나누어지며, 파티션은 특정 토픽의 데이터를 분산하여 저장하는 논리적 단위이다. 각 파티션마다 고유한 오프셋(offset)을 가지고 있다. 오프셋은 메시지의 위치를 나타내는 식별자이다. 각 메시지에는 해당 파티션에서의 순서와 함께 고유한 오프셋이 부여되어 있어, 이를 통해 Consumer는 자신이 어디까지 메시지를 소비했는지 추적하고, 중복 소비를 방지할 수 있다.
- 독립적인 순서 관리 : 각 파티션은 메시지의 처리 순서를 독립적으로 관리한다. 특정 파티션에서의 메시지 순서는 유지되지만, 다른 파티션의 메시지 순서와는 관계없다.
- 병렬처리 가능 : 데이터는 여러 개의 파티션에 분산되어 저장된다(특정한 key값 지정이 없을 경우). 여러 Consumer가 데이터를 동시에 처리할 수 있기 때문에 병렬처리에 있어 효과적이다.
- 컨슈머의 위치추적 : 컨슈머는 자신이 소비한 메시지의 오프셋을 추적함으로써 어디까지 처리하였는지 알 수 있다. 이를 통해 컨슈머는 중복소비를 방지하고, 다음에 읽어야 할 메시지를 알 수 있을뿐더러, 컨슈머에게 장애가 발생하더라도 언제든 복구되어 정상처리할 수 있다.
파티셔너
Producer에서 데이터를 보내면 Producer 애플리케이션 내의 파티셔너를 거쳐 브로커로 데이터가 전송된다. 여기서 파티셔너는 데이터를 어느 토픽의 어느 파티션에 넣을지 결정하는 역할을 하는데, 레코드에 포함된 메시지 키 또는 메시지 값에 따라 파티션의 위치가 달라진다.
파티셔너를 따로 설정하지 않을경우 기본 파티셔너인 UniformStickyPartitioner로 설정이 되며, key가 없을 때는 라운드 로빈 방식으로 레코드를 파티션에 할당한다. 또한 배치로 모을 수 있는 최대한의 레코드를 모아서 파티션으로 데이터를 보내게 된다.
필요하다면 Partitioner Interface를 제공하기 때문에 Custom Partitioner를 만들 수 있음
파티션이 2개 이상일 때 로그 기록 순서
Topic에 데이터를 넣을 때 Key값을 통해 특정 파티션에 넣도록 지정할 수 있지만, 별도로 지정하지 않는다면 Round-Robin 방식으로 데이터를 넣게 된다.
1) 키가 null이고 기본 파티셔너를 사용할 경우, 라운드 로빈으로 할당
- 라운드 로빈이라 함은 0 → 1 → 0 → 1 → 0으로 차례대로 들어간다.
- 위의 상황에서는 키를 따로 설정하지 않았으므로 partition#1에 들어가게 된다.
2) 키가 있고, 기본 파티셔너를 사용할 경우
- 키의 해시값을 구하고, 특정파티션에 할당
- 같은 키를 갖는 데이터는 같은 파티션에 저장
파티션의 개수를 늘리는 경우
파티션의 개수를 늘릴 수 있다. 하지만 늘릴 경우 파티션의 개수를 다시 줄일 수 없기에 매우 유의해야 한다. 늘려야 하는 경우는 다음과 같다.
- 프로듀서의 부하가 높아져서 높은 처리량이 필요한 경우
- 컨슈머 그룹이 늘어나거나 , 각 컨슈머가 높은 처리량을 필요로 하는 경우
- 새로운 데이터 카테고리나 주제가 추가되면서 다양한 토픽을 다뤄야 하는 경우
- 브로커 간의 리플리케이션을 설정하여 고 가용성을 확보해야 하는 경우
3. 카프카 log와 segment
log는 파티션 내의 한 칸을 의미한다. log파일은 세그먼트로 구성되어 있는데, 세그먼트는 log 파일을 일정크기로 나눈 부분을 의미한다. 즉, 실제로 메시지가 저장되는 파일 시스템 단위이다. log에 메시지가 저장될 때는 세그먼트 파일이 열려있는 상태이고, 일정 시간 또는 크기 기준으로 닫히게 된다. 세그먼트가 닫힌 이후 일정 시간(또는 용량)에 따라 파일이 삭제 또는 압축된다.
4. Producer [참고]
데이터를 카프카로 전송하기
레코드를 카프카에 전송하기 위한 일련의 과정이다.
- Record 생성(토픽, 파티션 위치, timestamp, key, value)
- 직렬화(Serializer) : 카프카 내부에 byte형태로 저장할 수 있도록 직렬화
- 파티셔닝(Partitioning) : 메시지를 특정 토픽의 특정 파티션으로 라우팅. 키 값이 설정되지 않았을 경우 round-robin 방식으로 파티셔닝
- 배치(Batching) : 설정에 따라서 배치작업 진행
- 압축(Compression) : 전송 효율 향상을 위해 메시지 압축
- 메타데이터 획득 : 카프카 클러스터로부터 토픽의 메타데이터 획득
- Record 전송 : 카프카 브로커에 전송
위 과정을 거쳐 카프카 브로커에게 메시지가 전달되며 kafka broker로 데이터를 전송할 때 전송 성공여부를 알 수 있고, 실패할 경우 재시도할 수 있다.
5. Consumer [참고]
Producer에서 카프카에 대한 데이터 쓰기를 최적화하는 것처럼, Consumer 역시
- 토픽의 파티션으로부터 데이터 polling(특정 데이터베이스에 저장 or 또 다른 파이프라인에 전달할 수 있음)
- partition의 오프셋 관리
- Consumer group을 통한 병렬처리
다중 파티션 다중 컨슈머
파티션 3개인 토픽과 컨슈머 1개
- 1개의 컨슈머는 3개의 파티션으로부터 polling
파티션 3개인 토픽과 컨슈머 3개
- 3개의 컨슈머로 이루어진 1개의 컨슈머 그룹이 토픽으로부터 polling
파티션이 3개인 토픽과 컨슈머 4개
- 컨슈머의 개수가 파티션보다 많으므로 남은 컨슈머는 파티션을 할당받지 못하고 대기 중
2개 이상의 컨슈머 그룹
- 그룹이 다르다면 동일한 파티션을 그룹 간에 병렬로 처리할 수 있음
- 목적에 따라 컨슈머 그룹을 분리할 수 있음
- ex) Elasticsearch(로그 실시간 확인용, 시간순 정렬), Hadoop(대용량 데이터 적재, 이전 데이터 확인용)
Java에서 Producer 클래스 생성하기
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // 카프카 포트 설정 2개 이상을 권장(하나 에러 시 다른 브로커에 연결될 수 있도록))
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 키 직렬화 설정
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
ProducerRecord record = new ProducerRecord<String, String>("click_log", "login"); // click_log라는 토픽에 login이라는 value 보내기
// ProducerRecord record = new ProducerRecord<String, String>("click_log", 1,"login"); // key값은 1이고, click_log라는 토픽에 login이라는 value 보내기
producer.send(record);
producer.close();
}
}
Java에서 Consumer 클래스 생성하기
public class Consumer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // 카프카 포트 설정 2개 이상을 권장(하나 에러 시 다른 브로커에 연결될 수 있도록))
configs.put("group.id", "click_log_group"); //컨슈머 그룹
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 키 직렬화 설정
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList("click_log")); //어느 토픽을 대상으로 데이터를 가져올지(특정 토픽의 전체 파티션)
// 특정 토픽의 전체 파티션이 아니라 일부 파티션의 데이터만 가져오고 싶다면
//TopicPartition partition0 = new TopicPartition(topicName, 0);
//TopicPartition partition1 = new TopicPartition(topicName, 1);
//consumer.assign(Arrays.asList(partition0, partition1));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(500); //500ms이라는 시간동엔 데이터를 기다리게 됨
for(ConsumerRecords<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
Replication
카프카의 장점 중 하나는 고가용성이다. 사전에 토픽의 파티션을 여러 브로커에 복사해두기 때문에 하나의 브로커 서버에서 장애가 발생해도 해당 토픽의 복사본이 다른 브로커에 있어 데이터 가용성을 유지할 수 있다.
Replication설정은 초기 토픽을 생성할 때 설정할 수 있다.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic_name --partition 3 --replication-factor 3
replication-factor가 1인 경우 하나의 원본 파티션만 존재한다는 것이고, 2개일 경우 원본 1개, 복사본 1개, 3개일 경우 원본 1개, 복사본 2개가 존재한다. 단 replication의 개수는 브로커 개수를 초과할 수 없다.
Leader-Follower 아키텍처
replication 설정을 하면 원본 파티션과 복사본 파티션이 존재한다. 원본을 담당하고 있는 파티션을 Leader 파티션이라고 부르고, 그 외의 복사본 파티션을 Follower 파티션이라고 부른다. 리더 파티션은 카프카 클라이언트와 데이터를 주고받는 역할을 하고, 팔로워 파티션은 리더 파티션으로부터 레코드를 지속적으로 복제한다. 만약 리더 파티션이 동작이 불가능할 경우 나머지 팔로워 파티션 중 1개가 리더로 선출되어 리더 파티션 역할을 한다.
ISR(In Sync Replica)
특정 파티션의 리더, 팔로워가 레코드가 모두 복제되어 sync가 맞는 상태를 ISR이라고 한다.
Producer로부터 데이터를 전달할 때 데이터의 저장상태를 알기 위해서 ack 값을 사용한다.
- ack가 0인 경우 : Leader partition에 데이터 전송하고, 응답값은 받지 않음 → 파티션에 정상적으로 데이터가 전송되었는지, 나머지 파티션에 정상적으로 복제되었는지 알 수 없음, 보장할 수 없음. 속도는 빠르지만 데이터 유실가능성 있음
- ack가 1인 경우 : Leader Partition에 데이터를 전송하고, 응답값을 받음. 다만 나머지 파티션에 복제되었는지는 알 수 없음 → 리더가 데이터 받은 후 장애가 났다면 나머지 파티션에 데이터 복제가 되었는지 알 수 없음. 이는 데이터 유실 가능성 있음
- ack가 all인 경우 : 응답값 받고, 복제도 잘 이뤄졌는지 확인. 데이터 유실 없지만, 속도가 현저히 느림
Replication의 개수가 많다고 좋은 것은 아니다. 개수가 많아지면, 그만큼 브로커의 리소스 사용량도 늘어나게 되므로 상황에 맞게 고려해봐야 한다.
참고 사이트
Apache Kafka란 - 이론 공부 내용
카프카의 등장배경 아파치 카프카(Apache Kafka)는 미국의 대표적인 비즈니스 인맥 소셜네트워크 링크드인(LinkedIn)에서 처음 개발된 분산 메시징 시스템이다. 현재 데이터 파이프라인을 구축할 때
jackcokebb.tistory.com
'빅데이터/Kafka' 카테고리의 글 목록
life is short
blog.voidmainvoid.net