Notice
Recent Posts
Recent Comments
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Archives
Today
Total
관리 메뉴

오픈베이스 연구소

네트워크 보안 엔지니어를 위한 엘라스틱 강좌 - 2강 본문

Elastic Stack 배우기

네트워크 보안 엔지니어를 위한 엘라스틱 강좌 - 2강

godsman 2018. 1. 29. 15:29

두번째 시간인 오늘은 Elastic Stack을 설치하고, 사용하는 방법을 배워보려고 합니다.


지난 시간에 과제를 통해 Elastic Stack 사용 환경을 구축하고, Elasticsearch, Kibana, Metricbeat 설치 및 사용방법은 이전 포스팅을 참고해보세요. 

Elastic Stack 설치 및 사용


지난 시간 과제에 이어 오늘은 Logstash를 설치하고 활용하여 데이터를 직접넣어보려고 합니다.


Elastic Stack 실행 - logstash


1. logstash구조 설명
압축을 푼 logstash는 다음과 같다
logstash-6.1.2
 
ㄴ bin                                  실행파일들이 들어있다.
 ㄴ config                              logstash 설정 파일들이 들어있다.
 ㄴ logstash-core-plugin-api      plugin들이 들어있다.
 ……

2. logstash config 설정
- logstash는 실행할 때 설정이 필요합니다.
- 설정방법은 세가지가 있습니다.
- command line 입력할 때 함께 지정, 파일로 만들어서 지정, 설정파일 경로를 지정 

3. logstash 실행, command line으로 설정
- cmd 창에서 logstash 폴더(C:\elasticstack\logstash-6.1.2\bin\)로 이동합니다.
- 문자열를 입력하면, 시간정보와 호스트정보를 추가하고 입력한 문자열을 변환없이 화면에 출력합니다.
- Hi Logstash 입력, 2018-01-29T06:24:32.792Z KANG Hi Logstash 출력
C:\elasticstack\logstash-6.1.2\bin>logstash -e 'input{ stdin{ } } output { stdout{ } }'
... 생략

[2018-01-29T15:24:05,495][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2018-01-29T15:24:05,584][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

Hi Logstash
2018-01-29T06:24:32.792Z KANG Hi Logstash

    

    - logstash -e 명령어는 logstash 설정을 command line에서 입력받습니다.

    - 아래에서 설정파일을 지정하거나, 설정파일 위치를 지정하여 실행하는 방법을 설명합니다.

    - 주의, 설정파일 위치를 지정한 상태에서는 logstash -e 명령어는 동작하지 않습니다.


Logstash 실행 구조


Logstash 는 데이터 처리 파이프라인으로, 다양한 종류의 데이터 소스를 가공하여 출력할 수 있습니다.





Logstash는 conf 파일을 기반으로 동작하고, 기본적으로  input, filter, output 으로 구성되어 있습니다. 


Input 에서는 beats, jdbc, syslog, tcp, udp, file, stdin 등을 통해 데이터소스를 입력받고,

Filter에서는 입력받은 데이터를 원하는 형태로 가공가능하며,

Output에서는 가공한 데이터를 elasticsearch, jdbc, file, stdout 등 원하는 곳으로 출력할 수 있습니다. 




conf 파일을 만들어 어떻게 입출력 처리가 되는지 살펴봅시다. 


먼저, C:\elasticstack\logstash-6.1.2 경로에 conf.d 폴더를 만듭니다.

conf.d 폴더에 my_first_logstash.conf 를 아래 내용으로 만들어줍니다. 

my_first_logstash.conf


 

  input {

         stdin {

                codec => json

         }

  }

  filter {

         mutate {

                add_field => { "new_field" => "filter function" }

         }

  }

  output {

         stdout {

              codec => rubydebug

         }

  }



아래와 같이 실행해보도록 합시다. 설정파일 위치를 지정하여 실행하는 방법입니다.

    - logstash -f 명령을 이용합니다.

    - json 형태로 입력받아서 해석한 내용을 화면으로 출력합니다.

    - json 형태를 잘 모를 수 있으니, {"field1":"abc", "field2": 123} 복사하거나 입력한 후에 엔터를 쳐보세요.


C:\elasticstack\logstash-6.1.2\bin>logstash -f C:\elasticstack\logstash-6.1.2\conf.d\my_first_logstash.conf 
... 생략

[2018-01-29T17:34:13,726][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2018-01-29T17:34:13,791][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

{"field1":"abc", "field2": 123}
{
        "field1" => "abc",
      "@version" => "1",
        "field2" => 123,
     "new_field" => "filter function",
    "@timestamp" => 2018-01-29T08:34:52.740Z,
          "host" => "KANG"
}
test
[2018-01-29T17:43:51,915][WARN ][logstash.codecs.jsonlines] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unrecognized token 'test': was expecting 'null', 'true', 'false' or NaN
"; line: 1, column: 5]>, :data=>"test\r"}
{
    "@timestamp" => 2018-01-29T08:43:51.925Z,
          "host" => "KANG",
       "message" => "test\r",
     "new_field" => "filter function",
      "@version" => "1",
          "tags" => [
        [0] "_jsonparsefailure"
    ]
}

    

logstash는 입력한 정보를 분석(parsing)합니다.

    - "field1"의 값은 입력한 대로 "abc" 입니다.

    - "@version"은 첫 번째 입력(생성)이라는 의미로 "1" 입니다.

    - "field2"의 값은 입력한 대로 123 입니다.

    - "new_field"는 logstash 설정파일을 통해서 "filter function" 값을 추가했습니다.

    - "@timestamp"는 입력된 시간입니다. 

    - "host"에 입력한 호스트 이름을 기록합니다


Input 에서 stdin 플러그인은 입력을 이벤트로 받는 플러그인 입니다. 입력데이터의 코덱을 옵션으로 정의할 수도 있습니다. 코텍으로 json 을 정의 하였기 입력으로 json 포맷이 아닌 데이터를 입력하면 tags에 '_jsonparsefailure' 이 추가 되어 출력됩니다.

Filter 에서 mutate 플러그인은 입력된 데이터의 필드들을 rename, remove, replace, modify 등 일반적인 작업을 수행할 수 있습니다. 위 예제 에서는 add_field 옵션을 이용해 new_field를 이름으로 가지는 필드를 추가해보았습니다.

Output 에서는 stdout 플러그인을 사용하면 콘솔창에 출력되는 데이터를 바로 볼 수 있어 디버깅을 할 때 많이 사용됩니다. 


위 예제에서 처럼 실행할 conf 파일을 -f 옵션을 통해 파일 하나를 지정하여 실행할 수 있지만, 일반적으로는 conf 파일들을 모아놓은 폴더를 지정해놓고 logstash을 실행합니다. C:\elasticstack\logstash-6.1.2\config\logstash.yml 에서 path.config 세팅을 통해 기본 경로를 설정 할 수 있습니다. 


기본 경로가 정의되면 C:\elasticstack\logstash-6.1.2\bin\logstash.bat 을 더블 클릭만으로 실행할 수 있습니다.

path.config를 설정하면, 위에서 실행한 방법, logstash -e 로 실행할 수 없습니다. 



     # logstash.yml

     # ------------ Pipeline Configuration Settings --------------

     #

     # Where to fetch the pipeline configuration for the main pipeline

     #

     path.config: C:\elasticstack\logstash-6.1.2\conf.d

     #



Logstash 를 통해 JSON 파일 데이터 넣기


Logstash 동작 원리를 익혔다면 이제는 실제 데이터를 넣어보도록하겠습니다.


Logstash를 이용해 elasticsearch에 데이터 넣기


ADCsmart 데이터를 logstash를 활용해 elasticsearch에 넣어보도록 하겠습니다.

ADCsmart는 ADC의 여러가지 정보를 저장하고 있습니다. 데이터를 이용하면 ADCsmart에서 제공하지 않는 화면을 직접 만들어서 분석할 수 있습니다. 유용한 분석방법이 있으면 알려주세요. ADCsmart에 추가하겠습니다. 


1. 데이터 파일 다운로드

    - adcsmart-data.zip 을  C:\elasticstack\adcsmart 아래에 압축을 풉니다.


2. conf 파일 저장

    - C:\elasticstack\logstash-6.1.2\conf.d 폴더에 adcsmart_insert.conf파일을 저장합니다.



     input {

         file {

             path => "C:\elasticstack\adcsmart\adcsmart-*.json"

             start_position => "beginning"

             codec => "json"

         }

     }

     output {

         if [type] == "error"{

           elasticsearch {

                 index => "adcsmart-monitor-error-%{+YYYY.MM.dd}"

                 hosts => ["localhost:9200"]

             }

          } else if [type] == "interface"{

              elasticsearch {

                  index => "adcsmart-monitor-interface-%{+YYYY.MM.dd}"

                  hosts => ["localhost:9200"]

              }

          } else if [type] == "bps"{

               elasticsearch {

                   index => "adcsmart-service-bps-%{+YYYY.MM.dd}"

                   hosts => ["localhost:9200"]

               }

          }

     }




3. logstash 실행

  - C:\elasticstack\logstash-6.1.2\bin\logstash.bat 을 더블 클릭만으로 실행시에는 C:\elasticstack\logstash-6.1.2\conf.d 아래에 있는 모든 conf 파일을 실행하기 때문에 실행하고자 하는 conf 파일만 두도록 합니다.

  - 하나의 conf파일만 지정해서 실행하고 싶다면 아래와 같이 '-f' 옵션을 사용할 수도 있습니다.



     C:\elasticstack\logstash-6.1.2\bin>logstash -f C:\elasticstack\logstash-6.1.2\conf.d\adcsmart_insert.conf 




Kibana 에서 elasticsearch 데이터 조회하기_1 : index 등록하기


데이터가 elasticsearch에 잘 들어갔는지 확인해볼까요?

Kibana > Dev_tools 에서 왼쪽 콘솔창에 명령어를 통해서 elasticsearch에 저장된 데이터를 볼 수 있습니다.


      

     # elasticsearch에 저장된 모든 인덱스를 조회

     GET _cat/indices?v


     # elasticsearch에 저장된 인덱스 중 adcsmart-* 패턴에 해당하는 인덱스만 조회

     GET _cat/indices/adcsmart-*?v




adcsmart- 로 시작하는 인덱스가 생성되었으면 키바나를 이용하여 데이터를 분석합니다.


저장한 데이터를 Kibana에서 보기 위해서는 '인덱스 패턴등록'이 필요한데요. 방법은 아래와 같습니다.

1. Kibana > Management > Index Patterns < Create Index Pattern 버튼을 선택합니다.

2. (Step 1 of 2) Kibana에서 조회할 인덱스의 인덱스 패턴을 등록합니다.

   - 일반적으로 인덱스 뒤에 일별, 월별 등으로 구분하기 때문에 * 을 활용하여 등록합니다. 

   - Kibana에서 adcsmart-service-bps-* 패턴을 등록하면 elasticsearch 에 저장된 adcsmart-bps-2018.01.30 인덱스, adcsmart-bps-2018.01.31 인덱스 등의 인덱스 데이터를 조회할 수 있습니다. 



3. (Step 2 of 2) 해당 인덱스 패턴에서 Time Filter Field name을 'occur_time' 으로 지정합니다.

   - @timestamp 필드는 logstash를 통해 elasticsearch로 저장된 시간을 의미합니다.





Kibana 에서 elasticsearch 데이터 조회하기_2 : visualize 및 dashboard 등록하기


인덱스패턴 등록 후에는 Kibana에서 visualize를 만들고 visualize들을 구성하여 dashboard를 만들 수 있습니다.

만들어진 visualize와 dashboard는 export, import 를 통해 공유할 수 있습니다. 

자유롭게 다양한 visualize를 만들어보시길 권해드리며, 제가 만든 sample visualize와 dashboard를 import 하는 방법을 알려드리도록 하겠습니다.


1. 먼저 이미 설명한 adcsmart-service-bps-* index 패턴을 등록하는 방법으로 아래의 index 패턴을 모두 등록합니다.

  - adcsmart-monitor-interface-*

  - adcsmart-monitor-cpumem-*

  - adcsmart-monitor-adc-traffic-*

  - adcsmart-monitor-drop-*

  - adcsmart-monitor-error-*

  - adcsmart-monitor-httprequest-*

  - adcsmart-service-responsetime-*

  - adcsmart-service-bps-* (위에서 생성, skip)


2.  ADCsmart-dashboard.jsonADCsmart-visualizetions.json파일을 다운받습니다.

    - 위에서 등록한 인덱스패턴을 이용해 만든 visualize와 dashboard를 각각 export한 파일입니다.


3-1. Kibana > Management > Saved Objects 페이지에서 위에서 다운받은 'ADCsmart-visualizetions.json' 파일을 선택하고 Import 합니다.

    - Index Pattern Conflicts 창이 뜨면 다음과 같이 앞서 등록해둔 index 패턴을 선택해 줍니다.




3-2. Kibana > Management > Saved Objects 페이지에서 위에서 다운받은 'ADCsmart-dashboard.json' 파일을 

선택하고 Import 합니다.


4. Import 가 완료되었다면, Kibana > Dashboard 에서 '[ADCsmart]Monitoring' 이름의 dashboard가 등록된 것을  확인할 수 있습니다.




5. '[ADCsmart]Monitoring' dashboard를 선택하면 다음과 같은 내용을 볼 수 있습니다. 

  - 더 다양한 visualize와 dashboard를 만들어 추가해보셔도 좋습니다.






[과제] Logstash를 활용하여 파일추출 결과를 elasticsearch로 저장 및 분석


Logstash를 이용해 데이터를 elasticsearch로 직접 넣고, Kibana로 분석해봅시다!


순서는 다음과 같이 진행하시면 됩니다.

파이썬 스크립트를 통해 파일추출정보를 json 형태의 파일로 만들고,

logstash는 json으로 만들어진 파일추출정보를 읽어 elasticsearch로 저장하고,

kibana에서 저장된 데이터를 분석해보시면 됩니다. 


환경구성


1. Elastic Stack 설치 및 실행


파일추출


1. file_extract_distribute.zip을 다운받아 압축을 풉니다. (C:\elasticstack\file_extract_distribute 폴더)

2. cmd 창에서 C:\elasticstack\file_extract_distribute 경로로 이동합니다.

3. walk_test_date_modi.exe 실행파일을 실행합니다.

    - '-d' 옵션으로 파일추출을 할 대상폴더를 지정하고, '-o'  옵션으로 파일추출정보를 적을 파일을 지정합니다.

    - 파일의 모든 데이터를 읽어서 hash로 만드는 프로그램입니다.

    - PC 전체 파일을 분석하려면 "C:\"를 입력하면 됩니다.

    - 현재 분석하고 hash를 계산하는 부분이 최적화되어 있지 않아서 시간이 오래 걸릴 수 있습니다.

    - 재미있는 분석을 위해서 문서폴더를 추천합니다. 예를들면, "C:\데이터" 


     C:\elasticstack\file_extract_distribute>walk_test_date_modi.exe -d "C:\데이터" -o "C:\elasticstack\file_extract_distribute\file_extract_result.json"

     ........

     (502/637) :: _createMathOperation.js 



3. 파일추출결과파일을 확인합니다. (C:\elasticstack\file_extract_distribute\file_extract_result.json)

  - 파일추출 대상 폴더 안의 파일 건수만큼 json 로그가 생성됨을 확인 할수 있습니다.

  - json의 의미는 다음과 같습니다.



    {

      "path": "C:\\elasticstack\\elasticsearch-6.1.1",         // 파일경로

      "fname": "6.1.1data.zip",                                        // 파일이름   

      "size": 1446281832,                                              // 파일크기

      "ctime": "2018-01-23 15:05:33",                               // 파일 생성 시간

      "mtime": "2018-01-11 12:28:11",                              // 파일 수정 시간

      "atime": "2018-01-23 15:05:33",                               // 파일 마지막 접근 시간

      "hash": {

        "md5": "99d21dbefa6b1e06d50d68864768fe8d",

        "sha256": "81809f64869e1f9e096196789e7e431d5a44e7304956929d43e23e798ec3ccf1"

      }

    }



Logstash 실행 하기


1. conf 파일 만들기

  - input, output 으로 구성된 conf 파일을 만들어보도록 합시다.

  - input : 앞서 추출한 file_extract_result.json 파일을 데이터 입력으로 사용합니다. 

  - output : 로컬 elasticsearch로 저장합니다.


 

     input {

         file {

             path => "C:\elasticstack\file_extract_distribute\file_extract_result.json"

             start_position => "beginning"

             codec => "json"

         }

     }

      filter {

          if [fname] =~ /\.(\w+)$/ {

              ruby {

                  code => '

                      fname = event.get("[fname]")

                      if fname =~ /\.(\w+)$/

                          event.set("[extension]", $1)

                      end

                  '

              }

          }

     }

     output {

         elasticsearch {

             index => "file-extract-%{+YYYY.MM.dd}"

             hosts => ["localhost:9200"]

         }

     }

     


Kibana 에서 분석하기


Kibana에서 Visualize와 Dashboard를 구성해 보세요.

    - 전체 파일 개수, 중복되지 않은 파일이름 개수, 중복되지 않은 확장자 개수, 중복되지 않은 파일 개수

    - 파일 생성 날짜 기준 카운터

    - 중복 확장자 비율, 중복 파일명 비율, 중복 파일 비율




참고 페이지


1. Logstash - stdin input plugin

2. Logstash - mutate filter plugin

3. Logstash - stdout output plugin

4. Logstash Directory Layout











Comments