kafka安装测试过程
kafka的性能在此不再赘述,百度一下很多,在此描述一下kafka的安装和测试过程:
-
安装kafka:
#tar -xzf kafka_2.9.2-0.8.1.tgz #cd kafka_2.9.2-0.8.1 #mv kafka_2.9.2-0.8.1 kafka
-
开启zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
-
开启kafka服务:
bin/kafka-server-start.sh config/server.properties
-
创建话题topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
具体kafka-topics.sh 的参数自行查看--help帮助 -
查看kafka服务中的topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181 #列出topics如下 test
在2.8之前的版本中的shell脚本可能不同 -
打开produce,向test话题添加消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test xxxxxxxxxxxxxxxxx #输入内容后enter即可发送出消息内容
-
打开customer读取test话题内容:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning xxxxxxxxxxxxxxxxx
kafka的是scala语言编写的服务框架,因此用scala开发produce和custome应用程序应该是非常方便的,但是没有找到相应examples,但kafka也支持java和python以及c编写的客户端应用程序,下面分享一下java的代码片段(网络转载): - 消费者custome:
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class ConsumerTest extends Thread { private final ConsumerConnector consumer; private final String topic; public static void main(String[] args) { ConsumerTest consumerThread = new ConsumerTest("1test"); consumerThread.start(); } public ConsumerTest(String topic) { consumer = kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", "master:2181"); props.put("group.id", "0"); props.put("zookeeper.session.timeout.ms", "400000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) System.out.println(new String(it.next().message())); } }
消息的生产者produce:import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class ProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("zk.connect", "master:2181"); // zookeeper的一个节点地址 props.put("serializer.class", "kafka.serializer.StringEncoder");// kafka序列化方式 props.put("metadata.broker.list", "master:9092"); props.put("request.required.acks", "1"); //props.put("partitioner.class", "com.xq.SimplePartitioner"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); String msg ="this is a messageuuu! XXXmessDageuuu"; KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", msg); for(int i = 0 ; i < 5; i ++){ System.out.println("send"+i); producer.send(data); } producer.close(); } }
分别运行custom和produce即可看到控制台消息发送和接受的内容。 - 后续将继续更新kafka的各个参数的说明文档以及与spark集成,与flume集成。
相关推荐
kafka测试
向kafka插入数据测试
kafka及其性能测试 kafka及其性能测试 kafka及其性能测试 kafka及其性能测试 kafka及其性能测试 kafka及其性能测试
利用安装zookeeper的三台服务器搭建KAFKA集群,并对其进行验证测试
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了...文档介绍有关Kafka的简单安装和使用。
可视化kafka测试工具,配置好broker地址即可模拟发送topic消息
kafka 安装手册(单机) 保证step by step 经验总结,生产环境和测试环境可用
Kafka Test
kafka资源下载和测试代码
kafka配置安装详解及启动测试指南,基本入门教程,读写数据样例
Kafka性能测试报告.pdf
CentOS7网络配置,安装JDK,安装Kafka,订阅发布以及测试
Flink与kafka集成测试数据集
centos 7.4安装kafka 一、 关闭防火墙 二、 禁止selinux 三、 更新系统 四、 安装JDK 五、 配置Java环境变量 六、 安装kafka ...九、 测试kafka 十、 把zookeeper配置成服务 十一、 把kafka配置成服务
第一篇"第二篇第三篇第四篇第五篇本文详细介绍了Kafka性能测试方法及Benchmark报告。Kafka提供了非常多有用的工具,如Kafka设计解析(四)-Kafka HighAvailability(下)中提到的运维类工具——PartitionReassign ...
grafana上kafka 测试消费者组消费情况使用的json文件
kafka测试推送实例
jdk1.8,zookeeper,kafka的安装配置,下再,解压,更改配置,测试是否安装成功
这个是我自己根据网上的教程以及自己的经验总结出来的kafka的集群搭建以及测试,亲测可用,如果遇到一些小问题,请上网百度。或者私聊均可,可以给与帮助