{"id":594,"date":"2018-09-11T11:09:24","date_gmt":"2018-09-11T02:09:24","guid":{"rendered":"https:\/\/dong1lkim.oboki.net\/?p=594"},"modified":"2019-09-01T22:21:04","modified_gmt":"2019-09-01T13:21:04","slug":"kafka-producer-consumer-sample","status":"publish","type":"post","link":"https:\/\/oboki.net\/workspace\/data-engineering\/kafka\/kafka-producer-consumer-sample\/","title":{"rendered":"[Kafka] Producer &#038; Consumer Sample"},"content":{"rendered":"<h1>Building Kafka Producer &amp; Consumer Application Using Maven<\/h1>\n<h2>Maven \uc124\uce58<\/h2>\n<blockquote><p>\n  JAVA\ub9cc \uc900\ube44\ub3fc \uc788\ub2e4\uba74 maven project \uc0ac\uc774\ud2b8\uc5d0\uc11c pre-compiled binary \ubc1b\uc544\uc11c \uadf8\ub300\ub85c \uc0ac\uc6a9\ud558\uba74 \ub428.\n<\/p><\/blockquote>\n<h3>apache maven binary \ub2e4\uc6b4\ub85c\ub4dc<\/h3>\n<p><code>wget <a href=\"http:\/\/mirror.navercorp.com\/apache\/maven\/maven-3\/3.5.4\/binaries\/apache-maven-3.5.4-bin.tar.gz\"><a href=\"http:\/\/mirror.navercorp.com\/apache\/maven\/maven-3\/3.5.4\/binaries\/apache-maven-3.5.4-bin.tar.gz\">http:\/\/mirror.navercorp.com\/apache\/maven\/maven-3\/3.5.4\/binaries\/apache-maven-3.5.4-bin.tar.gz<\/a><\/a><\/code><\/p>\n<h3>\ud658\uacbd \ubcc0\uc218 \uc124\uc815<\/h3>\n<pre><code class=\"bash\"># JDK ENV\nexport JAVA_HOME=\/app\/jdk\nexport PATH=$JAVA_HOME\/bin:$PATH\n\n# MAVEN ENV\nexport PATH=\/app\/maven\/bin:$PATH\n<\/code><\/pre>\n<h3>\uc124\uce58<\/h3>\n<ol>\n<li>\uc555\ucd95 \ud574\uc81c<br \/>\n<code>tar -xvzf apache-maven-3.5.4-bin.tar.gz -C \/app\/maven<\/code>  <\/li>\n<li>\uacbd\ub85c \uc124\uc815<br \/>\n<code>cd \/app\/maven<\/code><br \/>\n<code>mv apache-maven-3.5.4 3.5.4<\/code>  <\/li>\n<li>Symbolic Link \uc0dd\uc131<br \/>\n<code>ln -s .\/3.5.4\/* .<\/code>  <\/li>\n<\/ol>\n<p><\/p>\n<h2>Producer Application<\/h2>\n<h3>\ud504\ub85c\uc81d\ud2b8 \uc0dd\uc131<\/h3>\n<pre><code class=\"bash\">mvn archetype:generate -DgroupId=com.tmax.dbps.bigdata -DartifactId=kafka_producer_test -Dpackage=com.tmax.dbps.bigdata.kafka\n<\/code><\/pre>\n<h3>pom.xml \ud30c\uc77c \uc218\uc815<\/h3>\n<p><code>vi kafka_producer_test\/pom.xml<\/code><\/p>\n<pre><code class=\"xml\">  &lt;dependencies&gt;\n    &lt;dependency&gt;\n      &lt;groupId&gt;junit&lt;\/groupId&gt;\n      &lt;artifactId&gt;junit&lt;\/artifactId&gt;\n      &lt;version&gt;4.11&lt;\/version&gt;\n      &lt;scope&gt;test&lt;\/scope&gt;\n    &lt;\/dependency&gt;\n    &lt;dependency&gt;\n      &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;\n      &lt;artifactId&gt;kafka_2.11&lt;\/artifactId&gt;\n      &lt;version&gt;0.10.0.0&lt;\/version&gt;\n    &lt;\/dependency&gt;\n  &lt;\/dependencies&gt;\n<\/code><\/pre>\n<h3>main source \uc791\uc131<\/h3>\n<p><code>vi kafka_producer_test\/src\/main\/java\/com\/tmax\/dbps\/bigdata\/kafka\/DemoProducer.java<\/code><\/p>\n<pre><code class=\"java\">package com.tmax.dbps.bigdata.kafka;\n\nimport java.util.Properties;\nimport java.util.concurrent.Future;\nimport org.apache.kafka.clients.producer.KafkaProducer;\nimport org.apache.kafka.clients.producer.ProducerRecord;\nimport org.apache.kafka.clients.producer.RecordMetadata;\n\npublic class DemoProducer{\n  public static void main (final String[] args){\n    Properties producerProps = new Properties();\n    producerProps.put(\"bootstrap.servers\",\"localhost:9092\");\n    producerProps.put(\"key.serializer\",\n        \"org.apache.kafka.common.serialization.StringSerializer\");\n    producerProps.put(\"value.serializer\",\n        \"org.apache.kafka.common.serialization.StringSerializer\");\n    producerProps.put(\"acks\",\"all\");\n    producerProps.put(\"retries\",1);\n    producerProps.put(\"batch.size\",20000);\n    producerProps.put(\"linger.ms\",1);\n    producerProps.put(\"buffer.memory\",24568545);\n    KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;String,String&gt;(producerProps);\n\n    for (int i=0;i&lt;2000;i++){\n      ProducerRecord data = new ProducerRecord&lt;String,String&gt;(\"test1\",\"Hello this is record\" + i); \n      Future&lt;RecordMetadata&gt; recordMetadata = producer.send(data);\n    }\n\n    producer.close();\n  }\n}\n<\/code><\/pre>\n<h3>Build<\/h3>\n<pre><code class=\"bash\">mvn install\n<\/code><\/pre>\n<h3>Run<\/h3>\n<pre><code class=\"bash\">java -cp \\\n\/home\/kafka\/.m2\/repository\/org\/slf4j\/slf4j-api\/1.5.6\/slf4j-api-1.5.6.jar\\\n:\/home\/kafka\/.m2\/repository\/org\/slf4j\/jcl-over-slf4j\/1.5.6\/jcl-over-slf4j-1.5.6.jar\\\n:target\/kafka_producer_test-1.0-SNAPSHOT.jar\\\n:\/home\/kafka\/.m2\/repository\/org\/apache\/kafka\/kafka_2.11\/0.10.0.0\/kafka_2.11-0.10.0.0.jar\\\n:\/home\/kafka\/.m2\/repository\/org\/apache\/kafka\/kafka-clients\/0.10.0.0\/kafka-clients-0.10.0.0.jar\\\n:\/home\/kafka\/.m2\/repository\/org\/slf4j\/slf4j-jdk14\/1.5.6\/slf4j-jdk14-1.5.6.jar\\\n com.tmax.dbps.bigdata.kafka.DemoProducer\n<\/code><\/pre>\n<p><\/p>\n<h2>Consumer Application<\/h2>\n<h3>\ud504\ub85c\uc81d\ud2b8 \uc0dd\uc131<\/h3>\n<pre><code class=\"bash\">mvn archetype:generate -DgroupId=com.tmax.dbps.bigdata -DartifactId=kafka_consumer_test -Dpackage=com.tmax.dbps.bigdata.kafka\n<\/code><\/pre>\n<h3>pom.xml \ud30c\uc77c \uc218\uc815<\/h3>\n<p><code>vi kafka_consumer_test\/pom.xml<\/code><\/p>\n<pre><code class=\"xml\">  &lt;dependencies&gt;\n    &lt;dependency&gt;\n      &lt;groupId&gt;junit&lt;\/groupId&gt;\n      &lt;artifactId&gt;junit&lt;\/artifactId&gt;\n      &lt;version&gt;4.11&lt;\/version&gt;\n      &lt;scope&gt;test&lt;\/scope&gt;\n    &lt;\/dependency&gt;\n    &lt;dependency&gt;\n      &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;\n      &lt;artifactId&gt;kafka_2.11&lt;\/artifactId&gt;\n      &lt;version&gt;0.10.0.0&lt;\/version&gt;\n    &lt;\/dependency&gt;\n  &lt;\/dependencies&gt;\n<\/code><\/pre>\n<h3>main source \uc791\uc131<\/h3>\n<p><code>vi kafka_consumer_test\/src\/main\/java\/com\/tmax\/dbps\/bigdata\/kafka\/DemoConsumer.java<\/code><\/p>\n<pre><code class=\"java\">package com.tmax.dbps.bigdata.kafka;\n\nimport org.apache.kafka.clients.consumer.*;\nimport org.apache.kafka.common.TopicPartition;\nimport org.apache.log4j.Logger;\nimport java.util.*;\n\npublic class DemoConsumer{\n    private static final Logger log = Logger.getLogger(DemoConsumer.class);\n\n    public static void main(String[] args) throws Exception{\n        String topic = \"test1\";\n        List&lt;String&gt; topicList = new ArrayList&lt;&gt;();\n        topicList.add(topic);\n        Properties consumerProperties = new Properties();\n        consumerProperties.put(\"bootstrap.servers\",\"node7.big:9092\");\n        consumerProperties.put(\"group.id\",\"Demo_Group\");\n        consumerProperties.put(\"key.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\");\n        consumerProperties.put(\"value.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\");\n        consumerProperties.put(\"enable.auto.commit\",\"true\");\n        consumerProperties.put(\"auto.commit.interval.ms\",\"1000\");\n        consumerProperties.put(\"session.timeout.ms\",\"30000\");\n\n        KafkaConsumer&lt;String,String&gt; demoKafkaConsumer = new KafkaConsumer&lt;String,String&gt;(consumerProperties);\n\n        demoKafkaConsumer.subscribe(topicList);\n        log.info(\"Subscribed to topic \" + topic);\n\n        int i=0;\n        try{\n            while (true){\n                ConsumerRecords&lt;String,String&gt; records = demoKafkaConsumer.poll(500);\n                for(ConsumerRecord&lt;String,String&gt; record : records){\n                    log.info(\"offset = \" + record.offset() + \"key =\" + record.key() + \"value =\" + record.value());\n                    System.out.println(\"offset = \" + record.offset() + \"key =\" + record.key() + \"value =\" + record.value());\n                }\n\n                demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {\n                        public void onComplete(Map&lt;TopicPartition,OffsetAndMetadata&gt; map, Exception e){\n                        }\n                        });\n            }\n        } catch(Exception ex){\n        } finally{\n            try{\n                demoKafkaConsumer.commitSync();\n            } finally{\n                demoKafkaConsumer.close();\n            }\n        }\n    }\n}\n<\/code><\/pre>\n<h3>Run<\/h3>\n<pre><code class=\"bash\">java -cp \/home\/kafka\/.m2\/repository\/org\/slf4j\/slf4j-api\/1.5.6\/slf4j-api-1.5.6.jar:\/home\/kafka\/.m2\/repository\/org\/slf4j\/jcl-over-slf4j\/1.5.6\/jcl-over-slf4j-1.5.6.jar:target\/kafka_consumer_test-1.0-SNAPSHOT.jar:\/home\/kafka\/.m2\/repository\/org\/apache\/kafka\/kafka_2.11\/0.10.0.0\/kafka_2.11-0.10.0.0.jar:\/home\/kafka\/.m2\/repository\/org\/apache\/kafka\/kafka-clients\/0.10.0.0\/kafka-clients-0.10.0.0.jar:\/home\/kafka\/.m2\/repository\/org\/slf4j\/slf4j-jdk14\/1.5.6\/slf4j-jdk14-1.5.6.jar:\/home\/kafka\/apache-log4j-1.2.17\/log4j-1.2.17.jar:\/app\/kafka\/1.1.1\/libs\/commons-lang3-3.5.jar:\/app\/kafka\/1.1.1\/libs\/kafka_2.11-1.1.1.jar com.tmax.dbps.bigdata.kafka.DemoConsumer\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Building Kafka Producer &amp; Consumer Application Using Maven Maven \uc124\uce58 JAVA\ub9cc \uc900\ube44\ub3fc \uc788\ub2e4\uba74 maven project \uc0ac\uc774\ud2b8\uc5d0\uc11c pre-compiled binary \ubc1b\uc544\uc11c \uadf8\ub300\ub85c \uc0ac\uc6a9\ud558\uba74 \ub428. apache maven binary \ub2e4\uc6b4\ub85c\ub4dc wget http:\/\/mirror.navercorp.com\/apache\/maven\/maven-3\/3.5.4\/binaries\/apache-maven-3.5.4-bin.tar.gz \ud658\uacbd \ubcc0\uc218 \uc124\uc815 # JDK ENV export JAVA_HOME=\/app\/jdk export PATH=$JAVA_HOME\/bin:$PATH # MAVEN ENV export PATH=\/app\/maven\/bin:$PATH \uc124\uce58 \uc555\ucd95 \ud574\uc81c tar -xvzf apache-maven-3.5.4-bin.tar.gz -C \/app\/maven \uacbd\ub85c \uc124\uc815 cd \/app\/maven [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[16],"tags":[129,139],"class_list":["post-594","post","type-post","status-publish","format-standard","hentry","category-kafka","tag-java","tag-kafka"],"_links":{"self":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts\/594","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/comments?post=594"}],"version-history":[{"count":4,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts\/594\/revisions"}],"predecessor-version":[{"id":1232,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/posts\/594\/revisions\/1232"}],"wp:attachment":[{"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/media?parent=594"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/categories?post=594"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/oboki.net\/workspace\/wp-json\/wp\/v2\/tags?post=594"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}