微信搜索lxw1234bigdata | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

Kafka Java API实现的简单Producer和Consumer

Kafka lxw1234@qq.com 25891℃ 3评论

关键字:Kafka Java API、producer、consumer

前面的文章《Kafka安装配置测试》中安装配置了分布式的Kafka集群,并且使用自带的kafka-console-producer.sh和kafka-console-consumer.sh模拟测试了发送消息和消费消息。

本文使用简单的Java API模拟Kafka的producer和consumer,其中,procuder从一个文本文件中逐行读取内容,然后发送到Kafka,consumer则从Kafka中读取内容并在控制台打印。

Java API Producer

package com.lxw1234.kafka;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {

	public static void main(String[] args) {
			Properties props = new Properties();
			props.put("serializer.class", "kafka.serializer.StringEncoder");
			props.put("metadata.broker.list", "172.16.212.17:9091,172.16.212.17:9092,172.16.212.102:9091,172.16.212.102:9092");
	    Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
	    String topic = "lxw1234.com";
	    
	    File file = new File("E:/track-log.txt");
	    BufferedReader reader = null;
	    try {
	    	reader = new BufferedReader(new FileReader(file));
	    	String tempString = null;
	    	int line = 1;
	    	while ((tempString = reader.readLine()) != null) {
	    		producer.send(new KeyedMessage<Integer, String>(topic,line + "---" + tempString));
	    		System.out.println("Success send [" + line + "] message ..");
	    		line++;
	    	}
	    	reader.close();
	    	System.out.println("Total send [" + line + "] messages ..");
	    } catch (Exception e) {
	    	e.printStackTrace();
	    } finally {
	    	if (reader != null) {
		    	try {
		    		reader.close();
		    	} catch (IOException e1) {}
	    	}
	    }
	    producer.close();
	    
	}
}

程序从E:/track-log.txt文件中读取内容,发送至Kafka。

Java API Consumer

package com.lxw1234.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
	public static void main(String[] args) {
		String topic = "lxw1234.com";
		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig()); 
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(1));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
	    while(it.hasNext())
	      System.out.println("consume: " + new String(it.next().message()));
	}
	
	private static ConsumerConfig createConsumerConfig() {
	    Properties props = new Properties();
	    props.put("group.id","group1");
	    props.put("zookeeper.connect","zk1:2181,zk2:2181,zk3:2181");
	    props.put("zookeeper.session.timeout.ms", "400");
	    props.put("zookeeper.sync.time.ms", "200");
	    props.put("auto.commit.interval.ms", "1000");
	    return new ConsumerConfig(props);
	  }
}

Consumer从Kafka中消费数据,并在控制台中打印消息内容。

运行和结果

先运行Consumer,之后再运行Producer,运行时候将$KAFKA_HOME/lib/下的所有jar包依赖进去。

Producer运行结果如下:

Kafka Producer

文件中只有50000行记录,因为最后又把行号加了一次,因此最后打印出是50001.

Consumer运行结果如下:

Kafka Consumer

 

Consumer成功获取了5000条数据。

关于Kafka,还有很多疑问,继续尝试和学习吧,enjoy it!

您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Kafka Java API实现的简单Producer和Consumer

喜欢 (18)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(3)个小伙伴在吐槽
  1. maven配置呢?!
    秦沙2017-01-24 18:34 回复
  2. 你的topic都是linux下的,现在在windows下运行 怎么能跑通呢?
    knuth19892017-06-09 17:34 回复
  3. :cry: 不知道什么jar包没有加载,运行时候报错了
    小飞飞2019-08-07 10:36 回复