目录
一、kafka开发环境安装
二、kafka相关操作
1、使用命令创建topic
2、列出所有的topic
3、删除topic:second_topic
4、查看topic的相关信息
三、kafka-console-producer-生产消息
四、kafka-console-consumer-消费消息
kafka的安装需要先安装docker,如果之前没有安装过docker,可以看我的这篇文章进行安装
Docker镜像、容器和数据管理
为了方便一次性安装kafka的开发环境,我们使用github开源的镜像
GitHub - lensesio/fast-data-dev: Kafka Docker for development. Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, 20+ connectors
安装命令,可以直接用,不用改动任何东西
安装过程有点长,文件比较多,请耐心等待,如下是安装的服务列表
安装完后,访问http://192.168.43.36:3030/可以看到如下界面,此ip是我linux的地址
当然你也可以一个一个的进行安装,下边是不错的安装文档
Docker搭建kafka和zookeeper:http://blog.70ci.com/post/736.html
首先,我们在所有服务都启动的情况下,进入landoop/fast-data-dev容器
注意:此处我们需要重新开一个命令窗口,不要关闭当前服务的启动窗口
使用kafka-topics创建第一个topic:first_topic
创建topic,必须指定注册中心,topic的名字,创建多少个partitions,以及备份replication-factor的数量(实际中replication-factor应该为3,但我目前只在一台服务器上安装了kafka,只有一个broker,因此只能设置为1,因为replication-facto数量不能大于brokers的数量)
如果重复创建first_topic,服务会输出错误信息,从而保证topic的唯一性
可以看到以下相关信息,如Partition和replicationFactor的数量等
使用kafka-console-producer生产消息,需要指定broker和topic,命令如下
输入如上命令回车后,会出现输入提示,我们输入如下消息
然后进入kafka可视化页面,我们看到输入的message,被随机分配在了三个partition中
有时,我们需要把消息进行顺序的存储,而不是随机的分布在所有partition中,此时,我们可以设置一个key,那么相同key的消息都会被顺序的存储在一个partiton中
如下,我们加了两个属性,使用key,以及指定key和value的分隔符
我们来看下输入:
然后进入kafka可视化页面,我们看到输入的message,被顺序的存储在相同partiton中
使用kafka-console-consumer消费消息,需要指定broker和topic,命令如下
输入如上命令时,如果此时我们没有生产message,那么我们将看不到任何效果,因为此时的消费者默认读取从当前启动时间后生产的message,因此启动时间之前的message都将不被消费
我们此时创建一个生产者,并输入一些message,来看看效果:
如果想要消费消息队列中所有的message(包括历史消息),需要加上参数 --from-beginning
如果需要指定消费的partition,可以加上--partition参数
上边,我们指定消费partition2中的消息,一共消费了5条,我们通过UI界面,也可以看到partition2中确实有5条消息
ConsumerGruop
使用ConsumerGruop,把多个消费者划分到一个组中,组的名字通过group.id进行指定
通过group.id划分到组的consumer,每消费完一个message,都会提交一个committed offset,当组内的下一个consumer读数据时,会从committed offset处开始读取。因此,分组后的consumer,并不会重复消费组内中任意一个consumer已经消费完成的消息。
附:kafka中message存储的json形式