코딩마을방범대

[ELK] ELK에서 Logstash.conf 파일의 filter, input, output 블록 본문

💡 백엔드/ELK

[ELK] ELK에서 Logstash.conf 파일의 filter, input, output 블록

신짱구 5세 2024. 11. 4. 11:13
728x90

 

 

 

Logstash를 통해 로그들에 옵션을 적용하여 ELK에 새로운 분류를 제공할 수 있다.

ELK 사용 시 Logstash에서 적용할 수 있는 옵션들을 설명해볼 것이다.

각 옵션들에 대한 설명과 예시를 보여줄 예정이다.

 

 

 

filter 블록

 

1. Mutate Filter

입력 이벤트를 수정하는 데 사용된다.

필터명 설명 예시
add_field 새로운 필드 추가 add_field => { "new_field" => "value" }
remove_field 특정 필드 삭제 remove_field => ["old_field"]
rename 필드의 이름을 변경 rename => { "old_name" => "new_name" }
replace 필드의 값을 새로운 값으로 대체 replace => { "status" => "success" }
update 필드의 값을 업데이트 update => { "tags" => "processed" }
filter {
    mutate {
        add_field => { "new_field" => "value" }
        remove_field => ["old_field"]
        rename => { "old_name" => "new_name" }
    }
}

 

 


 

 

2. Ruby Filter

  • Ruby 코드를 사용하여 이벤트를 처리할 수 있게 함.
  • 복잡한 로직이나 계산이 필요한 경우 유용함.
filter {
    ruby {
        code => "
            event.set('new_field', event.get('field1') + event.get('field2'))
        "
    }
}

 


 

 

3. Grok Filter

  • 비정형 로그 데이터를 정형화하는 데 사용함.
  • 정규 표현식을 사용하여 문자열을 파싱하고 필드를 추출 후 저장함
filter {
    grok {
        match => { "path" => ".*/API\.(?<log_date>\d{4}-\d{2}-\d{2})\.log\.\d+" }
        add_field => { "new_field" => "값" }
    }
}
mutate의 add_field와 grok의 add_field의 차이점은?

grok의 add_filed는 match의 정규표현식이 일치할 경우에만 실행된다.

 

 


 

 

4. Date Filter

문자열 형식의 날짜를 타임스탬프 형식으로 변환한다.

필터명 설명 예시
match 변환할 날짜 형식을 지정 match => ["timestamp_field", "ISO8601"]
target 변환될 날짜를 저장 target => "@timestamp"
filter {
    date {
        match => ["timestamp_field", "ISO8601"]
        target => "@timestamp"
    }
}

 

 


 

 

5. Split Filter

배열 형태의 필드를 여러 개의 이벤트로 분리한다.

(array_field 에는 json 데이터 등이 들어갈 수 있음)

filter {
    split {
        field => "array_field"
    }
}

 


 

 

6. GeoIP Filter

IP 주소를 기반으로 지리적 위치 정보를 추가한다. (주로 웹 로그 분석에 사용)

필터명 설명 예시
source IP 주소가 저장 source => "client_ip"
target 위치 정보를 저장 target => "geoip"
filter {
    geoip {
        source => "client_ip"
        target => "geoip"
    }
}

 


 

 

7. Drop Filter

특정 이벤트를 삭제하는 데 사용된다.

filter {
    if [status] == "404" {
        drop { }
    }
}



 

 


 

 

 

 

 

 

 

input 블록

 

1. Beats Input

Beats 프로토콜을 통해 데이터를 수신

input {
  beats {
    port => 5044
  }
}

 


 

2. TCP Input

TCP 소켓을 통해 데이터를 수신

input {
  tcp {
    port => 50000
    codec => "json_lines"  # 수신 데이터의 포맷을 지정
  }
}

 


 

3. UDP Input

UDP 소켓을 통해 데이터를 수신

input {
  udp {
    port => 50000
    codec => "json_lines"
  }
}

 


 

4. File Input

로컬 파일에서 로그를 읽어오는 데 사용

input {
  file {
    path => "/path/to/logs/*.log"
    start_position => "beginning"  # 파일의 처음부터 읽기
    sincedb_path => "/dev/null"     # 항상 처음부터 읽도록 설정
  }
}

 


 

5. HTTP Input

HTTP 요청을 통해 데이터를 수신

input {
  http {
    port => 5044
  }
}

 


 

6. Kafka Input

Apache Kafka에서 메시지를 수신

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["your_topic"]
  }
}

 


 

7. JDBC Input

JDBC를 사용하여 데이터베이스에서 데이터를 가져옴

input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "cohttp://m.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_db"
    jdbc_user => "username"
    jdbc_password => "password"
    statement => "SELECT * FROM your_table"
  }
}

 


 

8. S3 Input

AWS S3에서 데이터를 읽어오는 데 사용

input {
  s3 {
    bucket => "your_bucket_name"
    prefix => "logs/"
    region => "us-west-2"
  }
}

 


 

9. Stdin Input

표준 입력을 통해 데이터를 수신 (주로 테스트나 디버깅에서 사용)

input {
  stdin {}
}

 


 

10. Redis Input

Redis에서 데이터를 읽어옴

input {
  redis {
    host => "localhost"
    port => 6379
    data_type => "list"
    key => "your_list_key"
  }
}

 

 

 

 

 

 


 

 

 

 

 

 

 

output 블록

 

1. File Output

로그 데이터를 파일로 저장

output {
  file {
    path => "/path/to/output/logfile.log"
    codec => "json_lines"  # JSON 형식으로 저장
  }
}

 

 



2. Stdout Output

콘솔에 로그 출력 (주로 디버깅에 유용)

output {
  stdout {
    codec => rubydebug  # 사람이 읽기 쉬운 형태로 출력
  }
}

 



3. Kafka Output

Apache Kafka로 로그 데이터를 전송 (분산 시스템에서 메시지 처리할 때 유용)

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "your_topic"
  }
}

 



4. RabbitMQ Output

RabbitMQ 메시지 큐로 로그 데이터를 전송

output {
  rabbitmq {
    exchange => "logs"
    exchange_type => "direct"
    key => "log_key"
    host => "localhost"
  }
}

 



5. HTTP Output

HTTP POST 요청을 통해 외부 API에 로그 데이터를 전송

output {
  http {
    url => "http://your.api.endpoint"
    http_method => "post"
    format => "json"
  }
}

 



6. S3 Output

AWS S3에 로그 데이터를 저장

output {
  s3 {
    bucket => "your-bucket-name"
    region => "us-west-2"
    prefix => "logs/"
  }
}

 

 



7. Elasticsearch Snapshot Output

Elasticsearch의 스냅샷을 생성할 수 있음

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    action => "snapshot"
  }
}





8. Email Output

로그 데이터를 이메일로 보낼 수 있음

output {
  email {
    to => "recipient@example.com"
    subject => "Log Alert"
    body => "Log content: %{message}"
  }
}





9. InfluxDB Output

InfluxDB에 로그 데이터를 전송할 수 있음

output {
  influxdb {
    host => "localhost"
    db => "your_db"
    measurement => "logs"
  }
}



 

 

 

 

 


 

 

 

 

 

 

 

이 외 블록

 

1. Environment Variables

환경 변수를 설정할 수 있는 블록

environment {
  ENV_VAR_NAME = "value"
}

 

 


 

2. Conditional Statements

각 블록 내에서 조건문을 사용하여 특정 조건에 따라 다른 설정을 적용할 수 있음.

(이벤트의 필드 값에 따라 다르게 처리하고 싶을 때 유용)

filter {
  if [type] == "error" {
    # 오류 로그에 대한 필터
  } else {
    # 일반 로그에 대한 필터
  }
}

 

 

 

 

 

728x90