[PySpark] Converting a DataFrame to Python types (dict)
When you need to use data directly on the Spark driver, a DataFrame can be awkward to work with. Calling the DataFrame's collect() and then each resulting Row's asDict() converts the data into native Python types.
Creating the DataFrame
In the pyspark shell (where `sc` is the SparkContext the shell provides), read an Elasticsearch index into a DataFrame.
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format("org.elasticsearch.spark.sql").option("es.nodes","192.168.179.141:9200").option("es.nodes.discovery", "true").load("${INDEX}/${TYPE}")
>>> df.registerTempTable("tab")
>>> df = sqlContext.sql("SELECT distinct request FROM tab")
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
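As shown above, the type is <class 'pyspark.sql.dataframe.DataFrame'>. A DataFrame is a distributed, lazily evaluated dataset rather than an in-memory Python collection, so it is hard to manipulate directly on the Spark driver.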
collect()
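Calling the collect() method, as below, turns the DataFrame into a list whose elements are of type <class 'pyspark.sql.types.Row'>. Note that collect() brings every row back to the driver, so it is only appropriate when the result fits in driver memory.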
>>> collected = df.collect()
>>> type(collected)
<class 'list'>
>>> type(collected[0])
<class 'pyspark.sql.types.Row'>
>>> collected
[Row(request='/issue/findRelationIssues.do'), Row(request='/product/findProductPrefixList.do'), Row(request='/tagfree/xfreeEditor/dialog/splitcell.js'), Row(request='/issue/saveCoreSetComment.do'), Row(request='/issue/js/common/jquery-1.9.1.min.js'), Row(request='/js/floatMenu.js'), Row(request='/main.do/favicon.ico'), Row(request='/tagfree/xfreeEditor/js/xfe_mobile_info.js'), Row(request='/tagfree/xfreeEditor/dialog/insertlayout.js'), Row(request='/dwr/exec/ProductDwr.findSearchSubVersions'), Row(request='/js/version.js'), Row(request='/patch/patchVeriForm.do'), Row(request='/comment/removeComment.do'), Row(request='/knowledge/knowledgeList.do'), Row(request='/product/productList.do'), Row(request='/dwr/interface/IssueDwr.js'), Row(request='/dwr/exec/UserProfileDwr.saveUserIssueColumns'), Row(request='/patch/patchSearchList.do'), Row(request='/dwr/engine.js'), Row(request='/sso/ssoLogout.jsp'), Row(request='/checkIssuesAjax.do'), Row(request='/manager/managerList.do'), Row(request='/sso/js//.js'), Row(request='/tagfree/xfreeEditor/dialog/shortcut.js'), Row(request='/issue/popupUrgent.do'), Row(request='/innorix/common/upload.jsp'), Row(request='/board/readBoard.do'), Row(request='/html/eng_manual/manualForm.jsp'), Row(request='/tagfree/xfreeEditor/dialog/medialink.js'), Row(request='/dwr/interface/RequirementDwr.js'), Row(request='/tagfree/xfreeEditor/js/library/rangy/rangy-highlighter.js'), Row(request='/js/commentForm.js'), Row(request='/tagfree/xfreeEditor/js/xfe_env_contents.js'), Row(request='/dwr/exec/UserDwr.findUsersByName'), Row(request='/dwr/interface/UserDwr.js'), Row(request='/tagfree/xfreeEditor/dialog/paste.js'), Row(request='/tagfree/xfreeEditor/dialog/cellproperty.js'), Row(request='/tagfree/xfreeEditor/js/xfe_create_dialog.js'), Row(request='/tagfree/xfreeEditor/js/xfe_bottom_event.js'), Row(request='/util/saveFilteredIssueList.do'), Row(request='/auth/login.do'), Row(request='/html/OpenFrame/Common/html/'), Row(request='/sso/ssologin.jsp'), 
Row(request='/tagfree/xfreeEditor/dialog/colorpicker.js'), Row(request='/dwr/exec/UserDwr.findUsersByNameAll'), Row(request='/board/boardList.do'), Row(request='/module/moduleList.do'), Row(request='/dwr/exec/ProductDwr.findSearchMainVersions'), Row(request='/issue/issueSearchList.do'), Row(request='/tagfree/xfreeEditor/js/xfe_range_handler.js'), Row(request='/issue/js/common/global.js')]
asDict()
Calling the asDict() method on a <class 'pyspark.sql.types.Row'> object converts it into a dictionary.
>>> collected_first = collected[0].asDict()
>>> type(collected_first)
<class 'dict'>
>>> collected_first
{'request': '/issue/findRelationIssues.do'}