大数据实时计算---Kafka介绍(下)

kafka基本概念介绍

架构图如下
大数据实时计算---Kafka介绍(下)

  • Producer
    消息生产者, 就是向 kafka broker 发消息的客户端。

  • Consumer
    消息消费者, 向 kafka broker 取消息的客户端

  • Topic
    在kafka中,一个topic就是一个队列,topic由一个或多个分区组成,分区分布在kafka个节点上。分区由顺序得,不变得消息队列组成,已经存在得消息不能更改,新写入得消息追加到最后面。每个分区均会存在对应得持久化文件和索引文件。
    大数据实时计算---Kafka介绍(下)
    和传统得消息队列不同,Kafka中的消息不会再消费后就被删除,相反他们存可配置的过期时间,到了时间就由kafka定时清除。
    Kafuka的consumer需要寻访消息的消费偏移量(offset),因此重置偏移量将能够调整消费的消息。Kafka引入分区有这个设计的优势是:

    • topic的容量不受单台服务器的容量大小限制
    • 并发新能更好
    • 调度分布策略灵活

    如上图,分区分布在整个kafka集群的所有服务器上,每个服务器节点基于分区处理的数据和请求,每个分区可以由多个副本容错。

  • Consumer Group (CG)
    这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer) 的手段。 一个 topic 可以有多个 CG。 topic 的消息会复制(不是真的复制, 是概念上的) 到所有的 CG, 但每个 partition 只会把消息发给该 CG 中的一个consumer。 如果需要实现广播, 只要每个 consumer 有一个独立的 CG 就可以了。 要实现单播只要所有的 consumer 在同一个 CG。 用 CG 还可以将 consumer 进行*的分组而不需要多次发送消息到不同的 topic。

  • Partition
    为了实现扩展性, 一个非常大的 topic 可以分布到多个 broker( 即服务器) 上, 一个topic 可以分为多个 partition, 每个 partition 是一个有序的队列。 partition 中的每条消息都会被分配一个有序的 id(offset) 。 kafka 只保证按一个 partition 中的顺序将消息发给 consumer, 不保证一个 topic 的整体(多个 partition 间) 的顺序。

  • Offset
    kafka 的存储文件都是按照 offset.kafka 来命名, 用 offset 做名字的好处是方便查找。 例如你想找位于 2049 的位置, 只要找到 2048.kafka 的文件即可。 当然 the first offset 就是00000000000.kafka.

  • leader
    每个 Replication 集合中的 Partition 都会选出一个唯一的 Leader, 所有的读写请求都由Leader 处理。 其他 Replicas 从 Leader 处把数据更新同步到本地, 过程类似大家熟悉的 MySQL中的 Binlog 同步。 每个 Cluster 当中会选举出一个 Broker 来担任 Controller, 负责处理 Partition的 Leader 选举, 协调 Partition 迁移等工作。

  • ISR(In-Sync Replica)
    是 Replicas 的一个子集, 表示目前 Alive 且与 Leader 能够“Catch-up” 的Replicas 集合。 由于读写都是首先落到 Leader 上, 所以一般来说通过同步机制从 Leader 上拉取数据的 Replica 都会和 Leader 有一些延迟(包括了延迟时间和延迟条数两个维度), 任意一个超过阈值都会把该 Replica 踢出 ISR。 每个 Partition 都有它自己独立的 ISR。

Kafka的消息分发

分发的完整性

Producer 客户端负责消息的分发

  • kafka 集群中的任何一个 broker 都可以向 producer 提供 metadata 信息,这些 metadata 中包含"集群中存活的 servers 列表"/"partitions leader 列表"等信息;
  • 当 producer 获取到 metadata 信息之后, producer 将会和 Topic 下所有 partition leader 保持socket 连接;
  • 消息由 producer 直接通过 socket 发送到 broker, 中间不会经过任何"路由层", 事实上, 消息被路由到哪个 partition 上由 producer 客户端决定;
    比如可以采用"random"“key-hash”"轮询"等,如果一个 topic 中有多个partitions,那么在producer 端实现"消息均衡分发"是必要的。
  • 在 producer 端的配置文件中,开发者可以指定 partition 路由的方式。

Producer 消息发送的应答机制
设置发送数据是否需要服务端的反馈,有三个值 0,1,-1

  • 0: producer 不会等待 broker 发送 ack
  • 1: 当 leader 接收到消息之后发送 ack
  • -1: 当所有的 follower 都同步消息成功后发送 ackrequest.required.acks=-1

Producer的异步缓存机制
Producer直接将消息发送到Broker上,中间没有任何网络传输;针对一个存在多个分区的topic,生产者可以自定义某些消息该往哪个分区发送(进行哈希,轮询,指定分区)

kafka给producer提供了异步发送机制,将消息生产后西安累计再内存中,当累计的消息量达到一个可配置的阈值(如100条消息或者5秒)的时候,再将数据一次性发送,这将能够降低Producer的I/O操作,但是采用这种机制,当数据还没有真正发送出去的Producer奔溃的时候,将会由部分的数据丢失。另外0.9版本的Kafka才会增加异步发送的回调。

Kafka 文件存储机制

1. Kafka 文件存储基本结构

  • 在 Kafka 文件存储中, 同一个 topic 下有多个不同 partition, 每个 partition 为一个目录, partiton命名规则为 topic 名称+有序序号, 第一个 partiton 序号从 0 开始, 序号最大值为 partitions数量减 1。
  • 每个 partion(目录)相当于一个巨型文件被平均分配到多个大小相segment(段)数据文件中。 但每个段 segment file 消息数量不一定相等, 这种特性方便 old segment file 快速被删除。默认保留 7 天的数据。大数据实时计算---Kafka介绍(下)
  • 每个 partiton 只需要支持顺序读写就行了, segment 文件生命周期由服务端配置参数决定。(什么时候创建, 什么时候删除)
    大数据实时计算---Kafka介绍(下)

数据有序的讨论?
一个 partition 的数据是否是有序的? 间隔性有序, 不连续针对一个 topic 里面的数据, 只能做到 partition 内部有序, 不能做到全局有序。
特别加入消费者的场景后, 如何保证消费者消费的数据全局有序的? 伪命题。只有一种情况下才能保证全局有序? 就是只有一个 partition。

Kafka Partition Segment

Segment file 组成:由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件。
大数据实时计算---Kafka介绍(下)
Segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对
应数据文件中 message 的物理偏移地址。
大数据实时计算---Kafka介绍(下)
上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。
其中以索引文件中元数据 3,497 为例,依次在数据文件中表示第 3 个 message(在全局 partiton表示第 368772 个 message)、以及该消息的物理偏移地址为 497。

Kafka 查找 message

读取 offset=368776 的 message, 需要通过下面 2 个步骤查找
大数据实时计算---Kafka介绍(下)

  • 查找 segment file
    00000000000000000000.index 表示最开始的文件, 起始偏移量(offset)为 000000000000000368769.index 的消息量起始偏移量为
    368770 = 368769 + 1
    00000000000000737337.index 的起始偏移量为 737338=737337 + 1
    其他后续文件依次类推。
    以起始偏移量命名并排序这些文件, 只要根据 offset 二分查找文件列表, 就可以快速定位到具体文件。 当 offset=368776 时定位00000000000000368769.index 和对应 log 文件。
  • 通过 segment file 查找 message
    当 offset=368776 时 , 依 次 定 位 到 00000000000000368769.index 的 元 数 据 物 理 位 置 和00000000000000368769.log 的物理偏移地址
    然后再通过 00000000000000368769.log 顺序查找直到 offset=368776 为止。

Kafka 自定义 Partition


package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;


public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

  
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}

  • 如果指定 partition, 就用 partition
  • 如果指定 key, 使用 key 进行 hash 取模。
  • 如果没有指定 key, 使用轮询的方式。

Kafka 的高吞吐和快速响应

Broker
不同于 Redis 和 MemcacheQ 等内存消息队列, Kafka 的设计是把所有的Message 都要写入速度低容量大的硬盘, 以此来换取更强的存储能力。 实际上, Kafka 使用硬盘并没有带来过多的性能损失,“ 规规矩矩” 的抄了一条“ 近道” 。
首先, 说“ 规规矩矩” 是因为 Kafka 在磁盘上只做 Sequence I/O, 由于消息系统读写的特殊性, 这并不存在什么问题。 关于磁盘 I/O 的性能, 引用一组 Kafka 官方给出的测试数据(Raid-5, 7200rpm):
Sequence I/O: 600MB/s
Random I/O: 100KB/s
所以通过只做 Sequence I/O 的限制, 规避了磁盘访问速度低下对性能可能造成的影响。

  • PageCache功能
    接下来我们再聊一聊 Kafka 是如何“ 抄近道的” 。

  • 首先, Kafka 重度依赖底层操作系统提供的 PageCache 功能。 当上层有写操作时, 操作系统只是将数据写入 PageCache, 同时标记 Page 属性为 Dirty。当读操作发生时, 先从 PageCache 中查找, 如果发生缺页才进行磁盘调度, 最终返回需要的数据。

  • 实际上 PageCache 是把尽可能多的空闲内存都当做了磁盘缓存来使用。 同时如果有其他进程申请内存, 回收 PageCache 的代价又很小, 所以现代的 OS 都支持 PageCache。
    使用 PageCache 功能同时可以避免在 JVM 内部缓存数据, JVM 为我们提供了强大的 GC 能力, 同时也引入了一些问题不适用与 Kafka 的设计。
    如果在 Heap 内管理缓存, JVM 的 GC 线程会频繁扫描 Heap 空间, 带来不必要的开销。 如果 Heap过大, 执行一次 Full GC 对系统的可用性来说将是极大的挑战。

  • 所有在在 JVM 内的对象都不免带有一个 Object Overhead(千万不可小视), 内存的有效空间利用率会因此降低。
    所有的 In-Process Cache 在 OS 中都有一份同样的 PageCache。 所以通过将缓存只放在 PageCache, 可以至少让可用缓存空间翻倍。
    如果 Kafka 重启, 所有的 In-Process Cache 都会失效, 而 OS 管理的PageCache 依然可以继续使用。

  • Sendfile
    在数据发送端,kafka使用sendfile调用减少数据从硬盘读取到发送之间内核态和用户之间的数据复制。

传统上,放用户需要读取磁盘上的数据并发送客户端的时候,会经历这样的步骤

  1. 打开文件磁盘上的文件准本读取
  2. 创建远端secket的连接
  3. 循环从磁盘上读取数据
  4. 讲读取到的数据发送出去
  5. 完成发送后关闭文件和远程连接

仔细分析其中的步骤,我们发现,在整个过程中,一份数据的发送需要多次复制。

  1. 通过read调用每次从磁盘上读取文件,数据会被从磁盘上复制到内核空间,然后被复制到读取进程所在的用户空间。
  2. 通过write调用将数据从进程所在的用户空间发送出去时候,数据会被从用户空间复制到内核空间,在被复制到对应的网卡缓冲区,
  3. 期间数据经历了多此复制,在用户态和内核态之间的多次转换,每一次都将产生非常昂贵的上下们切换。
    当有大量的数据仅仅需要从文件读取并被发送时候,代价会非常大。

大数据实时计算---Kafka介绍(下)

sendfile系统调用优化了上述的流程

  1. 首先将数据从磁盘复制到内核空间
  2. 再从内核空间复制到发送缓冲区,
  3. 最终被发送出去。

在Linux系统中,sendfile可以支持数据发送到文件,发送到网络设备等。sendfile是Kerenl2.2提供的新特性。再从内核空间复制到发送缓冲区,最终被发送出去。
大数据实时计算---Kafka介绍(下)

通过分析,我们可以发现,

  • 通过简单的read/write读取并发送数据,需要4次系统调用和4次数据复制。
  • 使用sendfile只产生两次系统的调用和数据的复制。
  • 由于每次空间切换内核将产生中断,保护现场(堆,栈,寄存器的值需要保护以备执行完成后来回的切换)等动作,每一次数据的复制会消耗大量的cpu.sendfile对于这两个优化带来的变化是数据发送吞吐量提高,同时减少了cup资源的消耗。当存在大量需要从硬盘发送的数据的时候其优势将非常明显。
    也正是因为如此,很多涉及到文件下载。发送的服务都支持直接sendfile调用。

文件保存的副本机制

另外,文件存储时候为了保证高可用,还引进了文件的副本机制,一般备份数据未奇数份,其中存在leader,作为外部生产者的写入数据的入口,在数据写入的同时会简历存储区域的连接,对数据进行备份。当备份完成后会进行ack响应机制,如果单节点宕机,会讲备份节点选举为leader.
设计的基本思想是和HDFS的数据备份相似。

Kafka数据的消费

在消费消息的时候,consumer确定要消息的topic的具体分区和偏移量,并确定需要从哪个偏移量开始读取消息,发送到broker服务端,kafka服务端将一个日志块返回给consumer

消息推拉

kafka采取生产者推消息大broker,consumer从kafka拉消息。本质上是为了保护下游不会承受较大数据处理压力。缺陷是当消息的消费速度大于生产速度的时候,会导致consumer空轮询,处理方法是可以通过配置参数允许consumer阻塞在一个长的空轮询上。

Kafka消费端消息的保证

  • 对于消息系统而言,如何保证消息的偏移量是关乎性能的因素。很多消费系统在服务端的broker保存消费的偏移量,这意味着消息被消费后,服务端需要马上记录好他的偏移量,好处是服务端能够知道什么消息被消费并进行删除处理,以便维持整个数据的规模。坏处是当消费端异常或者发送超时的时候,会导致消息丢失。
  • 当然可以在服务端和消费端引入确认消息,即消息被消费后只是标记不删除,直到消费端确认。
  • 但是当消费成功但是发送确认异常时候,将会到hi消息被消费两次,另外服务端的Broker需要为每个消息维护被消费的标记而增加成本。
  • kafka中一个topic存在多个分区,每个分区在任意时间段只能被一consumer消费,偏移量只是一个简单的数值,由消费端负责。
    一次kafka可以又有效得保证数据得丢失,但是会在一些情况下出现数据重复消费得问题

consumer消费流程

consumer在启动后,在开始消费消息之前,会经历一下的流程

  1. 在指定的消费组(consumer group)**册consumerId,即Consumers/consumerGroup/Ids
  2. 在 Consumers/consumerGroup/Ids**册watch,用于处理新的Consumer加入或者老的Comsumer退出等情况的(将会出发reblance)对应处理
  3. 在/Broker/ids**册watch.以便Broker存在异常时对应处理
  4. 若消费端使用Topic过滤器,则需要在Broker/Topics上注册watch一便获取到Topic的变化消息。
  5. 出发消费组的reblance(重新平衡,重新给各个consumer分配分区)

Consumer的重新平衡

重新平衡是同一个消费组内所有的consumer就consumer消费达成共识。
Consumer重新平衡在一下任意的情况下将会被触发

  • Broker删除或者新增
  • 同一个消费组内Consumer删除或者新增
    任何情况下,一个分区总是仅仅会被一个consumer消费,因此当消费组内消费者数量大于分区数量的时候,将存在消费不到任何数据的comsumer.重新平衡的流程如下
  1. 设PT为topic的所有分区
  2. 设GC为同一组消费组里面的所有consumer,而CI为消费Topic T的consumer
  3. 对PT进行排序(以便相同Broker上的分区紧挨着)
  4. 对GC进行排序
  5. 假设i为Comsumer中的某个分区的序号,则有N=size(PT)/size(GC)
  6. 分配N*i到(N+1)*i-1的分区给ConsumerCi
    7.更新zookeeper上的注册信息,即网partition上写入ConsumerId,默认是写到zookeeper的/consumers//owers/
    /partitionid。
    大数据实时计算---Kafka介绍(下)

Kafuka一键启动脚本

  • 首先配置kafka的环境变量
  • 在kafka的bin目录下,创建下面三个文件

slave节点文件,配置需要启动的节点名(对应的/etc/hosts中需要配置映射信息)

nodel01
nodel02
nodel03

startkafka.sh文件,用于启动kafka集群

cat /export/server/kafka/bin/slave | while read line
do
{
   echo $line
    ssh $line "source /etc/profile;nohup kafka-server-start.sh /export/server/kafka/config   /server.properties >/dev/null 2>&1 &"
}&
wait
done 

stopkafka.sh文件,用于停止集群的脚本

cat /export/server/kafka/bin/slave | while read line
do
{
 echo $line
 ssh $line "source /etc/profile;jps |grep Kafka |cut -c 1-5 |xargs kill -s 9 "
}&
wait
done 

Kafuka监控工具

点击下载kefka监控工具

  1. 解压进入build目录,点击
    大数据实时计算---Kafka介绍(下)
    大数据实时计算---Kafka介绍(下)

连接以后可以获取下面的信息
大数据实时计算---Kafka介绍(下)



Kafka的介绍到这里结束,下次介绍storm.