从阿里到网易云centOS7.2服务器迁移服务和各项数据(kafka集群部分)

简单介绍一下kafka的基本机制

消息传输流程

从阿里到网易云centOS7.2服务器迁移服务和各项数据(kafka集群部分)

Producer即生产者,向kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息。有点像微博里的特别关注一样,只要我关注的人一发动态或消息,我就能第一时间收到通知。

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

从图中可以看出同一个Topic下的消费者和生产者的数量并不对等。

kafka服务器消息存储策略

从阿里到网易云centOS7.2服务器迁移服务和各项数据(kafka集群部分)

谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接受到神缠着发送的消息之后,会根据均衡策略将消息存储到不同的分区中。

从阿里到网易云centOS7.2服务器迁移服务和各项数据(kafka集群部分)

在每个分区中,消息以顺序存储,最晚接受的消息会最后被消费。

与生产者的交互

从阿里到网易云centOS7.2服务器迁移服务和各项数据(kafka集群部分)

生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中,

也可以通过指定均衡策略来将消息发送到不同的分区中

如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中。

与消费者的交互

从阿里到网易云centOS7.2服务器迁移服务和各项数据(kafka集群部分)

在消费者消费消息时,kafka使用offset来记录当前消费的位置

在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的消费的记录位置offset各不相同,互不干扰。

对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区之多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费。

因此,若一个group中的消费者数量大于分区数量的话,多于的消费者将不会收到任何消息。

(offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的***叫做offset,用于partition唯一标识一条消息.)

接下来是安装配置kafka

解压包并放到指定位置+改名

tar -zxvf kafka_2.10-0.10.2.0.tgz -C /home/
mv kafka_2.10-0.10.2.0 kafka

修改配置文件

vim config/server.properties

修改其中

broker.id=1
log.dirs=data/kafka-logs

存在一点疑惑server.properties里面的一个参数zookeeper.connect,两个ip是一样的?

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.16.220.146:2181,172.16.220.146:2181

然后启动kafka集群

[[email protected] bin]# ./kafka-server-start.sh -daemon ../config/server.properties &

关闭kafka

./bin/kafka-server-stop.sh

创建一个主题

我们用一个分区和一个副本创建一个名为“ruizhitest”的主题

[[email protected] bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ruizhitest
Created topic "ruizhitest".


创建topic

[[email protected] bin]# ./kafka-topics.sh --create --zookeeper 192.168.16.84:2181 --replication-factor 2 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

我们现在可以看到这个话题,如果我们运行列表主题命令:

[[email protected] bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
ruizhitest


#查看topic中的属性(副本,等)

[[email protected] bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1,0

检查测试:

  登录zookeeper(切换到zk的bin目录下),先连接zk:

[[email protected] bin]# ./zkCli.sh -server 192.168.16.86:2181

输出:

Connecting to 192.168.16.86:2181
2018-10-13 20:13:19,631 [myid:] - INFO  [main:[email protected]] - Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT
2018-10-13 20:13:19,635 [myid:] - INFO  [main:[email protected]] - Client environment:host.name=digdate-serv2.localdomain
2018-10-13 20:13:19,635 [myid:] - INFO  [main:[email protected]] - Client environment:java.version=1.8.0_121
2018-10-13 20:13:19,637 [myid:] - INFO  [main:[email protected]] - Client environment:java.vendor=Oracle Corporation
2018-10-13 20:13:19,637 [myid:] - INFO  [main:[email protected]] - Client environment:java.home=/usr/java/jdk1.8.0_121/jre
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:java.class.path=/home/zookeeper/bin/../build/classes:/home/zookeeper/bin/../build/lib/*.jar:/home/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/home/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/home/zookeeper/bin/../lib/log4j-1.2.16.jar:/home/zookeeper/bin/../lib/jline-0.9.94.jar:/home/zookeeper/bin/../zookeeper-3.4.9.jar:/home/zookeeper/bin/../src/java/lib/*.jar:/home/zookeeper/bin/../conf:.:/usr/java/jdk1.8.0_121/lib:/usr/java/jdk1.8.0_121/jre/lib
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:java.io.tmpdir=/tmp
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:java.compiler=<NA>
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:os.name=Linux
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:os.arch=amd64
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:os.version=3.10.0-693.el7.x86_64
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:user.name=root
2018-10-13 20:13:19,638 [myid:] - INFO  [main:[email protected]] - Client environment:user.home=/root
2018-10-13 20:13:19,639 [myid:] - INFO  [main:[email protected]] - Client environment:user.dir=/home/zookeeper/bin
2018-10-13 20:13:19,640 [myid:] - INFO  [main:[email protected]] - Initiating client connection, connectString=192.168.16.86:2181 sessionTimeout=30000 [email protected]
Welcome to ZooKeeper!
2018-10-13 20:13:19,662 [myid:] - INFO  [main-SendThread(192.168.16.86:2181):[email protected]] - Opening socket connection to server 192.168.16.86/192.168.16.86:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2018-10-13 20:13:19,733 [myid:] - INFO  [main-SendThread(192.168.16.86:2181):[email protected]] - Socket connection established to 192.168.16.86/192.168.16.86:2181, initiating session
2018-10-13 20:13:19,746 [myid:] - INFO  [main-SendThread(192.168.16.86:2181):[email protected]] - Session establishment complete on server 192.168.16.86/192.168.16.86:2181, sessionid = 0x1666c504f390004, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

查看一下状态:

[zk: 192.168.16.86:2181(CONNECTED) 0] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]

说明:zookeeper集群建好之后,通过“ls /”出来的只有zookeeper,连接kafka使用后,/ 下面多了不少东西,其中通过查看/brokers/ids可以

发现一个kafka都没有连上。

再输入:

ls /brokers/ids

输出:

[0, 1]

那应该是已经检查到了安装的两台kafka的broker.id[0,1]

未完待续~