spring cloud : 消息中间件kafka

1.    安装Kafka

1.     下载JDK、kafka(自带Zookeeper)

http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz下载

,解压到D:\STS_ENV\kafka_2.11-1.0.0(执行bin下的脚本,此处使用windows所以使用windows/下的bat脚本)

2.     下载SecureCRT 连接

查看服务器系统版本 cat /proc/version

[[email protected] ~]# cat /etc/redhat-release

Red Hat Enterprise Linux Server release 7.4 (Maipo)

 

1.     安装jdk 1.8.0_121

=》上传到/opt目录

=》解压jdk (tar-zxvf  jdk-8u121-linux-x64.tar

=》配置环境变量:vi /etc/profile添加如下参数:

export JAVA_HOME=/opt/jdk1.8.0_121

export PATH=$JAVA_HOME/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

=》生效 source/etc/profile ,检查java –version

 

2.     启动zookeeper 和kafka

进入 /opt

tar zxvf kafka_2.11-0.11.0.1.tar.gz

mv kafka_2.11-0.11.0.1 kafka_2_11

vi /etc/profile 输入

#set kafka

export KAFKA_HOME=/opt/kafka_2_11

export PATH=/opt/kafka_2_11/bin:$PATH

使用环境变量生效: source /etc/profile

 

进入kafka根目录修改配置文件config/zookeeper.properties:

dataDir=/opt/ kafka _2_11/zklogs

config/server.properties 中:

log.dirs=/opt/ kafka _2_11/kafkalogs

后台启动zookeeper-server-start.sh -daemon /opt/kafka_2_11/config/zookeeper.properties

后台启动kafka-server-start.sh  -daemon  /opt/kafka_2_11/config/server.properties

3.     验证

创建名为atopic的队列组(包含一个分区一个Replica):

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic atopic

查看命令:kafka-topics.sh--list --zookeeper localhost:2181

 

发消息:

kafka-console-producer.sh --broker-listlocalhost:9092 --topic atopic

收消息:

kafka-console-consumer.sh--bootstrap-server localhost:9092 --topic atopic --from-beginning

 

ü  注意:

红帽防火墙开启端口2181: firewall-cmd --zone=public --add-port=2181/tcp --permanent

重启防火墙 firewall-cmd --reload

删除某端口: firewall-cmd --zone=public --remove-port=80/tcp  

查询防火墙端口状态: firewall-cmd --zone=public --query-port=2181/tcp

解释:永久生效:--permanent

 

以 root 身份输入以下信息,重新加载防火墙并中断用户连接,即丢弃状态信息:

firewall-cmd --complete-reload

通常在防火墙出现严重问题时,这个命令才会被使用。比如,防火墙规则是正确的,但却出现状态信息问题和无法建立连接。

以 root 用户身份输入以下命令,找出公共区域这一个区域的所有设置(包括所有端口):

firewall-cmd --zone=public --list-all

其他红帽问题请前往红帽官网

集群配置注意:

1.      拷贝配置好的kafka文件到子节点服务器,然后修改broker.id的值,不能一样,我分配设置的是2,3,4

2.启动kafka之前需启动zookeeper,然后启动kafka,各个节点需单独启动

通过jps查看jps是否启动成功:

[[email protected] ~]# jps

2433 Jps

2120 Kafka

1995 QuorumPeerMain

查看进程zookeeper是否启动:

ps -ef | grep zookeeper

强制关闭进程的方法:

ps aux|grepzookeeper   #找到对应进程pid

kill -9 {pid}             #强制关闭指定进程

 

l  Windows版本验证命令:

1.      开启zk (窗口1)

zkServer

2.      开启kafka(窗口2)

.\bin\windows\kafka-server-start.bat .\config\server.properties

 

1.创建主题

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test.topic

 

3.      2.创建生产者(窗口3)

kafka-console-producer.bat --broker-list localhost:9092 --topic test.topic

3.创建消费者(窗口4)

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test.topic

spring cloud : 消息中间件kafka


问题总结(写时超时):

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for springCloudBus-0 due to 30003 ms has passed since batch creation plus linger time

公开发布地址:

配置kafka(config/server.properties)参数,把advertised.host.name设置成ip地址

advertised.host.name=<broker public IP address>

重启kafka

kafka-server-stop.sh

kafka-server-start.sh  -daemon  /opt/kafka_2_11/config/server.properties