코딩마을방범대
[ELK] ELK에서 Logstash.conf 파일의 filter, input, output 블록 본문
Logstash를 통해 로그들에 옵션을 적용하여 ELK에 새로운 분류를 제공할 수 있다.
ELK 사용 시 Logstash에서 적용할 수 있는 옵션들을 설명해볼 것이다.
각 옵션들에 대한 설명과 예시를 보여줄 예정이다.
filter 블록
1. Mutate Filter
입력 이벤트를 수정하는 데 사용된다.
필터명 | 설명 | 예시 |
add_field | 새로운 필드 추가 | add_field => { "new_field" => "value" } |
remove_field | 특정 필드 삭제 | remove_field => ["old_field"] |
rename | 필드의 이름을 변경 | rename => { "old_name" => "new_name" } |
replace | 필드의 값을 새로운 값으로 대체 | replace => { "status" => "success" } |
update | 필드의 값을 업데이트 | update => { "tags" => "processed" } |
filter {
mutate {
add_field => { "new_field" => "value" }
remove_field => ["old_field"]
rename => { "old_name" => "new_name" }
}
}
2. Ruby Filter
- Ruby 코드를 사용하여 이벤트를 처리할 수 있게 함.
- 복잡한 로직이나 계산이 필요한 경우 유용함.
filter {
ruby {
code => "
event.set('new_field', event.get('field1') + event.get('field2'))
"
}
}
3. Grok Filter
- 비정형 로그 데이터를 정형화하는 데 사용함.
- 정규 표현식을 사용하여 문자열을 파싱하고 필드를 추출 후 저장함
filter {
grok {
match => { "path" => ".*/API\.(?<log_date>\d{4}-\d{2}-\d{2})\.log\.\d+" }
add_field => { "new_field" => "값" }
}
}
mutate의 add_field와 grok의 add_field의 차이점은?
grok의 add_filed는 match의 정규표현식이 일치할 경우에만 실행된다.
4. Date Filter
문자열 형식의 날짜를 타임스탬프 형식으로 변환한다.
필터명 | 설명 | 예시 |
match | 변환할 날짜 형식을 지정 | match => ["timestamp_field", "ISO8601"] |
target | 변환될 날짜를 저장 | target => "@timestamp" |
filter {
date {
match => ["timestamp_field", "ISO8601"]
target => "@timestamp"
}
}
5. Split Filter
배열 형태의 필드를 여러 개의 이벤트로 분리한다.
(array_field 에는 json 데이터 등이 들어갈 수 있음)
filter {
split {
field => "array_field"
}
}
6. GeoIP Filter
IP 주소를 기반으로 지리적 위치 정보를 추가한다. (주로 웹 로그 분석에 사용)
필터명 | 설명 | 예시 |
source | IP 주소가 저장 | source => "client_ip" |
target | 위치 정보를 저장 | target => "geoip" |
filter {
geoip {
source => "client_ip"
target => "geoip"
}
}
7. Drop Filter
특정 이벤트를 삭제하는 데 사용된다.
filter {
if [status] == "404" {
drop { }
}
}
input 블록
1. Beats Input
Beats 프로토콜을 통해 데이터를 수신
input {
beats {
port => 5044
}
}
2. TCP Input
TCP 소켓을 통해 데이터를 수신
input {
tcp {
port => 50000
codec => "json_lines" # 수신 데이터의 포맷을 지정
}
}
3. UDP Input
UDP 소켓을 통해 데이터를 수신
input {
udp {
port => 50000
codec => "json_lines"
}
}
4. File Input
로컬 파일에서 로그를 읽어오는 데 사용
input {
file {
path => "/path/to/logs/*.log"
start_position => "beginning" # 파일의 처음부터 읽기
sincedb_path => "/dev/null" # 항상 처음부터 읽도록 설정
}
}
5. HTTP Input
HTTP 요청을 통해 데이터를 수신
input {
http {
port => 5044
}
}
6. Kafka Input
Apache Kafka에서 메시지를 수신
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["your_topic"]
}
}
7. JDBC Input
JDBC를 사용하여 데이터베이스에서 데이터를 가져옴
input {
jdbc {
jdbc_driver_library => "/path/to/mysql-connector-java.jar"
jdbc_driver_class => "cohttp://m.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/your_db"
jdbc_user => "username"
jdbc_password => "password"
statement => "SELECT * FROM your_table"
}
}
8. S3 Input
AWS S3에서 데이터를 읽어오는 데 사용
input {
s3 {
bucket => "your_bucket_name"
prefix => "logs/"
region => "us-west-2"
}
}
9. Stdin Input
표준 입력을 통해 데이터를 수신 (주로 테스트나 디버깅에서 사용)
input {
stdin {}
}
10. Redis Input
Redis에서 데이터를 읽어옴
input {
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "your_list_key"
}
}
output 블록
1. File Output
로그 데이터를 파일로 저장
output {
file {
path => "/path/to/output/logfile.log"
codec => "json_lines" # JSON 형식으로 저장
}
}
2. Stdout Output
콘솔에 로그 출력 (주로 디버깅에 유용)
output {
stdout {
codec => rubydebug # 사람이 읽기 쉬운 형태로 출력
}
}
3. Kafka Output
Apache Kafka로 로그 데이터를 전송 (분산 시스템에서 메시지 처리할 때 유용)
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "your_topic"
}
}
4. RabbitMQ Output
RabbitMQ 메시지 큐로 로그 데이터를 전송
output {
rabbitmq {
exchange => "logs"
exchange_type => "direct"
key => "log_key"
host => "localhost"
}
}
5. HTTP Output
HTTP POST 요청을 통해 외부 API에 로그 데이터를 전송
output {
http {
url => "http://your.api.endpoint"
http_method => "post"
format => "json"
}
}
6. S3 Output
AWS S3에 로그 데이터를 저장
output {
s3 {
bucket => "your-bucket-name"
region => "us-west-2"
prefix => "logs/"
}
}
7. Elasticsearch Snapshot Output
Elasticsearch의 스냅샷을 생성할 수 있음
output {
elasticsearch {
hosts => ["http://localhost:9200"]
action => "snapshot"
}
}
8. Email Output
로그 데이터를 이메일로 보낼 수 있음
output {
email {
to => "recipient@example.com"
subject => "Log Alert"
body => "Log content: %{message}"
}
}
9. InfluxDB Output
InfluxDB에 로그 데이터를 전송할 수 있음
output {
influxdb {
host => "localhost"
db => "your_db"
measurement => "logs"
}
}
이 외 블록
1. Environment Variables
환경 변수를 설정할 수 있는 블록
environment {
ENV_VAR_NAME = "value"
}
2. Conditional Statements
각 블록 내에서 조건문을 사용하여 특정 조건에 따라 다른 설정을 적용할 수 있음.
(이벤트의 필드 값에 따라 다르게 처리하고 싶을 때 유용)
filter {
if [type] == "error" {
# 오류 로그에 대한 필터
} else {
# 일반 로그에 대한 필터
}
}
'💡 백엔드 > ELK' 카테고리의 다른 글
[ELK] Docker를 통해 구축한 ELK 의 데이터를 스냅샷으로 저장하기 (0) | 2024.11.06 |
---|---|
[ELK] Docker로 ELK 스택 구축 시 실행 때마다 자동으로 사용자 설정하기 (0) | 2024.11.04 |
[ELK] ELK 역할 부여 및 권한 관리하는 방법 (0) | 2024.10.31 |
[ELK] ELK를 이용한 MySQL 로그 연동 방법 (0) | 2024.10.21 |
[ELK] 도커를 통해 ELK 구축한 후 Spring boot 로그와 연결하기 (0) | 2024.10.14 |