[PySpark] Creating a DataFrame from an Elasticsearch Index
Create a Spark DataFrame from an Elasticsearch index using the elasticsearch-hadoop connector.
Installing Spark
Downloading Spark and configuring the environment
If you only want specific components, you can download the source and build it with Maven, but here we use the pre-compiled binary.
# Download the pre-compiled binary and unpack it under /app/spark
wget http://mirror.navercorp.com/apache/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
tar -xvzf spark-2.3.2-bin-hadoop2.7.tgz -C /app/spark/.
mv /app/spark/spark-2.3.2-bin-hadoop2.7 /app/spark/2.3.2
# Symlink each top-level directory of the versioned install into /app/spark
find /app/spark/2.3.2 -mindepth 1 -maxdepth 1 -type d | xargs -I {} ln -s {} /app/spark/.
export PATH=/app/spark/bin:$PATH
Downloading elasticsearch-hadoop and setting the path
Download the library version that matches the Elasticsearch cluster you are using.
wget https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-6.4.1.zip
unzip elasticsearch-hadoop-6.4.1.zip
mkdir -p /app/spark/jars/ext/elasticsearch-hadoop
mv elasticsearch-hadoop-6.4.1 /app/spark/jars/ext/elasticsearch-hadoop/6.4.1
Running pyspark
Launch the interactive pyspark interpreter with the following command. The --driver-class-path option puts the connector jar on the driver's classpath so the org.elasticsearch.spark.sql data source can be resolved.
pyspark --driver-class-path=/app/spark/jars/ext/elasticsearch-hadoop/6.4.1/dist/elasticsearch-hadoop-6.4.1.jar
If you see output like the following, the shell started correctly.
Python 3.6.8 (default, Jan 30 2019, 15:14:00)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Type "help", "copyright", "credits" or "license" for more information.
2019-02-07 16:25:33 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/
Using Python version 3.6.8 (default, Jan 30 2019 15:14:00)
SparkSession available as 'spark'.
>>>
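Once the shell is up, you can optionally confirm that the connector is visible to the driver JVM. This is just a sketch using pyspark's private _jvm handle (py4j), not an official check; if the jar path was wrong, the call fails with a class-not-found error.
# org.elasticsearch.spark.sql.DefaultSource is the connector's data source implementation
sc._jvm.java.lang.Class.forName("org.elasticsearch.spark.sql.DefaultSource")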
Paste the code below into the shell to query the Elasticsearch index with SQL and inspect the result.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # sc is created for you by the pyspark shell
# Replace ${INDEX}/${TYPE} with the actual index name and mapping type
df = sqlContext.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "192.168.179.141:9200") \
    .option("es.nodes.discovery", "true") \
    .load("${INDEX}/${TYPE}")
df.registerTempTable("tab")  # deprecated in Spark 2.x in favor of createOrReplaceTempView
output = sqlContext.sql("SELECT DISTINCT request FROM tab")
output.show()
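The connector can also push a query down to Elasticsearch so that only matching documents are shipped to Spark. Below is a minimal sketch using the connector's es.query setting; the URI query itself is only an illustration, not taken from the data above.
# es.query accepts anything Elasticsearch takes as a URI search or a query DSL string
df_filtered = sqlContext.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "192.168.179.141:9200") \
    .option("es.query", "?q=request:*") \
    .load("${INDEX}/${TYPE}")
df_filtered.show(5)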
Running a job with spark-submit
When a job runs under pyspark, a Spark context is created automatically, but when you submit it with spark-submit, you have to create the Spark context yourself.
vi pyspark-submit_01.py
Open the file and write a job that queries the distinct request values, as follows.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

def main():
    conf = SparkConf().setAppName("pyspark-submit_01")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    # Replace ${INDEX}/${TYPE} with the actual index name and mapping type
    df = sqlContext.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "192.168.179.141:9200") \
        .option("es.nodes.discovery", "true") \
        .load("${INDEX}/${TYPE}")
    df.registerTempTable("tab")
    output = sqlContext.sql("SELECT DISTINCT request FROM tab")
    output.show()
    # Spark 2.x maps the old Databricks CSV format name to its built-in
    # csv source, so no extra package is required.
    output.write.format("com.databricks.spark.csv").save("file:///home/spark/output.csv")

if __name__ == '__main__':
    main()
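For reference, the same job written against the Spark 2.x SparkSession API (which subsumes SparkContext and SQLContext) might look like the sketch below. The host, placeholders, and output path are carried over from above; treat it as an assumed equivalent rather than the connector's documented example. Either version is submitted the same way.
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName("pyspark-submit_01").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    df = spark.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "192.168.179.141:9200") \
        .option("es.nodes.discovery", "true") \
        .load("${INDEX}/${TYPE}")
    df.createOrReplaceTempView("tab")
    output = spark.sql("SELECT DISTINCT request FROM tab")
    output.show()
    # Spark 2.x built-in CSV writer; the header option is optional
    output.write.option("header", "true").csv("file:///home/spark/output")
    spark.stop()

if __name__ == '__main__':
    main()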
Run the job with the following command; --jars makes the connector jar available to the application.
spark-submit --jars /app/spark/jars/ext/elasticsearch-hadoop/6.4.1/dist/elasticsearch-hadoop-6.4.1.jar pyspark-submit_01.py
Execution result
2019-02-07 16:45:20 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-02-07 16:45:20 INFO SparkContext:54 - Running Spark version 2.3.2
2019-02-07 16:45:20 INFO SparkContext:54 - Submitted application: pyspark-submit_01
2019-02-07 16:45:20 INFO SecurityManager:54 - Changing view acls to: spark
2019-02-07 16:45:20 INFO SecurityManager:54 - Changing modify acls to: spark
2019-02-07 16:45:20 INFO SecurityManager:54 - Changing view acls groups to:
2019-02-07 16:45:20 INFO SecurityManager:54 - Changing modify acls groups to:
2019-02-07 16:45:20 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); groups with view permissions: Set(); users with modify permissions: Set(spark); groups with modify permissions: Set()
2019-02-07 16:45:20 INFO Utils:54 - Successfully started service 'sparkDriver' on port 42167.
2019-02-07 16:45:20 INFO SparkEnv:54 - Registering MapOutputTracker
2019-02-07 16:45:20 INFO SparkEnv:54 - Registering BlockManagerMaster
2019-02-07 16:45:20 INFO BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2019-02-07 16:45:20 INFO BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2019-02-07 16:45:20 INFO DiskBlockManager:54 - Created local directory at /tmp/blockmgr-7e80acc2-be90-4ac1-b9ed-0971281e314a
2019-02-07 16:45:20 INFO MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2019-02-07 16:45:21 INFO SparkEnv:54 - Registering OutputCommitCoordinator
2019-02-07 16:45:21 INFO log:192 - Logging initialized @1898ms
2019-02-07 16:45:21 INFO Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2019-02-07 16:45:21 INFO Server:419 - Started @1969ms
2019-02-07 16:45:21 INFO AbstractConnector:278 - Started ServerConnector@29df9709{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-02-07 16:45:21 INFO Utils:54 - Successfully started service 'SparkUI' on port 4040.
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@539d4620{/jobs,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6329f9ce{/jobs/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7be94093{/jobs/job,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1286a091{/jobs/job/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3e4dcc86{/stages,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@f91fe04{/stages/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@8ab406a{/stages/stage,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7f969b69{/stages/stage/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2dec2ce1{/stages/pool,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@75e42251{/stages/pool/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@330f5263{/storage,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7889cab8{/storage/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@34ce7a6c{/storage/rdd,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@606c200e{/storage/rdd/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@29b9fa2a{/environment,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1fd329af{/environment/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@58893d23{/executors,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@59e41a6c{/executors/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3f6ac3d0{/executors/threadDump,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@39f20bc3{/executors/threadDump/json,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@280349fb{/static,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@14c79f00{/,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@237e2725{/api,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2de21930{/jobs/job/kill,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1ac50048{/stages/stage/kill,null,AVAILABLE,@Spark}
2019-02-07 16:45:21 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://node2.qrm:4040
2019-02-07 16:45:21 INFO SparkContext:54 - Added JAR file:///app/spark/jars/ext/elasticsearch-hadoop/6.4.1/dist/elasticsearch-hadoop-6.4.1.jar at spark://node2.qrm:42167/jars/elasticsearch-hadoop-6.4.1.jar with timestamp 1549525521261
2019-02-07 16:45:21 INFO SparkContext:54 - Added file file:/home/spark/pyspark-submit_01.py at file:/home/spark/pyspark-submit_01.py with timestamp 1549525521280
2019-02-07 16:45:21 INFO Utils:54 - Copying /home/spark/pyspark-submit_01.py to /tmp/spark-5c0e461b-4e65-476c-a523-a25380409238/userFiles-2bc774fb-df8d-47ef-b1d5-644625335f20/pyspark-submit_01.py
2019-02-07 16:45:21 INFO Executor:54 - Starting executor ID driver on host localhost
2019-02-07 16:45:21 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38925.
2019-02-07 16:45:21 INFO NettyBlockTransferService:54 - Server created on node2.qrm:38925
2019-02-07 16:45:21 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2019-02-07 16:45:21 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, node2.qrm, 38925, None)
2019-02-07 16:45:21 INFO BlockManagerMasterEndpoint:54 - Registering block manager node2.qrm:38925 with 366.3 MB RAM, BlockManagerId(driver, node2.qrm, 38925, None)
2019-02-07 16:45:21 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, node2.qrm, 38925, None)
2019-02-07 16:45:21 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, node2.qrm, 38925, None)
2019-02-07 16:45:21 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7d3f5e97{/metrics/json,null,AVAILABLE,@Spark}
+------------+
| request|
+------------+
|...ssue/f...|
|...roduct...|
|...ee/xfr...|
|...ssue/s...|
|...ssue/j...|
|...dy/flo...|
|...ain.do...|
|...ee/xfr...|
|...ee/xfr...|
|...s/vers...|
|...xec/Pr...|
|...atch/p...|
|...omment...|
|...roduct...|
|...nowled...|
|...nterfa...|
|...xec/Us...|
|...atch/p...|
|...engine.js|
|...soLogo...|
+------------+
only showing top 20 rows
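Note that save() writes a directory of part files rather than a single CSV file. As a quick sanity check, the export can be read back in the pyspark shell; the sketch below assumes the same path used in the job above.
check = sqlContext.read.format("csv").load("file:///home/spark/output.csv")
check.show(5)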