kafka入门02
一、回顾
1、核心概念
broker:进程
producer:生产者
consumer:消费者
topic:主题
partitions:分区 (副本数)
2、Topic详解
"123"---->第一台机器的0分区上(0分区还坐落在2/3台机器上的0分区)
读单个分区的时候也是按顺序读。
3、consumer groups
(1)有一个容错性的消费机制,一组的消费者中某个挂掉了,其他分区信息会给另外一个。
(2)一个组内,共享一个公共的ID——groupID,通过公共ID来管理组内消费者。
(3)组内的所有消费者协调在一起,去消费topic的所有分区。
(4)每个分区只能由同一个消费组的同一个消费者消费。
(5)效率提高:消费者组提高了并发
二、OFFSET
1、概念offset
Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.
每个分区都是一个有序的、不可变的记录序列,它是一个结构化的提交日志。分区中的记录每个都被分配一个顺序的iD数字调用的偏移量,该偏移量唯一地标识分区内的每个记录。
一个分区,数据按顺序进来,第一个进来就是1,第二个就是2,这些数字就是偏移量。相当于MySQL自增长主键,每个分区记录offset(偏移量) 都是从1开始
offset: 每个partition的数据的id
offset:
00000000000000000000.index
00000000000000000000.log
1 0000000000000000001
2 0000000000000000002
3
...
2000
下一个log文件,就是从2000开始
00000000000000002000.index
00000000000000002000.log
2001 0000000000002001
2002 0000000000002002
(1)index文件分析
00000000000000000000.index
1,0 1代表第一条消息,offset;0代表偏移量,从0开始走了多少字节。
4,55
10,77
.....
.....
index存储是采取稀疏表的方式,并不是把所有数据记录起来,而是隔几条记录一次。
抛出问题:
1、消费offset 4,通过二分法找到最接近的index
2、消费offset 7,找到4,按顺序往下读
segment:
由 log+index 两两出现组成
(1)log文件记录全部message
(2)index文件记录 相对offset和对应的消息的物理偏移量 字节位置,稀疏存储(不是每个索引都存)
(3)参数log.segment.bytes控制切新的segment的大小。
(4)参数log.roll.hours控制切新的segment的时间。
(5)segment命名规则
名称是由上一组的最后一条消息的offset来命名
三、消费语义
1、概念
at most once:最多消费一次,消费消息有可能丢失,但是不会重复消费。
at least once:至少消费一次,消费消息不可能丢失,但是会重复消费。
exactly once:正好一次,消息不会丢失也不会重复。0.10.0.1kafka版本不支持不能实现。0.11官方已支持。
处理方案:
(1)log允许丢,at most once
(2)log不允许丢,at least once+去重
(3)MySQL SQL语句 at least once
例如:
insert into jepson1 12;--->hbast ok
insert into jepson2 14;挂了,就会重新消费上面一条语句,造成多消费了一遍。这个时候,很多公司是把上述数据的哈希值,当到redis内存库,如果已经有了,就不要这条数据。
J哥:选择HBase put:如果数据已经有了就进行update操作,如果没有,就insert。
insert into jepson3 13;
四、consumer offset
1、概念
1,2,3,4,5
只消费1,2,3,由consumer自己维护,如果offset没有维护,consumer挂了,是不是从上一次更新的offset的位置去消费
断点还原
0.10版本维护在kafka自己本身
topic:test
内嵌一个topic——consumer...维护kafka
五、分区策略哈希值取模源码
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
六、Flume-->Kafka-->Spark streaming 经典案例
http://spark.import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
----------官网源码-------------
(1)J哥比官网要多两个参数
auto.offset.reset
如果在Kafka中没有初始偏移,或者服务器上不再存在当前偏移,kafka中offset的重置机制
earliest:自动将偏移量重置为最早的偏移量
latest:自动将偏移量重置为最新的偏移量
none:抛错
anything else:抛错
(2)auto.commit.interval.ms
默认true
选择了true的话,消费者偏移量会自动提交到Kafka的频率(以毫秒为单位),后面有个参数决定频率。
如果是kafka维护offset的话,如果在某个offset挂了,还自动提交到kafka,那样消费消费最新的偏移量的时候就会出现bug。所以这个参数要手动设置为false
(3)kafka eagle(https://www.cnblogs.com/smartloli/p/5829395.html)
Kafka Eagle 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lag 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息
CDH TSQL
(4)生产者消费者数据糅合
1)生产者和消费者的速度是一样嘛?
由于趋势度吻合,所以速度一样的。说明kafka,spark-stream运行没问题。
2)为什么消费者的曲线和生产者的曲线趋势度 吻合?
及时消费
么有压力
3)为什么消费者的曲线和生产者的曲线要高?纵坐标是字节数/s,横坐标是时间/s
生产: value
消费:
topic
value
....