[ElasticSearch] 검색엔진 만들기 2 – IMS 데이터 크롤링
Selenium
, BeautifulSoup
를 이용해서 IMS 이슈 페이지 Crawler 를 만든다. 기초적인 부분은 [여기]()를 참고.
단위 크롤러
수집할 이슈 번호에 대한 리스트를 만들어 한번에 수집하고 Elasticsearch에 인덱싱 하는 IndexIssueList
함수를 만들었다. (IMS 는 이슈 번호를 URL(https://ims.tmaxsoft.com/tody/ims/issue/issueView.do?issueId=
) 에 변수로 대입해서 각 이슈 페이지에 접근할 수 있다.)
_LOGIN_ID = "IMS_USERNAME"
_LOGIN_PASS = "IMS_PASSWORD"
_URL_LOGIN = "https://ims.tmaxsoft.com/tody/auth/login.do"
_URL_ISSVIEW = "https://ims.tmaxsoft.com/tody/ims/issue/issueView.do?issueId="
_SLCT_LOGIN_INPUT = "body > form > table > tbody > tr > td > table > tbody > tr:nth-child(2) > td:nth-child(1) > table > tbody > tr > td:nth-child(2) > table > tbody > tr > td:nth-child(2) > table > tbody > tr > td:nth-child(3) > input"
_SLCT_SUBJ = "body > div:nth-of-type(2) > table > tbody > tr > td:nth-of-type(2) > table > tbody > tr:nth-of-type(2) > td > table:nth-of-type(1) > tbody > tr > td > table > tbody > tr > td:nth-of-type(2)"
_SLCT_DTLS = "#IssueDescriptionDiv > table > tbody > tr > td"
_SLCT_IINFO = "#issueInfoTable"
def IndexIssueList(issueList):
import re
from selenium import webdriver
from bs4 import BeautifulSoup
driver = webdriver.PhantomJS('./webdriver/phantomjs',
service_log_path='./logs/ghostdriver.log')
driver.implicitly_wait(1)
driver.get(_URL_LOGIN)
driver.find_element_by_name('id').send_keys(_LOGIN_ID)
driver.find_element_by_name('password').send_keys(_LOGIN_PASS)
driver.find_element_by_css_selector(_SLCT_LOGIN_INPUT).click()
iss = []
for il in issueList:
try:
data = {}
driver.get(_URL_ISSVIEW+str(il))
html = driver.page_source
soup = BeautifulSoup(html,'html.parser')
subj = soup.select(_SLCT_SUBJ)[0].text.strip()
data.update({'Subject':subj})
iinfo = soup.select(_SLCT_IINFO)[0].findAll('tr')
for b in iinfo:
key,val = '',''
for i in b.findAll('td',attrs={'class':re.compile(r'title|data')}):
if i['class'][0] == 'title': key = i.text.strip()
else: val = i.text.strip()
data.update({key:re.sub('[\n\t ]+',' ',val)})
if data['Closed Date'] == '':
data['Closed Date']='1970/01/01 09:00:00'
if data['Date of final order'] == '':
data['Date of final order']='1970/01/01 09:00:00'
dtls = soup.select(_SLCT_DTLS)
data.update({'Issue Details':re.sub('[\n\t ]+',' ',dtls[0].text.strip())})
comments,tmp = soup.findAll('div',attrs={'class':'commDescTR data'}),""
for c in comments: tmp += c.text.strip()+'\n'
data.update({'Action Log':tmp})
iss.append(data)
except IndexError as error:
continue
driver.close()
for i in range(len(iss)):
iss[i].update({'Issue Number':re.search(r'\d+',iss[i]['Issue Number']).group()})
actions = [
{
"_index": "issue-v0.1.4",
"_type": "_doc",
"_id": iss[i]['Issue Number'],
"_source": iss[i]
}
for i in range(len(iss))
]
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_client = Elasticsearch(["localhost:9200"],timeout=300)
res = helpers.bulk(es_client, actions)
위 코드에서 driver.close()
부분까지가 IMS 웹에서 데이터를 수집하는 부분이고, 그 아래가 elasticsearch 에 bulk indexing 으로 데이터를 색인하는 단계이다. 수집 단계에서는 수집 대상 웹 페이지에 따라 다르게 구현될 것이기때문에 그때그때 달라질 부분이지만 일단 한 페이지의 정보를 딕셔너리 형태로 만들고 이 딕셔너리들에 대한 리스트를 만들어 뒤에서 순회할 수 있도록 한다.
다음과 같이 벌크 작업을 할 것들을 하나의 리스트 actions 로 만들어주면되는데 _id
는 Issue Number 와 같게 만들어서 추후 업데이트 작업이 일어날 때 번거롭게 id 를 찾아서 업데이트하지 않아도 되도록 한다.
actions = [
{
"_index": "issue-v0.1.4",
"_type": "_doc",
"_id": iss[i]['Issue Number'],
"_source": iss[i]
}
for i in range(len(iss))
]
초기 적재 및 전체 업데이트
기존에 존재 하는 _id
와 같은 값으로 문서가 색인되면 update 처리가 되기 때문에 초기 적재는 별도로 만들 필요는 없이 updateAll 하나의 함수로 만들었다. MaxFromIMS
함수에서 현재 가장 최근에 등록된 IMS 번호를 알아내고 이를 1번 이슈부터 _BATCH_SIZE
단위로 끊어 네 개의 쓰레드에 작업을 분배시켜준다. 각 쓰레드는 앞서 만든 IndexIssueList 작업을 수행하며 _BATCH_SIZE 크기 만큼의 이슈 갯수를 색인하고 종료된다.
_URL_ISSLIST = "https://ims.tmaxsoft.com/tody/ims/issue/issueList.do"
_SLCT_ISSLIST = "#IssueListForm"
_BATCH_SIZE = 50
_KEEP_ALIVE_LIMIT = '30s'
def MaxFromIMS():
from selenium import webdriver
from bs4 import BeautifulSoup
driver = webdriver.PhantomJS('./webdriver/phantomjs',
service_log_path='./logs/ghostdriver.log')
driver.implicitly_wait(1)
driver.get(_URL_LOGIN)
driver.find_element_by_name('id').send_keys(_LOGIN_ID)
driver.find_element_by_name('password').send_keys(_LOGIN_PASS)
driver.find_element_by_css_selector(_SLCT_LOGIN_INPUT).click()
driver.get(_URL_ISSLIST)
html = driver.page_source
soup = BeautifulSoup(html,'html.parser')
max_iss = soup.select(_SLCT_ISSLIST)[0].findAll('table',attrs={'bgcolor':'#FFFFFF'})[0].findAll('tr')[1].findAll('td')[0].text.strip()
driver.close()
return int(max_iss)
def UpdateAll():
import threading
from time import sleep
start = 1
end = MaxFromIMS()
jobs = list(range(start,end+1))
while jobs:
buf = []
if threading.active_count() < 5:
while len(buf) < _BATCH_SIZE:
if len(jobs) == 0: break
buf.append(jobs.pop(0))
threading.Thread(target=IndexIssueList,args=(buf,)).start()
sleep(1)
미종료 이슈에 대한 재수집
IMS 시스템은 초단위의 갱신 작업이 이뤄지지는 않고 그정도로 검색엔진에서 반영해줄 필요는 없지만, 일 단위로 보면 제법 많은 갱신이 이뤄지고 있다. 시시각각 변하는 데이터를 검색엔진에 최신화 하여 반영하기 위해 해당 시스템의 DB에 직접 접근할 수 있다면 가장 효율적이겠지만 그럴 권한은 없기때문에 하루 한번 정도 갱신될 가능성이 있는 문서들을 다시 재수집해주는 것으로.. 이 재수집 대상을 전체로 하기에는 부담스럽기 때문에 최소화하기 위해 종료되지 않은 이슈를 Scroll API 이용해서 파악한다.
def GetIssNumberNotClosed():
body = {
"_source":["Issue Number","Status"],
"query" : {
"bool":{
"must_not":[
{
"term":{
"Status.keyword":"Closed"
}
},
{
"term":{
"Status.keyword":"Closed_P"
}
},
{
"term":{
"Status.keyword":"Rejected"
}
},
{
"term":{
"Status.keyword":"Rejected (없음)"
}
},
{
"term":{
"Status.keyword":"Prevented"
}
},
{
"term":{
"Status.keyword":"Resolved"
}
}
]
}
},
"size":10
}
from elasticsearch import Elasticsearch
es_client = Elasticsearch(["localhost:9200"],timeout=300)
response = es_client.search(
index = 'issue-v0.1.4',
scroll = _KEEP_ALIVE_LIMIT,
size = 100,
body = body
)
sid = response['_scroll_id']
fetched = len(response['hits']['hits'])
fcnt = fetched
nums = []
for i in range(fetched):
nums.append(int(response['hits']['hits'][i]['_source']['Issue Number']))
while(fetched>0):
response = es_client.scroll(scroll_id=sid, scroll=_KEEP_ALIVE_LIMIT)
sid = response['_scroll_id']
fetched = len(response['hits']['hits'])
for i in range(fetched):
nums.append(int(response['hits']['hits'][i]['_source']['Issue Number']))
return nums
def UpdateNotClosed():
import threading
jobs = GetIssNumberNotClosed()
while jobs:
buf = []
if threading.active_count() < 5:
while len(buf) < 50:
if len(jobs) == 0: break
buf.append(jobs.pop(0))
threading.Thread(target=IndexIssueList,args=(buf,)).start()
새로운 이슈 수집
새로운 이슈는 수집된 이슈 번호의 최대 값에서부터 IMS에 등록된 이슈번호의 최대값까지인데 MaxFromES 에서 수집한 최대 번호를 받아내고 여기서부터 MaxFromIMS 까지 수집하는 Renew 함수를 생각해볼 수 있다. 이는 시간당 몇 개씩만 등록되어서 짧은 주기로 수집해도 될 것 같다.
def MaxFromES():
from elasticsearch import Elasticsearch
es = Elasticsearch("localhost:9200",timeout=10)
body = {
"size":0,
"aggs" : {
"max_iss" : { "max" : { "field" : "Issue Number" } }
}
}
res = es.search(index="issue-v0.1.4", body=body)
return int(res['aggregations']['max_iss']['value'])
def Renew():
from_number = MaxFromES()+1
to_number = MaxFromIMS()+1
IndexIssueList(range(from_number,to_number))
실행파일로 만들기 위해 메인 함수에서는 아래와 같아 커맨드라인 입력을 받아서 돌려볼 수 있다.
#!/Library/Frameworks/Python.framework/Versions/3.7/bin/python3
if __name__ == '__main__':
import sys
mode = sys.argv[1]
if mode == 'renew': Renew()
elif mode == 'updateNotClosed': UpdateNotClosed()
elif mode == 'updateAll': UpdateAll()
elif mode == 'single': IndexIssueList([int(sys.argv[2])])
elif mode == 'daemon':
from datetime import datetime
from time import sleep
while True:
sleep(60)
now = datetime.now()
if now.hour % 24 == 17 and now.minute % 60 == 30: updateNotClosed()
if now.minute % 10 == 0: renew()
전체 소스코드
위에 부분들 합쳐놓은 것에, IndexIssueList 함수에 print('try to crawl issue', il)
부분만 추가했다. 초기 적재시에는 ./crawl.py updateAll
과 같이 실행해주면 되고 나머지도 비슷하게 사용하면 된다.
#!/Library/Frameworks/Python.framework/Versions/3.7/bin/python3
_LOGIN_ID = "id"
_LOGIN_PASS = "pw"
_URL_LOGIN = "https://ims.tmaxsoft.com/tody/auth/login.do"
_URL_ISSVIEW = "https://ims.tmaxsoft.com/tody/ims/issue/issueView.do?issueId="
_SLCT_LOGIN_INPUT = "body > form > table > tbody > tr > td > table > tbody > tr:nth-child(2) > td:nth-child(1) > table > tbody > tr > td:nth-child(2) > table > tbody > tr > td:nth-child(2) > table > tbody > tr > td:nth-child(3) > input"
_SLCT_SUBJ = "body > div:nth-of-type(2) > table > tbody > tr > td:nth-of-type(2) > table > tbody > tr:nth-of-type(2) > td > table:nth-of-type(1) > tbody > tr > td > table > tbody > tr > td:nth-of-type(2)"
_SLCT_DTLS = "#IssueDescriptionDiv > table > tbody > tr > td"
_SLCT_IINFO = "#issueInfoTable"
def IndexIssueList(issueList):
import re
from selenium import webdriver
from bs4 import BeautifulSoup
driver = webdriver.PhantomJS('./webdriver/phantomjs',
service_log_path='./logs/ghostdriver.log')
driver.implicitly_wait(1)
driver.get(_URL_LOGIN)
driver.find_element_by_name('id').send_keys(_LOGIN_ID)
driver.find_element_by_name('password').send_keys(_LOGIN_PASS)
driver.find_element_by_css_selector(_SLCT_LOGIN_INPUT).click()
iss = []
for il in issueList:
print('try to crawl issue', il)
try:
data = {}
driver.get(_URL_ISSVIEW+str(il))
html = driver.page_source
soup = BeautifulSoup(html,'html.parser')
subj = soup.select(_SLCT_SUBJ)[0].text.strip()
data.update({'Subject':subj})
iinfo = soup.select(_SLCT_IINFO)[0].findAll('tr')
for b in iinfo:
key,val = '',''
for i in b.findAll('td',attrs={'class':re.compile(r'title|data')}):
if i['class'][0] == 'title': key = i.text.strip()
else: val = i.text.strip()
data.update({key:re.sub('[\n\t ]+',' ',val)})
if data['Closed Date'] == '':
data['Closed Date']='1970/01/01 09:00:00'
if data['Date of final order'] == '':
data['Date of final order']='1970/01/01 09:00:00'
dtls = soup.select(_SLCT_DTLS)
data.update({'Issue Details':re.sub('[\n\t ]+',' ',dtls[0].text.strip())})
comments,tmp = soup.findAll('div',attrs={'class':'commDescTR data'}),""
for c in comments: tmp += c.text.strip()+'\n'
data.update({'Action Log':tmp})
iss.append(data)
except IndexError as error:
logger.info(error)
continue
driver.close()
for i in range(len(iss)):
iss[i].update({'Issue Number':re.search(r'\d+',iss[i]['Issue Number']).group()})
actions = [
{
"_index": "issue-v0.1.4",
"_type": "_doc",
"_id": iss[i]['Issue Number'],
"_source": iss[i]
}
for i in range(len(iss))
]
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_client = Elasticsearch(["localhost:9200"],timeout=300)
res = helpers.bulk(es_client, actions)
_URL_ISSLIST = "https://ims.tmaxsoft.com/tody/ims/issue/issueList.do"
_SLCT_ISSLIST = "#IssueListForm"
_BATCH_SIZE = 50
_KEEP_ALIVE_LIMIT = '30s'
def MaxFromIMS():
from selenium import webdriver
from bs4 import BeautifulSoup
driver = webdriver.PhantomJS('./webdriver/phantomjs',
service_log_path='./logs/ghostdriver.log')
driver.implicitly_wait(1)
driver.get(_URL_LOGIN)
driver.find_element_by_name('id').send_keys(_LOGIN_ID)
driver.find_element_by_name('password').send_keys(_LOGIN_PASS)
driver.find_element_by_css_selector(_SLCT_LOGIN_INPUT).click()
driver.get(_URL_ISSLIST)
html = driver.page_source
soup = BeautifulSoup(html,'html.parser')
max_iss = soup.select(_SLCT_ISSLIST)[0].findAll('table',attrs={'bgcolor':'#FFFFFF'})[0].findAll('tr')[1].findAll('td')[0].text.strip()
driver.close()
return int(max_iss)
def UpdateAll():
import threading
from time import sleep
start = 1
end = MaxFromIMS()
jobs = list(range(start,end+1))
while jobs:
buf = []
if threading.active_count() < 5:
while len(buf) < _BATCH_SIZE:
if len(jobs) == 0: break
buf.append(jobs.pop(0))
threading.Thread(target=IndexIssueList,args=(buf,)).start()
sleep(1)
def GetIssNumberNotClosed():
body = {
"_source":["Issue Number","Status"],
"query" : {
"bool":{
"must_not":[
{
"term":{
"Status.keyword":"Closed"
}
},
{
"term":{
"Status.keyword":"Closed_P"
}
},
{
"term":{
"Status.keyword":"Rejected"
}
},
{
"term":{
"Status.keyword":"Rejected (없음)"
}
},
{
"term":{
"Status.keyword":"Prevented"
}
},
{
"term":{
"Status.keyword":"Resolved"
}
}
]
}
},
"size":10
}
from elasticsearch import Elasticsearch
es_client = Elasticsearch(["localhost:9200"],timeout=300)
response = es_client.search(
index = 'issue-v0.1.4',
scroll = _KEEP_ALIVE_LIMIT,
size = 100,
body = body
)
sid = response['_scroll_id']
fetched = len(response['hits']['hits'])
fcnt = fetched
nums = []
for i in range(fetched):
nums.append(int(response['hits']['hits'][i]['_source']['Issue Number']))
while(fetched>0):
response = es_client.scroll(scroll_id=sid, scroll=_KEEP_ALIVE_LIMIT)
sid = response['_scroll_id']
fetched = len(response['hits']['hits'])
for i in range(fetched):
nums.append(int(response['hits']['hits'][i]['_source']['Issue Number']))
return nums
def UpdateNotClosed():
import threading
jobs = GetIssNumberNotClosed()
while jobs:
buf = []
if threading.active_count() < 5:
while len(buf) < 50:
if len(jobs) == 0: break
buf.append(jobs.pop(0))
threading.Thread(target=IndexIssueList,args=(buf,)).start()
def MaxFromES():
from elasticsearch import Elasticsearch
es = Elasticsearch("localhost:9200",timeout=10)
body = {
"size":0,
"aggs" : {
"max_iss" : { "max" : { "field" : "Issue Number" } }
}
}
res = es.search(index="issue-v0.1.4", body=body)
return int(res['aggregations']['max_iss']['value'])
def Renew():
from_number = MaxFromES()+1
to_number = MaxFromIMS()+1
IndexIssueList(range(from_number,to_number))
if __name__ == '__main__':
import sys
mode = sys.argv[1]
if mode == 'renew': Renew()
elif mode == 'updateNotClosed': UpdateNotClosed()
elif mode == 'updateAll': UpdateAll()
elif mode == 'single': IndexIssueList([int(sys.argv[2])])
elif mode == 'daemon':
from datetime import datetime
from time import sleep
while True:
sleep(60)
now = datetime.now()
if now.hour % 24 == 17 and now.minute % 60 == 30: updateNotClosed()
if now.minute % 10 == 0: renew()