{"id":846,"date":"2019-02-07T16:53:56","date_gmt":"2019-02-07T07:53:56","guid":{"rendered":"https:\/\/dong1lkim.oboki.net\/?p=846"},"modified":"2019-09-01T22:23:32","modified_gmt":"2019-09-01T13:23:32","slug":"pyspark-elasticsearch-index-%ec%97%90%ec%84%9c-dataframe-%ec%83%9d%ec%84%b1%ed%95%98%ea%b8%b0","status":"publish","type":"post","link":"https:\/\/oboki.net\/workspace\/python\/pyspark-elasticsearch-index-%ec%97%90%ec%84%9c-dataframe-%ec%83%9d%ec%84%b1%ed%95%98%ea%b8%b0\/","title":{"rendered":"[PySpark] Elasticsearch Index \uc5d0\uc11c DataFrame \uc0dd\uc131\ud558\uae30"},"content":{"rendered":"<h1>[PySpark] Elasticsearch Index \uc5d0\uc11c DataFrame \uc0dd\uc131\ud558\uae30<\/h1>\n<blockquote><p>\n  elasticsearch-hadoop connector \ub97c \uc774\uc6a9\ud574\uc11c spark dataframe\uc744 \uc0dd\uc131\ud55c\ub2e4.\n<\/p><\/blockquote>\n<h2>Spark \uc124\uce58<\/h2>\n<h3>spark \ub2e4\uc6b4\ub85c\ub4dc \ubc0f \ud658\uacbd\uc124\uc815<\/h3>\n<p>\ud544\uc694\ud55c \uc694\uc18c\ub9cc \uc124\uce58\ud558\uace0\uc790 \ud558\ub294 \uacbd\uc6b0 \uc18c\uc2a4\ucf54\ub4dc\ub97c \ub0b4\ub824\ubc1b\uc544\uc11c maven\uc744 \uc774\uc6a9\ud574 \ube4c\ub4dc\ud574\ub3c4 \ub418\uc9c0\ub9cc pre-compiled \ubc14\uc774\ub108\ub9ac\ub97c \uc774\uc6a9\ud55c\ub2e4.<\/p>\n<pre><code class=\"bash\">wget http:\/\/mirror.navercorp.com\/apache\/spark\/spark-2.3.2\/spark-2.3.2-bin-hadoop2.7.tgz\ntar -xvzf spark-2.3.2-bin-hadoop2.7.tgz -C \/app\/spark\/.\nmv \/app\/spark\/spark-2.3.2-bin-hadoop2.7 \/app\/spark\/2.3.2\nfind \/app\/spark\/2.3.2 -mindepth 1 -maxdepth 1 -type d | xargs -I {} ln -s {} \/app\/spark\/.\nexport PATH=\/app\/spark\/bin:$PATH\n<\/code><\/pre>\n<h3>elasticsearch-hadoop \ub2e4\uc6b4\ub85c\ub4dc \ubc0f \uacbd\ub85c \uc124\uc815<\/h3>\n<p>\uc0ac\uc6a9\ud560 elasticsearch \ubc84\uc804\uc5d0 \ub9de\ub294 \ub77c\uc774\ube0c\ub7ec\ub9ac\ub97c \ub2e4\uc6b4\ub85c\ub4dc \ud55c\ub2e4.<\/p>\n<pre><code class=\"bash\">wget 
https:\/\/artifacts.elastic.co\/downloads\/elasticsearch-hadoop\/elasticsearch-hadoop-6.4.1.zip\nunzip elasticsearch-hadoop-6.4.1.zip\nmkdir -p \/app\/spark\/jars\/ext\/elasticsearch-hadoop\nmv elasticsearch-hadoop-6.4.1 \/app\/spark\/jars\/ext\/elasticsearch-hadoop\/6.4.1\n<\/code><\/pre>\n<h2>pyspark \uc2e4\ud589<\/h2>\n<p>\ub2e4\uc74c \uba85\ub839\uc744 \uc774\uc6a9\ud574 \ub300\ud654\ud615 \uc778\ud130\ud504\ub9ac\ud130 <code>pyspark<\/code>\ub97c \uc2e4\ud589\ud55c\ub2e4.<\/p>\n<p><code>pyspark --driver-class-path=\/app\/spark\/jars\/ext\/elasticsearch-hadoop\/6.4.1\/dist\/elasticsearch-hadoop-6.4.1.jar<\/code><\/p>\n<p>\uc544\ub798\uc640 \uac19\uc774 \ucd9c\ub825\ub418\uba74 \uc815\uc0c1.<\/p>\n<pre><code class=\"txt\">Python 3.6.8 (default, Jan 30 2019, 15:14:00) \n[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux\nType \"help\", \"copyright\", \"credits\" or \"license\" for more information.\n2019-02-07 16:25:33 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\nSetting default log level to \"WARN\".\nTo adjust logging level use sc.setLogLevel(newLevel). 
For SparkR, use setLogLevel(newLevel).\nWelcome to\n      ____              __\n     \/ __\/__  ___ _____\/ \/__\n    _\\ \\\/ _ \\\/ _ `\/ __\/  '_\/\n   \/__ \/ .__\/\\_,_\/_\/ \/_\/\\_\\   version 2.3.2\n      \/_\/\n\nUsing Python version 3.6.8 (default, Jan 30 2019 15:14:00)\nSparkSession available as 'spark'.\n&gt;&gt;&gt; \n<\/code><\/pre>\n<p>\uc544\ub798 \ucf54\ub4dc\ub97c \ud55c \uc904\uc529 \ubd99\uc5ec \ub123\uc73c\uba74 elasticsearch index\uc5d0\uc11c SQL\uc744 \uc774\uc6a9\ud574 \ub370\uc774\ud130\ub97c \uc870\ud68c\ud558\uace0 \uc694\uc57d \uc815\ubcf4\ub97c \ud655\uc778\ud560 \uc218 \uc788\ub2e4.<\/p>\n<pre><code class=\"py\">from pyspark.sql import SQLContext\n\nsqlContext = SQLContext(sc)\n\ndf = sqlContext.read.format(\"org.elasticsearch.spark.sql\").option(\"es.nodes\",\"192.168.179.141:9200\").option(\"es.nodes.discovery\", \"true\").load(\"${INDEX}\/${TYPE}\")\ndf.registerTempTable(\"tab\")\n\noutput = sqlContext.sql(\"SELECT distinct request FROM tab\")\noutput.show()\n<\/code><\/pre>\n<h2>spark-submit \uc744 \uc774\uc6a9\ud55c job \uc2e4\ud589<\/h2>\n<p><code>pyspark<\/code>\ub97c \uc774\uc6a9\ud574 job\uc744 \uc2e4\ud589\ud558\ub294 \uacbd\uc6b0 \uae30\ubcf8\uc801\uc73c\ub85c <code>spark context<\/code>\uac00 \uc0dd\uc131\ub418\uc9c0\ub9cc <code>spark-submit<\/code>\uc744 \uc774\uc6a9\ud558\ub294 \uacbd\uc6b0 <code>spark context<\/code>\ub97c \ubcc4\ub3c4\ub85c \uc0dd\uc131\ud574\uc8fc\uc5b4\uc57c \ud55c\ub2e4.<\/p>\n<p><code>vi pyspark-submit_01.py<\/code> \ud30c\uc77c\uc744 \uc5f4\uc5b4, distinct request\ub97c \uc870\ud68c\ud558\ub294 job\uc744 \ub2e4\uc74c\uacfc \uac19\uc774 \uc791\uc131\ud55c\ub2e4.<\/p>\n<pre><code class=\"py\">from pyspark import SparkContext, SparkConf\nfrom pyspark.sql import SQLContext\n\ndef main():\n    conf = SparkConf().setAppName(\"pyspark-submit_01\")\n    sc = SparkContext(conf=conf)\n    sc.setLogLevel(\"WARN\")\n    sqlContext = SQLContext(sc)\n\n    df = sqlContext.read \\\n        
.format(\"org.elasticsearch.spark.sql\") \\\n        .option(\"es.nodes\",\"192.168.179.141:9200\") \\\n        .option(\"es.nodes.discovery\", \"true\") \\\n        .load(\"${INDEX}\/${TYPE}\")\n    df.registerTempTable(\"tab\")\n    rdf = sqlContext.sql(\"SELECT distinct request FROM tab\").collect()\n\n    output = sqlContext.sql(\"SELECT distinct request FROM tab\")\n    output.show()\n    output.write.format(\"com.databricks.spark.csv\").save(\"file:\/\/\/\/home\/spark\/output.csv\")\n\nif __name__ == '__main__': main()\n<\/code><\/pre>\n<p>\uc544\ub798 \uba85\ub839\uc73c\ub85c \uc55e\uc11c \ub9cc\ub4e0 job \uc744 \uc2e4\ud589\ud55c\ub2e4.<\/p>\n<p><code>spark-submit --jars \/app\/spark\/jars\/ext\/elasticsearch-hadoop\/6.4.1\/dist\/elasticsearch-hadoop-6.4.1.jar pyspark-submit_01.py<\/code><\/p>\n<h3>\uc218\ud589 \uacb0\uacfc<\/h3>\n<pre><code class=\"log\">2019-02-07 16:45:20 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n2019-02-07 16:45:20 INFO  SparkContext:54 - Running Spark version 2.3.2\n2019-02-07 16:45:20 INFO  SparkContext:54 - Submitted application: pyspark-submit_01\n2019-02-07 16:45:20 INFO  SecurityManager:54 - Changing view acls to: spark\n2019-02-07 16:45:20 INFO  SecurityManager:54 - Changing modify acls to: spark\n2019-02-07 16:45:20 INFO  SecurityManager:54 - Changing view acls groups to: \n2019-02-07 16:45:20 INFO  SecurityManager:54 - Changing modify acls groups to: \n2019-02-07 16:45:20 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spark); groups with view permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()\n2019-02-07 16:45:20 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 42167.\n2019-02-07 16:45:20 INFO  SparkEnv:54 - Registering MapOutputTracker\n2019-02-07 16:45:20 INFO  SparkEnv:54 - Registering 
BlockManagerMaster\n2019-02-07 16:45:20 INFO  BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information\n2019-02-07 16:45:20 INFO  BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up\n2019-02-07 16:45:20 INFO  DiskBlockManager:54 - Created local directory at \/tmp\/blockmgr-7e80acc2-be90-4ac1-b9ed-0971281e314a\n2019-02-07 16:45:20 INFO  MemoryStore:54 - MemoryStore started with capacity 366.3 MB\n2019-02-07 16:45:21 INFO  SparkEnv:54 - Registering OutputCommitCoordinator\n2019-02-07 16:45:21 INFO  log:192 - Logging initialized @1898ms\n2019-02-07 16:45:21 INFO  Server:351 - jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown\n2019-02-07 16:45:21 INFO  Server:419 - Started @1969ms\n2019-02-07 16:45:21 INFO  AbstractConnector:278 - Started ServerConnector@29df9709{HTTP\/1.1,[http\/1.1]}{0.0.0.0:4040}\n2019-02-07 16:45:21 INFO  Utils:54 - Successfully started service 'SparkUI' on port 4040.\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@539d4620{\/jobs,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6329f9ce{\/jobs\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7be94093{\/jobs\/job,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1286a091{\/jobs\/job\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3e4dcc86{\/stages,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@f91fe04{\/stages\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@8ab406a{\/stages\/stage,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started 
o.s.j.s.ServletContextHandler@7f969b69{\/stages\/stage\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2dec2ce1{\/stages\/pool,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@75e42251{\/stages\/pool\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@330f5263{\/storage,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7889cab8{\/storage\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@34ce7a6c{\/storage\/rdd,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@606c200e{\/storage\/rdd\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@29b9fa2a{\/environment,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1fd329af{\/environment\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@58893d23{\/executors,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@59e41a6c{\/executors\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3f6ac3d0{\/executors\/threadDump,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@39f20bc3{\/executors\/threadDump\/json,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@280349fb{\/static,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@14c79f00{\/,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  
ContextHandler:781 - Started o.s.j.s.ServletContextHandler@237e2725{\/api,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2de21930{\/jobs\/job\/kill,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@1ac50048{\/stages\/stage\/kill,null,AVAILABLE,@Spark}\n2019-02-07 16:45:21 INFO  SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http:\/\/node2.qrm:4040\n2019-02-07 16:45:21 INFO  SparkContext:54 - Added JAR file:\/\/\/app\/spark\/jars\/ext\/elasticsearch-hadoop\/6.4.1\/dist\/elasticsearch-hadoop-6.4.1.jar at spark:\/\/node2.qrm:42167\/jars\/elasticsearch-hadoop-6.4.1.jar with timestamp 1549525521261\n2019-02-07 16:45:21 INFO  SparkContext:54 - Added file file:\/home\/spark\/pyspark-submit_01.py at file:\/home\/spark\/pyspark-submit_01.py with timestamp 1549525521280\n2019-02-07 16:45:21 INFO  Utils:54 - Copying \/home\/spark\/pyspark-submit_01.py to \/tmp\/spark-5c0e461b-4e65-476c-a523-a25380409238\/userFiles-2bc774fb-df8d-47ef-b1d5-644625335f20\/pyspark-submit_01.py\n2019-02-07 16:45:21 INFO  Executor:54 - Starting executor ID driver on host localhost\n2019-02-07 16:45:21 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38925.\n2019-02-07 16:45:21 INFO  NettyBlockTransferService:54 - Server created on node2.qrm:38925\n2019-02-07 16:45:21 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy\n2019-02-07 16:45:21 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, node2.qrm, 38925, None)\n2019-02-07 16:45:21 INFO  BlockManagerMasterEndpoint:54 - Registering block manager node2.qrm:38925 with 366.3 MB RAM, BlockManagerId(driver, node2.qrm, 38925, None)\n2019-02-07 16:45:21 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, node2.qrm, 38925, None)\n2019-02-07 16:45:21 INFO  
BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, node2.qrm, 38925, None)\n2019-02-07 16:45:21 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@7d3f5e97{\/metrics\/json,null,AVAILABLE,@Spark}\n+------------+\n|     request|\n+------------+\n|...ssue\/f...|\n|...roduct...|\n|...ee\/xfr...|\n|...ssue\/s...|\n|...ssue\/j...|\n|...dy\/flo...|\n|...ain.do...|\n|...ee\/xfr...|\n|...ee\/xfr...|\n|...s\/vers...|\n|...xec\/Pr...|\n|...atch\/p...|\n|...omment...|\n|...roduct...|\n|...nowled...|\n|...nterfa...|\n|...xec\/Us...|\n|...atch\/p...|\n|...engine.js|\n|...soLogo...|\n+------------+\nonly showing top 20 rows\n\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>[PySpark] Elasticsearch Index \uc5d0\uc11c DataFrame \uc0dd\uc131\ud558\uae30 elasticsearch-hadoop connector \ub97c \uc774\uc6a9\ud574\uc11c spark dataframe\uc744 \uc0dd\uc131\ud55c\ub2e4. Spark \uc124\uce58 spark \ub2e4\uc6b4\ub85c\ub4dc \ubc0f \ud658\uacbd\uc124\uc815 \ud544\uc694\ud55c \uc694\uc18c\ub9cc \uc124\uce58\ud558\uace0\uc790 \ud558\ub294 \uacbd\uc6b0 \uc18c\uc2a4\ucf54\ub4dc\ub97c \ub0b4\ub824\ubc1b\uc544\uc11c maven\uc744 \uc774\uc6a9\ud574 \ube4c\ub4dc\ud574\ub3c4 \ub418\uc9c0\ub9cc pre-compiled \ubc14\uc774\ub108\ub9ac\ub97c \uc774\uc6a9\ud55c\ub2e4. wget http:\/\/mirror.navercorp.com\/apache\/spark\/spark-2.3.2\/spark-2.3.2-bin-hadoop2.7.tgz tar -xvzf spark-2.3.2-bin-hadoop2.7.tgz -C \/app\/spark\/. 
mv \/app\/spark\/spark-2.3.2-bin-hadoop2.7 \/app\/spark\/2.3.2 find \/app\/spark\/2.3.2 -mindepth 1 -maxdepth 1 -type d | xargs -I {} [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[12,10,73],"tags":[25,34,102,104],"class_list":["post-846","post","type-post","status-publish","format-standard","hentry","category-elasticsearch","category-python","category-spark","tag-elasticsearch","tag-python","tag-spark","tag-104"],"_links":{"self":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts\/846","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/comments?post=846"}],"version-history":[{"count":3,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts\/846\/revisions"}],"predecessor-version":[{"id":1357,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts\/846\/revisions\/1357"}],"wp:attachment":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/media?parent=846"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/categories?post=846"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/tags?post=846"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}