Building Kafka Producer & Consumer Applications Using Maven
Installing Maven
As long as a JDK is already in place, you can download a pre-compiled binary from the Apache Maven project site and use it as-is.
Download the Apache Maven binary
wget http://mirror.navercorp.com/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
Set the environment variables
# JDK ENV
export JAVA_HOME=/app/jdk
export PATH=$JAVA_HOME/bin:$PATH
# MAVEN ENV
export PATH=/app/maven/bin:$PATH
Installation
- Extract the archive (create the target directory first)
mkdir -p /app/maven
tar -xvzf apache-maven-3.5.4-bin.tar.gz -C /app/maven
- Rename to the version directory
cd /app/maven
mv apache-maven-3.5.4 3.5.4
- Create symbolic links so /app/maven/bin points at the current version
ln -s ./3.5.4/* .
Producer Application
Create the project
mvn archetype:generate -DgroupId=com.tmax.dbps.bigdata -DartifactId=kafka_producer_test -Dpackage=com.tmax.dbps.bigdata.kafka -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Edit pom.xml
vi kafka_producer_test/pom.xml
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
  </dependency>
</dependencies>
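Note that kafka_2.11 is the full broker artifact; it pulls in kafka-clients transitively, which is all a standalone producer or consumer actually needs. If you prefer the lighter footprint, a minimal alternative (same version assumed) is to depend on the client library only:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version>
</dependency>

The examples below keep kafka_2.11 so the run commands' classpaths stay as listed.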
Write the main source
vi kafka_producer_test/src/main/java/com/tmax/dbps/bigdata/kafka/DemoProducer.java
package com.tmax.dbps.bigdata.kafka;

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DemoProducer {
    public static void main(final String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("acks", "all");            // wait for all in-sync replicas to acknowledge
        producerProps.put("retries", 1);             // retry a failed send once
        producerProps.put("batch.size", 20000);      // per-partition batch size in bytes
        producerProps.put("linger.ms", 1);           // wait up to 1 ms to fill a batch
        producerProps.put("buffer.memory", 24568545); // total memory available for buffering

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Send 2000 records to the "test1" topic; send() is asynchronous and
        // returns a Future that completes once the record is acknowledged.
        for (int i = 0; i < 2000; i++) {
            ProducerRecord<String, String> data =
                    new ProducerRecord<>("test1", "Hello this is record " + i);
            Future<RecordMetadata> recordMetadata = producer.send(data);
        }

        producer.close();
    }
}
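The loop above discards the Future returned by send(), so a failed delivery would go unnoticed. A minimal sketch of surfacing errors, assuming the same producer and topic as above, is to pass a Callback as the second argument to send():

// Variant of the send above that reports the outcome of each record.
producer.send(new ProducerRecord<String, String>("test1", "Hello this is record " + i),
        new org.apache.kafka.clients.producer.Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();  // delivery failed after the configured retries
                } else {
                    System.out.println("Stored at partition " + metadata.partition()
                            + ", offset " + metadata.offset());
                }
            }
        });

onCompletion() runs on the producer's I/O thread, so keep the callback body lightweight.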
Build
cd kafka_producer_test
mvn install
Run
java -cp \
/home/kafka/.m2/repository/org/slf4j/slf4j-api/1.5.6/slf4j-api-1.5.6.jar\
:/home/kafka/.m2/repository/org/slf4j/jcl-over-slf4j/1.5.6/jcl-over-slf4j-1.5.6.jar\
:target/kafka_producer_test-1.0-SNAPSHOT.jar\
:/home/kafka/.m2/repository/org/apache/kafka/kafka_2.11/0.10.0.0/kafka_2.11-0.10.0.0.jar\
:/home/kafka/.m2/repository/org/apache/kafka/kafka-clients/0.10.0.0/kafka-clients-0.10.0.0.jar\
:/home/kafka/.m2/repository/org/slf4j/slf4j-jdk14/1.5.6/slf4j-jdk14-1.5.6.jar \
com.tmax.dbps.bigdata.kafka.DemoProducer
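The long -cp list can be avoided by packaging an uber-jar. This is optional and only a sketch: adding the standard maven-shade-plugin to pom.xml bundles the dependencies into target/kafka_producer_test-1.0-SNAPSHOT.jar at package time:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

After mvn package, the producer then runs with just:

java -cp target/kafka_producer_test-1.0-SNAPSHOT.jar com.tmax.dbps.bigdata.kafka.DemoProducer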
Consumer Application
Create the project
mvn archetype:generate -DgroupId=com.tmax.dbps.bigdata -DartifactId=kafka_consumer_test -Dpackage=com.tmax.dbps.bigdata.kafka -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Edit pom.xml
vi kafka_consumer_test/pom.xml
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
  </dependency>
</dependencies>
Write the main source
vi kafka_consumer_test/src/main/java/com/tmax/dbps/bigdata/kafka/DemoConsumer.java
package com.tmax.dbps.bigdata.kafka;

import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

public class DemoConsumer {
    private static final Logger log = Logger.getLogger(DemoConsumer.class);

    public static void main(String[] args) throws Exception {
        String topic = "test1";
        List<String> topicList = new ArrayList<>();
        topicList.add(topic);

        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "node7.big:9092");
        consumerProperties.put("group.id", "Demo_Group");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Offsets are committed manually below, so auto commit must be off.
        consumerProperties.put("enable.auto.commit", "false");
        consumerProperties.put("session.timeout.ms", "30000");

        KafkaConsumer<String, String> demoKafkaConsumer = new KafkaConsumer<>(consumerProperties);
        demoKafkaConsumer.subscribe(topicList);
        log.info("Subscribed to topic " + topic);

        try {
            while (true) {
                // Block for up to 500 ms waiting for new records.
                ConsumerRecords<String, String> records = demoKafkaConsumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    log.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
                    System.out.println("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
                }
                // Commit the offsets just processed without blocking the poll loop.
                demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                        if (e != null) {
                            log.error("Async offset commit failed", e);
                        }
                    }
                });
            }
        } catch (Exception ex) {
            log.error("Unexpected error in consumer loop", ex);
        } finally {
            try {
                // Final synchronous commit before closing.
                demoKafkaConsumer.commitSync();
            } finally {
                demoKafkaConsumer.close();
            }
        }
    }
}
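The while (true) loop above only exits when an exception is thrown. A common shutdown pattern, shown here only as a sketch on top of the code above, is to call wakeup() on the consumer from a JVM shutdown hook: the blocked poll() then throws org.apache.kafka.common.errors.WakeupException, which falls through to the commitSync()/close() cleanup in the finally block (the consumer variable must be final or effectively final to be captured):

// Register before entering the poll loop.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        demoKafkaConsumer.wakeup();   // makes the pending poll() throw WakeupException
        try {
            mainThread.join();        // wait until the consumer loop finishes cleanup
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});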
Run
java -cp \
/home/kafka/.m2/repository/org/slf4j/slf4j-api/1.5.6/slf4j-api-1.5.6.jar\
:/home/kafka/.m2/repository/org/slf4j/jcl-over-slf4j/1.5.6/jcl-over-slf4j-1.5.6.jar\
:target/kafka_consumer_test-1.0-SNAPSHOT.jar\
:/home/kafka/.m2/repository/org/apache/kafka/kafka_2.11/0.10.0.0/kafka_2.11-0.10.0.0.jar\
:/home/kafka/.m2/repository/org/apache/kafka/kafka-clients/0.10.0.0/kafka-clients-0.10.0.0.jar\
:/home/kafka/.m2/repository/org/slf4j/slf4j-jdk14/1.5.6/slf4j-jdk14-1.5.6.jar\
:/home/kafka/apache-log4j-1.2.17/log4j-1.2.17.jar\
:/app/kafka/1.1.1/libs/commons-lang3-3.5.jar\
:/app/kafka/1.1.1/libs/kafka_2.11-1.1.1.jar \
com.tmax.dbps.bigdata.kafka.DemoConsumer