코딩마을방범대

[ELK] Kafka와 ELK 연동하기 본문

💡 백엔드/ELK

[ELK] Kafka와 ELK 연동하기

신짱구 5세 2024. 11. 19. 14:44
728x90

 

 

 

ELK 기초 구축은 아래 포스트를 참고하면 된다.

테스트 환경은 아래와 같다.

 

A서버: ELK 구축

B서버: 로그가 쌓임, Kafka를 이용해 로그를 C서버로 보냄

C서버: Kafka 브로커

 

 

[ELK] 도커를 통해 ELK 구축한 후 Spring boot 로그와 연결하기

ELK란 아래 포스트를 통해 개념을 확인할 수 있다. ELK 스택(Elasticsearch, Kibana, Beats, Logstash) 개념 정리ELK 스택Elasticsearch, Logstash, Kibana의 세 가지 인기 있는 프로젝트로 구성된 스택을 의미

sweet-rain-kim.tistory.com

 

 

 

 


 

 

 

 

 

 

 

카프카에서는 분산 시스템을 이용하기 위해 Zookeeper 혹은 KRaft를 함께 사용한다.

이전 버전에서는 Zookeeper 를 보편적으로 사용하였으나, 성능 면의 문제점이 많아 KRaft와 혼용하여 사용하는 추세이다.

실제로 Kafka에서도 3.6.0 버전에서 KRaft로의 마이그레이션을 지원해주기도 하고, 4.0.0 버전은 KRaft에서만 사용 가능하다.

 

Zookeeper의 기능과 KRraft와의 차이를 알고싶다면 아래 포스트를 참고하면 된다.

필자의 포스트에선 KRaft 모드를 사용하는 Kafka 로 진행 예정이다.

 

 

Zookeeper의 개념과 Kafka와의 상관관계

KRaft, Kafka, Zookeeper는 모두 분산 시스템에서 메시지 큐잉과 데이터 관리를 위해 사용되는 기술이다.기본적으로 사용할 때 Kafka와 Zookeeper를 같이 사용하는데, KRaft 사용 시 Zookeeper를 사용하지 않아

sweet-rain-kim.tistory.com

 

KRaft의 개념과 Kafka와의 상관관계

주키퍼는 카프카와 보편적으로 함께 사용되는 서비스이다.주키퍼와 카프카의 개념을 잘 모르겠다면 아래 포스트를 참고하면 된다.하지만, 주키퍼와 카프카를 같이 사용할 때 문제점들이 발생

sweet-rain-kim.tistory.com

 


 

 

일단 Kafka는 두 서버 모두 설치해주어야 한다.

이후 토픽은 브로커 서버인 C서버에서만 생성해주면 되며, B서버가 메시지를 보낼 때 C서버에서 만든 해당 토픽으로 발송하면 된다.

 

Kafka 설치 및 기초 세팅하기 - A,C서버

 

1. Kafka 다운로드

 

Index of /kafka/3.9.0

 

downloads.apache.org

 

목록이 쫘르륵 나오는데 여기서 현재 작성일 기준으로는 'kafka_2.13-3.9.0.tgz'를 다운로드 하면된다.

Kafka는 Java로 작성되어 있으므로, 설치될 서버는 Java가 설치되어 있어야 한다.

2.13는 Scala 버전이며, 2.13 등이 써있지않고 kafka 버전만 적혀있는 경우 Scala는 자동으로 3.0으로 적용된다.
Apache 공식 홈페이지를 확인해보면 2.13을 추천한다고 되어있다.

 

필자의 경우 ubuntu에서 진행 예정이므로 wget을 이용해 다운로드 해줄 것이다.

위 사이트에서 다운로드할 항목을 확인하여 마우스 우클릭 후 링크 주소 복사하여 wget으로 다운로드 받아준다.

wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

 

 


 

 

2. Kafka 압축 해제

tar -xzf kafka_2.13-3.9.0.tgz
cd kafka_2.13-3.9.0

 

 


 

 

3. 설정 파일 수정 - C서버만

※ zookeeper 사용 시 config/server.properties 파일을 수정해주면 된다.

vi ./config/kraft/server.properties
# Kafka 브로커가 수신할 주소
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

# 클라이언트가 Kafka 브로커에 연결할 때 사용할 주소
advertised.listeners=PLAINTEXT://C서버 IP주소:9092,CONTROLLER://C서버 IP주소:9093

 

 


 

 

4. Zookeeper 실행하기(KRaft 모드를 사용하지 않을 경우에만)

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

 

 


 

 

5. Kafka 시작

※ Zookeeper 를 사용하려는 경우 4번 실행 후 config/server.properties로 실행해주어야 한다.

nohup bin/kafka-server-start.sh config/kraft/server.properties > kafka.log 2>&1 &
만약 kafka.log 파일에 아래와 같은 오류가 발생했다면, meta.properties 파일을 찾지 못해서 발생하는 오류이다.
server.properties를 보면 log.dirs를 설정해둔 옵션이 있을텐데, 별도의 작업을 하지 않은 이상 이 폴더는 기존에 존재하지 않는다.
이 때문에 kafka 로그 디렉토리를 초기화 한 이후 진행해주어야 하므로, 아래 명령어들을 따라해주면 된다.

[2024-12-02 04:58:00,180] WARN No meta.properties file under dir /tmp/kraft-combined-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2024-12-02 04:58:00,185] ERROR Exiting Kafka due to fatal exception (kafka.Kafka) 
java.lang.RuntimeException: No readable meta.properties files found.
    at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:480)
    at kafka.server.KafkaRaftServer.initializeLogDirs(KafkaRaftServer.scala:144)
    at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:60)
    at kafka.Kafka$.buildServer(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala:90)
# log.dirs 로 설정한 폴더 생성
mkdir /tmp/kraft-combined-logs

# kafka 클러스트 ID 생성(로그 폴더 포맷 시 클러스터 ID가 필요)
./bin/kafka-storage.sh random-uuid

# 로그 폴더 포맷
./bin/kafka-storage.sh format -t [생성한 uuid] -c ./config/kraft/server.properties​

 


 

 

6. Kafka 주제(Topic) 생성 - C서버만

bin/kafka-topics.sh --create --topic your_topic_name --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
옵션명 설명
--topic 생성할 주제의 이름

* 토픽에 사용하는 기호는 일관성을 유지하는 것이 좋다. (-, ., _ 혼용 시 오류 발생 위험)
--bootstrap-server
브로커 주소

* kafka와 브로커가 분리되어 있는 경우 아래의 예시처럼 작성하면 된다.
--bootstrap-server 192.168.1.100:9092, 192.168.200:9092
--partitions 주제의 파티션 수 (default 1)

파티션은 토픽의 메시지를 분산 저장하는 단위로, 여러 파티션을 사용하면 데이터의 병렬 처리가 가능하다.
--replication-factor 복제 계수 (default 1)

* 예를 들어, 복제 인수가 3으로 설정된 경우, 해당 토픽의 각 메시지는 3개의 서로 다른 브로커에 저장된다. 이로 인해 하나의 브로커가 다운되더라도 다른 두 브로커에서 데이터를 복구할 수 있다.
* 복제 인수는 토픽을 생성할 때 설정하며, 카프카 클러스터 내에 존재하는 브로커 수보다 작거나 같아야 한다. 예를 들어, 브로커가 3대인 경우 복제 인수는 1, 2, 또는 3으로 설정할 수 있다.
선택 토픽 삭제 명령어
bin/kafka-topics.sh --delete --topic your_topic_name --bootstrap-server localhost:9092

 


 

 

7. Kafka 모든 토픽 확인 - C서버만

토픽이 정상적으로 생성되었는지 확인해준다.

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

 

 

 

 

 


 

 

 

 

 

 

Kafka 클러스터 설정하기 - B서버

 

1. bash 파일 작성하기

#!/bin/bash

# Kafka 브로커 주소 및 토픽 설정
KAFKA_BROKER="C서버 IP주소:9092"
TOPIC="토픽명"

# 로그 파일 경로
LOG_FILES="/path/to/your/logs/*.log"

# 각 로그 파일에 대해 tail -f 실행
if [ "$1" == "start" ]; then

    for LOG_FILE in $LOG_FILES; do
        # 로그 파일의 이름을 기반으로 Kafka 토픽 생성
        TOPIC="$(basename "$LOG_FILE" .log)_logs"

        # 백그라운드에서 tail -f 실행
        sudo tail -f "$LOG_FILE" | bin/kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC --property "parse.key=true" --property "key.separator=|" &
    done
    # 모든 백그라운드 프로세스가 종료될 때까지 대기
    wait
elif [ "$1" == "stop" ]; then
    if [ -f "$PID_FILE" ]; then
        while read PID; do
            kill -9 $PID
        done < "$PID_FILE"
        rm "$PID_FILE" # PID 파일 삭제
    else
        echo "No running processes found."
    fi
else
    echo "Usage: $0 {start|stop}"
fi
만약 노트패드 활용하여 작성 시 편집-줄끝 문자(EOL) 변환에서 Unix로 바꿔줘야함!

 

 


 

 

2. 스크립트 실행하기

chmod +x log_to_kafka.sh
sudo nohup ./log_to_kafka.sh start &
메시지 수신 테스트
bin/kafka-console-consumer.sh --topic your_topic_name --from-beginning --bootstrap-server A서버 IP주소:9092

 

 

 

 


 

 

 

 

 

 

 

Logstash 설정 변경하기 - A서버

 

logstash.conf 파일의 input 박스를 수정해주어야 한다.

기존에는 파일을 읽어와 로그를 수집하였지만, 이젠 kafka로 수신 받은 메시지를 통해 로그를 수집할 것이다.

 

input {
  kafka {
    bootstrap_servers => "C서버 ip 주소:9092"
    topics => ["your_topic_name1","your_topic_name2"]
    group_id => "logstash_group"
  }
}
...
설정명 설명
bootstrap_servers kafka 브로커 서버와 kafka의 포트번호 기입
topics kafka 브로커에서 생성한 토픽명
group_id Logstash가 Kafka에서 메시지를 소비할 때 사용하는 소비자 그룹의 ID를 정의

같은 group_id를 가진 소비자들은 Kafka의 같은 주제를 구독하고, 메시지를 공유한다.
즉, 하나의 메시지는 소비자 그룹 내의 한 소비자에게만 전달된다.
이를 통해 여러 소비자가 동시에 메시지를 처리할 수 있으며, 부하 분산이 이루어질 수 있다.

 

 

 

 

 


 

 

 

 

 

 

 

 

💡 TIPS!

 

1. Zookeeper와 Kafka 서비스로 등록하기 

서비스로 등록해놓지 않을 경우 bin 폴더에 있는 실행 파일을 통해 직접 실행해주어야 한다.

간편한 명령어로 실행해주기 위해 서비스 등록을 해놓으면 편리하다!

 

 

ⓐ Zookeeper 서비스 등록

 

① 서비스 파일 생성하기

sudo vi /etc/systemd/system/zookeeper.service

 


 

 

② 서비스 파일 작성하기

[Unit]
Description=Apache Zookeeper
After=network.target

[Service]
Type=simple
User=your_user  # Zookeeper를 실행할 유저
ExecStart=/path/to/zookeeper/bin/zkServer.sh start /path/to/zookeeper/conf/zoo.cfg
ExecStop=/path/to/zookeeper/bin/zkServer.sh stop /path/to/zookeeper/conf/zoo.cfg
Restart=on-failure

[Install]
WantedBy=multi-user.target

 


 

 

③ 서비스 파일 리로드 및 시작하기

sudo systemctl daemon-reload
sudo systemctl start zookeeper

 


 

 

④ 부팅 시 자동 시작 설정하기

sudo systemctl enable zookeeper

 


 

 

⑤ 상태 확인

sudo systemctl status zookeeper

 

 

 

 

 


 

 

 

 

ⓑ Kafka 서비스 등록

 

① 서비스 파일 생성하기

sudo vi /etc/systemd/system/kafka.service

 


 

 

② 서비스 파일 작성하기

[Unit]
Description=Apache Kafka
After=zookeeper.service

[Service]
Type=simple
User=your_user  # Kafka를 실행할 유저
ExecStart=/path/to/kafka/bin/kafka-server-start.sh /path/to/kafka/config/server.properties
ExecStop=/path/to/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

 


 

 

③ 서비스 파일 리로드 및 시작하기

sudo systemctl daemon-reload
sudo systemctl start kafka

 


 

 

④ 부팅 시 자동 시작 설정하기

sudo systemctl enable kafka

 


 

 

⑤ 상태 확인

sudo systemctl status kafka

 

 

 


 

 

 

 

 

 

 

2. Kafka를 이용해 json 데이터를 보내고 싶다면? 

 

필자의 경우 파일명도 같이 읽어야 하는데, 줄만 전송할 경우 파일명을 받을 수 없어서 인덱스를 분리할 수 없었다.

(물론 topic을 다르게 설정하면 되긴하지만.. 하지만 너무 조잡해질 수 있으니..)

start 박스 부분을 아래와 같이 수정해주면 된다.

 

ESCAPED_LINE=$(echo "$LINE" | sed 's/"/\\"/g') 옵션을 넣어주지 않으면,

메시지 내 큰 따옴표가 이스케이프 처리 되지 않아 JSON 파싱 에러가 발생할 수 있다.

for LOG_FILE in $LOG_FILES; do
    FILE_NAME=$(basename "$LOG_FILE")
    {
        echo "{\"log\": {\"file\": {\"path\": \"$FILE_NAME\"}}, \"message\": \"Initial log message\"}"

        sudo tail -f "$LOG_FILE" | while read LINE; do
            ESCAPED_LINE=$(echo "$LINE" | sed 's/"/\\"/g')

            echo "{\"log\": {\"file\": {\"path\": \"$FILE_NAME\"}}, \"message\": \"$ESCAPED_LINE\"}"
        done
    } | bin/kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC --property "parse.key=false" &
    echo $! >> $PID_FILE
done 
# 모든 백그라운드 프로세스가 종료될 때까지 대기 
wait

 

그럼 이런식으로 메시지를 발송한다.

{"log": {"file": {"path": "bsa_api.log"}}, "message": "[24-11-21 06:11:09][HikariCP connection adder][INFO ][n.s.l.l.s.Slf4jSpyLogDelegator.connectionOpened()] 26499. Connection opened (Slf4jSpyLogDelegator.java:541)"}

 

Logstash의 필터에선 아래와 같이 설정하면 끝!

 

설명

source => "[event][original]" 이 단계를 실행하면 logstash가 json 데이터로 읽어들인 후 변수에 매칭한다.

target을 설정하여 json 데이터를 어떤 변수에 감쌀지 설정한다.

add_field를 통해 원하는 변수를 추가하여 매칭 시켜주고, replace를 통해 기존 message 필드에 매칭시켜준다.

  if [event][original] {
    json {
      source => "[event][original]"
      target => "parsed_message"
    }
  }
  
  mutate {
    add_field => {
      "filename" => "%{[parsed_message][log][file][path]}" 
    }
    replace => { 
      "message" => "%{[parsed_message][message]}"
    }
  }

 

 

 


 

 

 

 

 

 

 

3. Kafka 관련 명령어

 

실행 & 종료 관련

 

zookeeper 실행하기

bin/zookeeper-server-start.sh config/zookeeper.properties

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

 


 

Kafka 실행하기

zookeeper로 실행하는 방법: config/server.properties 활용

KRaft로 실행하는 방법: config/kraft/server.properties 활용

bin/kafka-server-start.sh config/server.properties 

nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

 


 

백그라운드 프로그램 종료시키기

ps aux | grep kafka

sudo kill -9 pid번호

 


 

특정 텍스트가 들어간 프로세스 전부 종료시키기

sudo kill -9 $(ps aux | grep '[k]afka' | awk '{print $2}')

 

 

 


 

 

Kafka 작업 명령어

※ ip 주소나 토픽명은 본인 환경에 맞게 수정해서 테스트하기

 

토픽 생성

bin/kafka-topics.sh --create --topic your_topic_name --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

 


 

토픽 삭제

bin/kafka-topics.sh --delete --topic your_topic_name --bootstrap-server localhost:9092

 


 

토픽 목록 확인

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

 


 

메시지 수신 테스트

bin/kafka-console-consumer.sh --topic your_topic_name --from-beginning --bootstrap-server localhost:9092

 


 

메시지 발신 테스트

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic your_topic_name

 

 

 

 

728x90