Fluentd Use Case
http 플러그인 활용하기
지정한 Listener Port 를 통해 POST 되는 데이터를 수집한다.
openAPI
openAPI 를 통해 REST 방식으로 받아온 데이터를 Fluentd http listener 에 다시 전달하여 수집한다.
데이터 소스
vi /app/python/sources/JayWalkingAll.py
JayWalkingAll
#!/app/python/bin/python
from urllib.request import urlopen
from urllib.parse import urlencode,quote_plus,unquote
import urllib
import requests
import json
# Endpoint of the open API that is queried, and the key sent as `servicekey`.
url = 'http://apis.data.go.kr/B552061/jaywalking/getRestJaywalking'
my_service_key = 'some_key_value_here'

# Values sent as `searchYearCd` (ycd) and `siDo` (scd); every combination of
# the two lists is requested.
# Full sets previously listed in this script:
#   ycd: 2017081, 2016052, 2015052, 2014117, 2013114
#   scd: 11, 26, 27, 28, 29, 30, 31, 36, 41, 42, 43, 44, 45, 46, 47, 48, 50
ycd = [2015052]
scd = [50]

# Fluentd HTTP listener that receives one request per generated record.
FLUENTD_URL = 'http://localhost:47474/JayWalking'

# Seconds to wait on either network call before giving up instead of hanging.
REQUEST_TIMEOUT = 30


def build_request_url(year_code, sido_code):
    """Return the full API URL for one (searchYearCd, siDo) combination.

    Args:
        year_code: Value sent as the ``searchYearCd`` query parameter.
        sido_code: Value sent as the ``siDo`` query parameter.

    Returns:
        The endpoint URL followed by the query string.
    """
    query = urlencode({
        'servicekey': my_service_key,
        'searchYearCd': year_code,
        'siDo': sido_code,
    })
    # unquote() undoes urlencode()'s percent-encoding, so the service key is
    # sent exactly as it is written in `my_service_key`.
    return url + '?' + unquote(query)


def parse_zones(body):
    """Extract the list of zones from a decoded API response body.

    Args:
        body: Response text, expected to be a JSON document.

    Returns:
        The list stored under ``searchResult`` -> ``frequentzone``.

    Raises:
        json.JSONDecodeError: If ``body`` is not valid JSON.
        KeyError: If the expected keys are missing.
    """
    # json.loads (not ast.literal_eval): the payload is JSON, and JSON
    # literals such as null/true/false are not valid Python literals.
    return json.loads(body)['searchResult']['frequentzone']


def iter_records(zone):
    """Yield one flat record per coordinate pair of a zone.

    Each record is a copy of ``zone`` with three extra keys: ``latitude``,
    ``longitude`` (both strings) and ``location_id`` (index of the pair).
    ``zone`` itself is not modified.

    Args:
        zone: One element of the ``frequentzone`` list. Its ``geometry``
            value is a JSON string with a ``coordinates`` list.

    Yields:
        dict: The record to forward for each coordinate pair.
    """
    # Only the first element of `coordinates` is used.
    # NOTE(review): presumably a GeoJSON Polygon whose first element is the
    # outer ring - confirm against the API documentation.
    ring = json.loads(zone['geometry'])['coordinates'][0]
    for location_id, point in enumerate(ring):
        # NOTE(review): the first value of each pair is stored as `latitude`
        # and the second as `longitude`, exactly as before. GeoJSON normally
        # orders positions [longitude, latitude]; the key names are kept
        # because the downstream Fluentd filter consumes these exact keys -
        # verify before renaming.
        latitude, longitude = (str(value) for value in point)
        record = dict(zone)
        record.update({
            'latitude': latitude,
            'longitude': longitude,
            'location_id': location_id,
        })
        yield record


def fetch_zones(year_code, sido_code):
    """Query the API for one combination and return its list of zones.

    The raw response body is echoed to stdout before it is parsed.
    """
    request_url = build_request_url(year_code, sido_code)
    with urlopen(request_url, timeout=REQUEST_TIMEOUT) as response:
        body = response.read().decode('utf-8')
    print(body)
    return parse_zones(body)


def main():
    """Fetch every configured combination and forward each record to Fluentd."""
    for year_code in ycd:
        for sido_code in scd:
            for zone in fetch_zones(year_code, sido_code):
                for record in iter_records(zone):
                    print(record['latitude'])
                    print(record['longitude'])
                    print(record)
                    reply = requests.request(
                        'PUT', FLUENTD_URL, json=record,
                        timeout=REQUEST_TIMEOUT,
                    )
                    # Show the delivery outcome; the previous code printed
                    # the bound `json` method object instead of any data.
                    print(reply.status_code)


if __name__ == '__main__':
    main()
fluentd config
# HTTP input -> record_transformer filter -> Elasticsearch + stdout output
## Input
# Listens for HTTP requests on port 47474 and parses each body as JSON.
# The producer script sends its records to http://localhost:47474/JayWalking;
# in_http takes the event tag from the request path (here: JayWalking).
# `types` casts the latitude/longitude fields (sent as strings) to float.
<source>
@type http
port 47474
<parse>
@type json
types latitude:float,longitude:float
</parse>
</source>
## Filter
# Adds a `location` field built as "<longitude>,<latitude>" from the record
# and then drops the two source fields. enable_ruby is needed for the
# ${record[...]} expressions.
# NOTE(review): the producer stores the first value of each coordinate pair
# as `latitude` and the second as `longitude`; confirm that the resulting
# order is what the Elasticsearch mapping for `location` expects.
<filter **JayWalking*.**>
@type record_transformer
enable_ruby
<record>
location ${record["longitude"]},${record["latitude"]}
</record>
remove_keys longitude,latitude
</filter>
## Output
# Every matching event is copied to both stores: Elasticsearch and stdout.
<match **JayWalking**>
@type copy
<store>
# Index name is derived from logstash_prefix and logstash_dateformat
# (year + month). The event tag is added to each record under the key
# @log_name. Buffered events are flushed every 10 seconds.
# NOTE(review): the hosts list ends with a trailing comma - confirm it is
# intentional.
@type elasticsearch
hosts 192.168.179.81:9200,192.168.179.82:9200,
logstash_format true
logstash_prefix jaywalking
logstash_dateformat %Y%m
include_tag_key true
tag_key @log_name
flush_interval 10s
</store>
<store>
# Also print each event to Fluentd's stdout.
@type stdout
</store>
</match>