分布式消息队列Kafka之发布订阅消息系统

0x00 教程内容

  1. 启动Kafka
  2. 创建Topic
  3. 启动生产者与消费者
  4. 演示消息发布订阅

前提:
先安装好Zookeeper、Kakfa
版本是:
zookeeper-3.4.10kafka_2.11-1.0.0

参考教程:
D011 复制粘贴玩大数据之安装与配置Kafka集群
D003 复制粘贴玩大数据之安装与配置Zookeeper集群

0x01 启动Kafka

1. 启动Zookeeper

a. 启动Zookeeper(三台均需执行)
zkServer.sh start

2. 启动Kafka

a. 后台启动Kafka(三台均需执行)
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
分布式消息队列Kafka之发布订阅消息系统

0x02 创建Topic

1. 创建Topic

a. 创建topic(3个副本、5个分区、名为:topic_sny
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 5 --topic snytopic
分布式消息队列Kafka之发布订阅消息系统

2. 查看topic列表

a. 查看所有topic
kafka-topics.sh --zookeeper master:2181 --list
分布式消息队列Kafka之发布订阅消息系统

3. 查看topic详情信息

a. 查看topic详情信息(将--create换成--describe,然后去掉创建的参数)
kafka-topics.sh --describe --zookeeper master:2181 --topic snytopic
分布式消息队列Kafka之发布订阅消息系统

0x03 启动生产者与消费者

1. 启动生产者

a. 在master上执行:
kafka-console-producer.sh --broker-list master:9092 --topic snytopic
分布式消息队列Kafka之发布订阅消息系统
PS:启动之后,处于待输入状态(绿色小箭头)

2. 启动消费者

a. 在slave1上执行(也可以在其他节点执行):
kafka-console-consumer.sh --bootstrap-server master:9092 --topic snytopic --from-beginning
分布式消息队列Kafka之发布订阅消息系统
PS:启动之后,处于待订阅状态

0x04 演示消息发布订阅

1. 发送消息

a. 在生产者界面(即master)输入内容
hello,shaonaiyi!
分布式消息队列Kafka之发布订阅消息系统

2. 订阅消息

a. 会发现消费者界面(即salve1)会自动订阅到相应的内容
分布式消息队列Kafka之发布订阅消息系统

0xFF 总结

  1. 删除topic(如果kafka配置delete.topic.enable=true,那么可以直接删除topic,执行删除topic命令,否则只是标记删除,并没有删除数据,同时也不能往这个topic写入数据,想要彻底删除可以进入Zookeeper相对应的路径手动删除)
    kafka-topics.sh --zookeeper master:2181 --delete --topic snytopic
  2. zookeeper删除多级路径:rmr /brokers/topics/snytopic
  3. zookeeper删除无子节点的节点路径:delete /brokers/topics/snytopic/0/state
  4. 思考题:
    1、当前咱们是有三个节点,尝试创建一个有3个副本以上的topic看有什么效果?
    kafka-topics.sh --create --zookeeper master:2181 --replication-factor 4 --partitions 5 --topic snytopic2
    2、尝试将topic标为删除状态,然后重新发布订阅,观察有何效果?
    3、如何恢复topic的状态?提示:(ls /admin/delete_topics

作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
公众号、微博、CSDN邵奈一

福利:
邵奈一的技术博客导航