kafka
1 初识Kafka
Apache Kafka最早是由LinkedIn开源出来的分布式消息系统,现在是Apache旗下的一个子项目,并且已经成为开源领域应用最广泛的消息系统之一。Kafka社区非常活跃,从0.9版本开始,Kafka的标语已经从“一个高吞吐量,分布式的消息系统”改为"一个分布式流平台"。
- 1 Kafka和传统的消息系统不同在于:
- kafka是一个分布式系统,易于向外扩展。
- 它同时为发布和订阅提供高吞吐量
- 它支持多订阅者,当失败时能自动平衡消费者
- 消息的持久化(基于磁盘的数据存储)
- 2 kafka和其他消息队列的对比:
kafka | activemq | |
背景 | Kafka 是LinkedIn 开发的一个高性能、分布式的消息系统,广泛用于日志收集、流式数据处理、在线和离线消息分发等场景 | ActiveMQActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。 |
开发语言 | java,scala | Java |
协议支持 | 自己制定的一套协议 | JMS协议 |
持久化支持 | 支持 | 支持 |
事务支持 | 0.11.0之后支持 | 支持 |
producer容错 | 在kafka中提供了ack配置选项, request.required.acks=-1,级别最低,生产者不需要关心是否发送成功 request.required.acks=0,只需要leader分区有了即可 request.required.acks=1,isr集合中的所有同步了才返回 可能会有重复数据 |
发送失败后即可重试 有ack模型ack模型可能重复消息事务模型保证完全一致 |
吞吐量 | kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高 | |
负载均衡 | kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上 |
- 3 关键名词解析
# |
名称 |
解释 |
1 |
Broker(代理) |
消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 |
2 |
Topic(主题) |
Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic |
3 |
Partition(分区) |
物理上的概念,一个topic可以分为多个partition,每个partition内部是有序的 |
4 |
Replica(副本) |
副本,每个partion有多个副本,存储在不同的broker上,保证消息的高可用 |
5 |
Segment(片段) |
partition物理上由多个segment组成,每个Segment存着message信息 |
6 |
Message(消息) |
消息,是基本的通讯单位,由一个key,一个value和时间戳构成 |
7 |
Producer(生产者) |
消息生产者,向Broker发送消息的客户端 |
8 |
Consumer(消费者) |
消息消费者,从Broker读取消息的客户端 |
9 |
ConsumerGroup(消费者群组) |
每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息 |
2 kafka原理解析
2.1.1 生产者producer
创建kafka生产者producer
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”); ★ broker地址清单
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); ★ key序列化
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); ★ value序列化
Producer<String, String> producer = new KafkaProducer<>(props)producer发送消息到kafka-broker
ProducerRecord<String, String> record = new ProducerRecord<>(“topic”, “key”, “value”); ★ 多种构造方式
producer.send(record); ★ 发送并忘记(fire-and-forget)
RecordMetadata rm = producer.send(record).get(); ★ 同步发送
producer.send(record,new Callback(){ ★ 异步发送
public void onCompletion(RecordMetadata rm,Exception e){
…
}
}
);
在下图中在我们的生产者会决定发送到哪个Partition。
- 如果没有Key值则进行轮询发送。
- 如果有Key值,对Key值进行Hash,然后对分区数量取余,保证了同一个Key值的会被路由到同一个分区,如果想队列的强顺序一致性,可以让所有的消息都设置为同一个Key。
消费者consumer
创建kafka消费者
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”); ★ borker地址清单
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); ★ key反序列化
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); ★ value反序列化
props.put(“group.id”, “gp-group”);★ 设置所属消费者群组
Consumer<String, String> consumer = new KafkaConsumer<>(props);
订阅主题:一个消费者可以同时订阅多个主题
consumer.subscribe(Collections.singletonList(“gp_topic”))
consumer.subscribe(“gp.*”)try { while (true) { ★ 无限循环
ConsumerRecords<String, Customer> records = consumer.poll(100); ★ 轮询消息
for (ConsumerRecord<String, Customer> r : records) { ★ 逐条处理
System.out.printf(“offset = %d, key = %s, value = %s%n”, r.offset(), r.key(), r.value());
}
}
} finally {
consumer.close(); ★ 关闭消费者
}poll: 加入群组,接受分配到分区;轮询获取数据;发送心跳;自动提交
close: 关闭网络连接;主动通知GroupCoordinator,自己已挂消费成功后提交
偏移量:offset,消息在分区中的位置
提交:commit,更新分区当前位置,避免重复消费
提交方式:1)自动提交 2)手动提交(同步 vs.异步提交)
本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.
- 如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
- 如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
- 在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.
- kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.
2.2 kafka架构图
Kafka中ZooKeeper的用途:
- 正如ZooKeeper用于分布式系统的协调和促进,Kafka使用ZooKeeper也是基于相同的原因。
- ZooKeeper用于管理、协调Kafka代理。每个Kafka代理都通过ZooKeeper协调其它Kafka代理。
- 当Kafka系统中新增了代理或者某个代理故障失效时,ZooKeeper服务将通知生产者和消费者。生产者和消费者据此开始与其它代理协调工作。
2.3 kafka的消费模型
消息由生产者发送到kafka集群后,会被消费者消费。一般来说我们的消费模型有两种:推送模型(push)和拉取模型(pull)
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。如果采用push,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。
- Kafka采取拉取模型(poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。
3 kafka网络模型
3.1 KafkaClient --单线程Selector
单线程模式适用于并发链接数小,逻辑简单,数据量小。
Kafka中为什么consumer和producer使用单线程,服务端使用多线程模型?
- 在kafka中,consumer和producer都是使用的上面的单线程模式。这种模式不适合kafka的服务端,在服务端中请求处理过程比较复杂,会造成线程阻塞,一旦出现后续请求就会无法处理,会造成大量请求超时,引起雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。
3.2 Kafka--server -- 多线程Selector
在kafka服务端采用的是多线程的Selector模型,Acceptor运行在一个单独的线程中,对于读取操作的线程池中的线程都会在selector注册read事件,负责服务端读取请求的逻辑。成功读取后,将请求放入message queue共享队列中。然后在写线程池中,取出这个请求,对其进行逻辑处理,即使某个请求线程阻塞了,还有后续的县城从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,由于注册了OP_WIRTE事件,所以还需要对其发送响应。
4 kafka的高可靠分布式存储模型
在Kafka中保证高可靠模型的依靠的是副本机制,有了副本机制之后,就算机器宕机也不会发生数据丢失。
4.1高性能的日志存储
kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下,假设有1000条消息,每个LogSegment大小为100,下面展现了900-1000的索引和Log:
由于kafka消息数据太大,如果全部建立索引,即占了空间又增加了耗时,所以kafka选择了稀疏索引的方式,这样的话索引可以直接进入内存,加快偏查询速度。
简单介绍一下如何读取数据,如果我们要读取第911条数据首先第一步,找到他是属于哪一段的,根据二分法查找到他属于的文件,找到0000900.index和00000900.log之后,然后去index中去查找 (911-900) =11这个索引或者小于11最近的索引,在这里通过二分法我们找到了索引是[10,1367]然后我们通过这条索引的物理位置1367,开始往后找,直到找到911条数据。
上面讲的是如果要找某个offset的流程,但是我们大多数时候并不需要查找某个offset,只需要按照顺序读即可,而在顺序读中,操作系统会对内存和磁盘之间添加page cahe,也就是我们平常见到的预读操作,所以我们的顺序读操作时速度很快。但是kafka有个问题,如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机I/O这个时候对性能影响很大。所以一般来说Kafka不能有太多的partition。针对这一点,RocketMQ把所有的日志都写在一个文件里面,就能变成顺序写,通过一定优化,读也能接近于顺序读。
4.2副本机制
Kafka的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫Reblance),kafka每个主题的每个分区都有一个主副本以及0个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
在Kafka中并不是所有的副本都能被拿来替代主副本,所以在kafka的leader节点中维护着一个ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:
-
节点必须和ZK保持连接
-
在同步的过程中这个副本不能落后主副本太多
ISR = leader + 没有落后太多的副本;
分区复制
- 首领副本 :每个分区都有一个首领副本。首领副本负责处理所有生产者和消费者的请求
- 跟随者副本:首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端端请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。
- AR:Assigned Replicas,所有副本
- ISR:In-Sync Replicas,已同步的副本
- OSR:Outof-Sync Replicas,掉队的副本
- AR = ISR + OSR
这里先要说下两个名词:HW(高水位)是consumer能够看到的此partition的位置,LEO是每个partition的log最后一条Message的位置。HW能保证leader所在的broker失效,该消息仍然可以从新选举的leader中获取,不会造成消息丢失。
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
-
1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
-
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-
-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(其他节点都和zk断开连接,或者都没追上),这样就变成了acks=1的情况。
5 Kafka生产者事务和幂等
详情见:https://blog.csdn.net/mlljava1111/article/details/81180351
5.1 幂等
幂等性引入目的:
- 生产者重复生产消息。生产者进行retry会产生重试时,会重复产生消息。有了幂等性之后,在进行retry重试时,只会生成一个消息
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
- PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
- Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。
5.2 事务
事务属性是2017年Kafka 0.11.0.0引入的新特性。类似于数据库事务,只是这里的数据源是Kafka,kafka事务属性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。
在事务属性之前先引入了生产者幂等性,它的作用为:
- 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败
- consumer-transform-producer模式下,因为消费者提交偏移量出现问题,导致在重复消费消息时,生产者重复生产消息。需要将这个模式下消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。
可以思考一下:
1.为什么需要分区,也就是说主题只有一个分区,难道不行吗?2.日志为什么需要分段