Fluentd Use Case
Using the tail Plugin
The in_tail plugin is a built-in plugin that reads and collects text files. For files that are continuously appended to, such as log files, it can also keep collecting only the new increments.
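As a minimal sketch of such an incremental source (the path, pos_file, and tag below are hypothetical placeholders, not from the example that follows):

<source>
  @type tail
  # Hypothetical log path; adjust to your environment.
  path /var/log/app/app.log
  # The pos file records the byte offset already read for each file,
  # so only newly appended lines are collected, even across restarts.
  pos_file /var/log/fluentd/app.log.pos
  tag app.log
  <parse>
    # Pass each line through as-is; swap in a real parser as needed.
    @type none
  </parse>
</source>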
CSV Data
This example shows how to collect CSV text files that are ingested as one-off or batch jobs, such as analysis datasets or reference datasets.
Source Data
head -n 5 accidentology.csv
timestamp,Date,Hour,Dept,Com,Address,Zip code,Corner,Segment,Address1,Address2,Vehicle 1 description,Vehicle 1,Vehicle 2 Description,Vehicle 2,Vehicle 3 Description,Vehicle 3,Person 1 Category,Person 1 Status,Person Tag,Person 2 Category,Person 2 Status,Person 2 Tag,Person 3 Cat,Person 3 Status,Person 3 Tag,Person 4 Cat,Person 4 Status,Person 4 Tag,fullAddress,latitude,longitude,season,involvedCount,periodOfDay
16/10/2012 10:15,16/10/2012,10:15,75,101,"1, PLACE DU CHATELET",,1_75101_6801,,"1, PLACE DU CHATELET, 75001 Paris",AVENUE VICTORIA,Motorbike,RESPONSIBLE,,,,,Cond,Injured,RESPONSIBLE,,,,,,,,,,"1, PLACE DU CHATELET, 75001 Paris",48.8575996,2.3467545,autumn,1,morning
15/03/2012 17:10,15/03/2012,17:10,75,101,"41, RUE DE RIVOLI",,1_75101_6850,,"41, RUE DE RIVOLI, 75001 Paris",BOULEVARD DE SEBASTOPOL,Bicycle,RESPONSIBLE,,,,,Cond,Injured,RESPONSIBLE,Troll,Safe,RESPONSIBLE,,,,,,,"41, RUE DE RIVOLI, 75001 Paris",48.85842,2.3477402,winter,1,afternoon
19/04/2013 11:10,19/04/2013,11:10,75,101,"41, RUE DE RIVOLI",,1_75101_6850,,"41, RUE DE RIVOLI, 75001 Paris",BOULEVARD DE SEBASTOPOL,Van,RESPONSIBLE,,,,,Cond,Safe,RESPONSIBLE,Pedestrian,Injured,RESPONSIBLE,,,,,,,"41, RUE DE RIVOLI, 75001 Paris",48.85842,2.3477402,spring,1,morning
12/01/2012 22:30,12/01/2012,22:30,75,101,"19, RUE ETIENNE MARCEL",,1_75101_6945,,"19, RUE ETIENNE MARCEL, 75001 Paris",RUE DE TURBIGO,Motor Scooter,RESPONSIBLE,Motor Scooter,VICTIM,,,Cond,Safe,RESPONSIBLE,Cond,Safe,VICTIM,Pedestrian,Injured,RESPONSIBLE,,,,"19, RUE ETIENNE MARCEL, 75001 Paris",48.8638538,2.3483359,winter,2,evening
Fluentd Config
The CSV text file is read in and initially parsed with the csv parser. The latitude and longitude columns are then transformed into a single location field, and the records are stored in Elasticsearch.
# CSV, accidentology
## Input
<source>
  @type tail
  path /home/fluentd/accident*
  read_from_head true
  pos_file /data/fluentd/pos/accident.pos
  tag accident
  <parse>
    @type csv
    keys timestamp,Date,Hour,Dept,Com,Address,Zip code,Corner,Segment,Address1,Address2,Vehicle 1 description,Vehicle 1,Vehicle 2 Description,Vehicle 2,Vehicle 3 Description,Vehicle 3,Person 1 Category,Person 1 Status,Person Tag,Person 2 Category,Person 2 Status,Person 2 Tag,Person 3 Cat,Person 3 Status,Person 3 Tag,Person 4 Cat,Person 4 Status,Person 4 Tag,fullAddress,latitude,longitude,season,involvedCount,periodOfDay
    types latitude:float,longitude:float
    time_key timestamp
    time_format %d/%m/%Y %H:%M
  </parse>
</source>
## Filter
<filter accident*.**>
  @type record_transformer
  enable_ruby
  <record>
    location ${record["latitude"]},${record["longitude"]}
  </record>
  remove_keys longitude,latitude
</filter>
## Output
<match accident*.**>
  @type copy
  <store>
    @type elasticsearch
    hosts 192.168.179.81:9200,192.168.179.82:9200
    type_name accident
    logstash_format true
    logstash_prefix accidentology
    logstash_dateformat %Y
    include_tag_key true
    tag_key @log_name
    flush_interval 10s
  </store>
  <store>
    @type stdout
  </store>
</match>
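The location value is written as a "latitude,longitude" string, which is one of the formats Elasticsearch accepts for a geo_point field, provided the index mapping or an index template declares location as geo_point. One more note: on Fluentd v1.x the bare flush_interval above is accepted for backward compatibility, but the v1 idiom is to declare flush settings inside a <buffer> section of the store. A sketch with illustrative values, not taken from the original config:

<store>
  @type elasticsearch
  hosts 192.168.179.81:9200,192.168.179.82:9200
  logstash_format true
  logstash_prefix accidentology
  <buffer>
    # Illustrative tuning values; adjust to your throughput.
    flush_interval 10s
    chunk_limit_size 8MB
  </buffer>
</store>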