Kafka学习笔记
初入公司,接到一个卡夫卡的问题,将学习笔记输出在这里。
Kafka相关资料
Kafka 就是一款基于发布与订阅的消息系统。它一般被称为“分布式提交日志”或者“分布式流平台”。
Kafka 的数据单元被称为消息,消息可以有一个可选的元数据 ,也就是键。批次就是一组消息,这些消息属于同一个主题和分区。
Kafka的基本术语
Topic
Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).
Producer
发布消息的对象称之为主题生产者(Kafka topic producer)
Consumer
订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
集群成员关系
通过zookeeper维护,每个broker的id唯一,可在配置文件指定,或自动生成。具体配置见Zookeeper配置。在 broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从 Zookeeper 上断开连接,此时 broker 在启动时创建的临时节点会自动从 Zookeeper 上移除。监听broker列表的Kafka组件会被告知该 broker已移除。在关闭 broker 时,它对应的节点也会消失,不过它的 ID 会继续存在于其他数据结构中 。例如,主题的副本列表里就可能包含这些id。在完全关闭一个 broker 之后,如果使用相同的id启动另一个全新的 broker,它会立即加入集群,并拥有与旧 broker相同的分区和主题。
控制器
就是一个broker,额外负责首领选举,其他 broker 在控制器节点上创建Zookeeper watch 对象。如果控制器关闭或与zookeeper断开,其他broker就尝试成为控制器。如果一个broker断开,控制器负责为失去首领的分区选择新的首领(一般就是分区副本的下一个),控制器使用epoch 来避免两个节点同时认为自己是当前的控制器。
Kafka数据可靠保证
- Kafka可以保证分区消息的顺序;
- 只有消息写入所有同步副本才认为消息是“已提交”的;
- 只要有一个活跃副本,已提交消息就不会丢失;
- 消费者只能读取已提交消息;
复制(ISR)
所有生产者请求和消费者请求都会经过首领副本。跟随者唯一的任务就是从首领那里复制消息,保持与首领一致的状态。首领确保跟随者数据与其保持一致,跟随者数据不一致则无法成为新首领。replica.lag.time.max.ms用来设定跟随者的正常不活跃时间。
跟随者活跃的判断条件:1. 它在过去的6s内(可配置)向Zookeeper 发送过心跳。2. 过去的10s内(可配置)从首领那里获取过消息。3. 在过去的10s内从首领那里获取过最新的消息,还必须是几乎零延迟的。如果不活跃,跟随者会与zookeeper重新建立连接,重新获取最新数据。如果ack设置为all,而跟随者不活跃,则消费者可以忽略此跟随者响应。
主题级别的配置副本数量参数是replicatlon.factor,而在broker级别则可以通过 default.replication.factor 来配置自动创建的主题。
unclean.leader.selection.enable设置为true则允许不完全的首领选举,即如果首领副本崩溃,而其他跟随者副本都没有同步,那么允许不同步的副本成为首领。
min.insync.replic设定最小同步副本数量。如果不够,则broker拒绝接收消息,返回错误,副本相当于变成只读。
发送确认
ack =0,可以保证惊人的吞吐量,但是生产者完全不知道出现了数据丢失;
ack=1,也有可能出现丢失,因为可能成功写入后首领副本出现了崩溃;
ack=all,配合min.insync.replicas可以实现最保险的办法,生产者会一直重试直到数据发送成功,但是会降低吞吐量。
生产者重试设置
首先要区分发送消息错误可否重试,如果要保证数据不丢失,那么就需要在遇到可重试错误时进行重试,要注意重试次数。另外,还要注意重试可能会导致都写入成功而重复,可以在消息里加入唯一的标识符加以辨别。
可靠系统里使用消费者
如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。在这种情况下,如果其他消费者接手了工作,那些没有被处理完的消息就会被忽略,永远得不到处理。具体分析详见轮询
有几个设置需要注意:
- group.id 如果希望一个消费者获取到一个topic的所有消息,则需要设置其唯一的group.id
- auto.offset.reset 可配置为earliest与latest,none,需要注意的是,虽然是如果已有提交offset,从offset开始消费,但是长时间保持生产者速度大于消费者消费速度,消费者记录的offset被删除,那么还是会触发从最早还是从最新offset开始消费。
- enable.auto.commit 此项容易造成重复消费,如在提交前停止处理消息,这时消息已经被处理一部分但是offset没有提交上去。而且如果将消息交给线程处理,那么可能消息还没有处理完offset就已经提交了。
- auto.commit.interval.ms 默认5s自动提交一次offset,频率越低重复消费的可能性越低,但是会提高额外开销。
显式提交偏移量
- 总是在处理完事件后再提交偏移量;
- 提交频度是性能和重复消息数量之间的权衡;
- 确保对提交的偏移量心里有数:在轮询(poll)中提交的偏移量是读取到的最新偏移量,而不是处理到的最新偏移量;
- 再均衡:需要注意对再均衡的处理;
- 消费者可能需要重试:有两种处理方式,一种是在遇到可重试错误时,提交最后一个处理成功的偏移量,然后把还没有处理好的消息保存到缓冲区里(这样下一个轮询就不会把它们覆盖掉),调用消费者的 pause()方怯来确保其他的轮询不会返回数据(不需要担心在重试时缓冲区隘出),在保持轮询的同时尝试重新处理。如果重试成功,或者重试次数达到上限井决定放弃,那么把错误记录下来井丢弃消息,然后调用resume()方能让消费者继续从轮询里获取新数据。另一种则是在遇到可重试错误时,把错误写入一个独立的主题,然后继续。一个独立的消费者群组负责从该主题上读取错误消息,井进行重试,或者使用其中的一个消费者同时从该主题上读取错误消息并进行重试,不过在重试时需要暂停该主题。这种模式有点像其他消息系统里的 dead-letter-queue。
- 消费者可能需要维护状态;
- 长时间处理:将数据交给线程池来处理,加快速度。并在保持轮询的状态下调用pause(),以防止再均衡。
模式
消息模式 (schema) Json XML
主题和分区
消息通过主题进行分类,主题可以被分为若干个分区。
通过分区来实现数据冗余和伸缩性。一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。
Topic的删除
实际测试的时候,发现删除topic时,kafka只是将其marked as deleted,并未真正删除,此时继续向topic发送消息仍然成功,需要在server.properties设置一个属性:delete.topic.enable=true,这样才是真正的删除了topic。一般清空topic内的数据可以删除topic,不用重新创建,直接再向topic发送消息即可。具体的操作可以查看某一topic数据清空
生产者
创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
序列化器将消息序列化成字节数组,以网络传输。
如果消息成功写入 Kafka,就返回 一 个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 则会返回 一个错误。
消息发送方式
- 发送并忘记
- 同步发送
- 异步发送
KafkaProducer一般会发生两类错误。其中一类是可重试错误 ,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主( no leader )” 错误则可以通过重新为分区选举首领来解决。 KafkaProducer可以被配置成自动重试,如果在多次重试后仍无能解决问题,应用程序会收到一个重试异常。另一类错误无出通过重试解决,比如“消息太大”异常。对于这类错误, KafkaProducer不会进行任何重试,直接抛出异常。
注意:
librdkafka不像java客户端那样,可以通过future.get()实现同步发送。所以,如果broker不能连通的话,send方法还是可以正常将消息放入队列。这会导致两个问题
1、我们的客户端是不会知道broker已经挂掉了,因而不能对这种情况作出及时处理,导致消息全部堆积在内存中,如果此时不幸,我们的客户端也挂掉了,那这部分消息就全部丢失了。
2、如果broker一直没有恢复,而我们一直向队列中写数据的话,producer中有一个选项message.timeout.ms,如果超过了设定的消息超时时间,那么会有线程清理队列中的数据,导致消息丢失,而如果将时间设置为0(永不超时)的话,将导致客户端内存撑满。
此问题可以通过设置发送回调的处理方式来应对。
生产者重要配置
- acks指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
acks=0 , 生产者不等待服务器接收响应,吞吐量高。
acks=1 ,需要等待首领确认,可能产生数据丢失(首领切换时),吞吐量取决于同步还是异步发送。
acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自
服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算
有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1时更高,因为要等待不只一个服务器节点接收消息。
- buffer.memory
内存缓冲区的大小,生产者发送数据速度超过服务器接收时,这里会被占满,send要么阻塞要么抛出异常,取决于max.block.ms的设定。
- compression.type
发送消息压缩类型,有snappy 、 gzi.p 或 lz4四种。
- retries
错误重试次数,默认100ms一次,可以通过retry.backoff.ms改变,需判断是临时错误还是重要错误(如发送消息过大,重试无意义)来进行处理。
序列化器
Kafka有自己的散列算法(更新java版本算法不会改变),在分区不变的时候,键值相同一定会被分到相同分区。所以在创建主题时规划好分区,不要轻易改变。
可以自定义实现分区策略,java语言需自己实现接口
消费者
读取消息。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。 偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时, Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题 。 群组保证每个分区只能被一个消费者使用 。下图所示的群组中,有 3 个消费者同时读取一个主题。其中的两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。消费者与分区之间的映射通常被称为消费者对分区的所有权关系 。
一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。如果消费者组数量超过partition数量,则会有消费者一直收不到消息,直到有正在消费的消费者停止消费。
消费者分区分配策略
而确定消费者消费那几个分区的策略有如下几种
- RangeStrategy
- 对分区编号进行排序(如0,1,2,3,4,5,6,7,8,9)
- 消费者线程排序(C1-0, C2-0, C2-1)
- 将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
如下:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C2-1 将消费 8, 9, 10 分区
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。
- RoundRobin strategy
使用此策略的前提条件:
同一个Consumer Group里面的所有消费者的num.streams必须相等;
每个消费者订阅的主题必须相同
将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序
分区分配策略,librdkafka如何自定义暂未发现方法,java应该是可以的。
消费者群组和分区再均衡
一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。这种所有权转移,叫做再均衡(rebalance),再均衡可以提供高可用性和伸缩性,但是消费者可能暂时无法读取消息,造成整个群组一小段时间的不可用。
消费者通过向被指派为群组协调器(group coordinater)的 broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。另外,消费者退出,群组协调器不会等待心跳而立即执行再均衡。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。
心跳行为
Kafka 社区引入了一个独立的心跳线程,可以在轮均消息的空档发送心跳 。 这样一来,发送心跳的频率(也就是消费者群组用于检测发生崩溃的消费者或不再发送心跳消费者的时间)与消息轮询的频率(由处理消息所花费的时间未确定)之间就是相互独立的。
订阅主题
我们也可以在订阅主题时传入一个正则表达式。正则表达式可以匹配多个主题, 如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,井且可以处理不同类型的数据,那么这种订阅方式就很管用。在 Kafka 和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题是很常见的做法。
一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
poll () 方能返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。poll()方法有一个超时参数,它指定了在多久之后可以返回,不管有没有可用的数据都要返回。 超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。
注意:
Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多pool多少个record。
那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。
在consumer中,还有另外一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起poll请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法了。
因此fetch到消息后,offset提交的是fetch到消息的最大offset而不是poll得到的消息位置,此时如果消费者挂掉了,offset又自动提交了,那么fetch到但是没有消费的消息就是丢失,解决办法是每poll成功一次消息就手动提交一次offset。
消费者的配置
- fetch.min.bytes
指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。
- fetch.max.wait.ms
默认500ms,即如果一直没有到最小字节数,到500ms就发送数据。大部分客户端只能读取已经被写入所有同步副本的消息(跟随者副本也不行,尽管它们也是消费者 否则复制功能就无法工作)。分区首领知道每个消息会被复制到哪个副本上,在消息还没有被写入所有同步副本之前,是不会发送给消费者的一一尝试获取这些消息的请求会得到空的响应而不是错误 。
- max . partition.fetch. bytes
消费者从分区获取最大字节数,默认1M。注意,值必须比broker能够接收的最大消息的字节数(通过max.message.size属性配置)大,否则消费者一直挂起重试。
- session.timeout.ms
会话超时时间,默认3s,如果在时间内没有发送心跳,则认为死亡而触发rebalance。所以要注意协调heartbeat.interval.ms的设置。
- auto.offset.reset
指定消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
earlist:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
- enable.auto.commit
自动提交偏移量,避免重复消费应设为false,如果为true,需要设定auto.commit.interval.ms指定自动提交的间隔。另外发现,topic设置有一个auto.commit.enable,优先度比全局设置低,所以不要设置错了。
- partition.assignment.strategy
分区分配策略
提交和偏移量
如果消费者一直运行,提交到_consumer_offset并没有什么用,这个offset是为了在消费者发生崩溃或有新的消费者加入群组出发再均衡时使用的。
提交方式
- 自动提交
默认每隔5s自动提交一次。容易出现重复消费,通过减小提交时间降低,但是无法避免。每次轮询会提交上次的offset,而不知道上一次的消息是否被处理,所以最好保证上一次的消息被处理之后再提交。
- 主动提交(同步提交,异步提交)
暂未发现librdkafka如何提交,需要阅读源码查找确认。一般采取同步异步组合提交结果比较准确。注意,提交的是最后一个offset,也不是当前处理的消息的offset。
- 提交特定位置偏移量
Librdkafka应该有一个函数,可以手动传入一个表,参数为topic,partition,offset,用来指定下一个从哪里读取。
从特定位置开始处理消息记录
Kafka支持消费者从特定位置处理消息记录。消费端经调查有rd_kafka_offsets_store,应该可以手动提交,但是未验证,但是可以通过kafkaserver的指令来指定。
broker和集群
一个独立的 Kafka 服务器被称为 broker。 broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控broker. 在集群中, 一个分区从属于一个 broker, 该 broker 被称为分区的首领 。一个分区可以分配给多个broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
Kafka配置
Zookeeper 集群被称为群组。 Zookeeper 使用的是一致性协议,所以建议每个群组里应该包含奇数个节点(比如 3 个、 5 个等),这样保证一小半不工作了还能处理外部请求。
群组需要有一些公共配置,每个服务器还要在数据目录中创建一个 myid 文件,用 于指明自己的 ID。如果群组里服务器的机器名是 zoo1.example.com 、 zoo2.example.com 、 zoo3 .example.com。initlimit 表示用于在从节点与主节点之间建立初始化连接的时间上限,synclimit 表示允许从节点与主节点处于不同步状态的时间上限。这两个值都是 tickTime 的倍数。服务器地址遵循 server.X=hostname:peerPort:leaderPort 格式,各个参数说明如下:
X
服务器的ID,它必须是一个整数,不过不一定要从 0 开始,也不要求是连续的
hostname
服务器的机器名或 IP 地址;
peerPort
用于节点间通信的 TCP 端口;
leaderPort
用于首领选举的 TCP 端口。
客户端只需要通过 clientPort 就能连接到群组,而群组节点间的通信则需要同时用到这 3 个端口( peerPort 、 leaderPort 、 clientPort )
KafkaServer参数配置
num.paritions指定了新创建的主题将包含多少个分区。一般根据生产者的吞吐量以及消费者的消费速度来决定分区数量,例如生产者1G/s对应消费者50M/s * 20partition,如果信息不足,可以将分区设置为25G。
log.retention.hours指定消息保留时间,单位为毫秒,如果同时设置了minutes和ms,这三个效果是一样的,系统会优先选择最小的时间,建议ms。
log.retention.bytes通过保留的消息字节数来判断消息是否过期,作用在每一个分区上。
如果上述两项同时设置,则只要任意一个条件得到满足,消息就会被删除。
log.segment.bytes决定日志文件片段大小,达到指定大小之后,这块文件片段会关闭,注意,正在写入的片段是活跃片段,活跃片段永远不会被删除。只有片段关闭后才开始等待过期,即一个日志设置了7天过期,而这个片段花了10天才填满,那么从片段产生到过期其实过了17天。而且不管片段是否活跃,broker都会为每个分区片段打开一个文件句柄,需要注意调优,避免打开太多文件句柄。
log.segment.ms 与日志留存时间一样,大小和时间任意条件达到设定大小,消息日志就会过期,此设定默认情况下是没有配置的。
磁盘容量:如果每天接收1T数据保留7天,则需要70T容量
Broker需要的数量:如果需要保留10T数据,则需要 (5 * 2T/Broker)个broker
常用配置Kafka指令
创建Topic
./kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <int> --partitions <int>
调整分区
./kafka-topics.sh --zookeeper <zookeeper connect> --alter --topic <string> --partitions <int>
删除分区
./kafka-topics.sh --zookeeper <zookeeper connect> --delete --topic <string>
列出集群里所有主题
./kafka-topics.sh --zookeeper <zookeeper connect> --list
列出Topic详细信息
./kafka-topics.sh --zookeeper <zookeeper connect> --describe --under-replicated-partitions
其中--under-replicated-partitions可以列出所有包含不同步副本的分区
使用--unavailable-partitions参数可以列出所有没有首领的分区,它们处于离线状态,对生产者和消费者是不可用的。
列出并描述消费者群组
旧版本:./kafka-consumer-groups.sh --zookeeper <zookeeper connect> --list <string
新版本:./kafka-consumer-groups.sh --new-consumer --bootstrap-server <zookeeper connect> --list <string>
例子:./kafka-consumer-groups.sh --describe --verbose --group 0 --bootstrap-server 10.117.210.183:9092
将group内所有offset指向1000
./kafka-consumer-groups --bootstrap-server localhost:9092 --group 0 --topic TestWSL666 --reset-offsets --to-offset 1000 –execute
使用--describe代替--list就可以获取该群组的所有主题的信息和每个分区的偏移量。
删除群组
./kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group <string>
注意在执行前必须关闭所有的消费者。该命令也可以用来在不删除整个群组的情况下删除单个主题的偏移量。
./kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group <string> --topic <string>
导出偏移量
./kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect <zookeeper connect> --group <string> --output-file <string>
导入偏移量
./kafka-run-class.sh kafka.tools.ImportZkOffset -- zkconnect <zookeeper connect> --input-file offsets
注意在导入偏移量之前,必须先关闭所有的消费者。如果消费者群组处于活跃状
态,它们不会读取新的偏移量 , 反而有可能将导人的偏移量覆盖掉。
覆盖主题的默认配置
./kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name <topic name> --add-config <key>=<value>[,<key>=<value>,…]
常用配置参数如下:
配置broker集群
要把一个 broker 加入到集群里,只需要修改两个配置参数。首先,所有 broker 都必须配
置相同的 zookeeper .connect , 该参数指定了用于保存元数据的 Zookeeper 群组和路 径。
其次,每个 broker 都必须为 broker.id 参数设置唯一的值。 如果两个 broker 使用相同的
Broker.id,那么第二个broker就无陆启动。
Kafka 对 Zookeeper 的延迟和超时比较敏感,与Zookeeper群组之间的一个通信异常就可能导致Kafka服务器出现无法预测的行为 。
其他
rdkafka常用接口说明
一、主要数据结构
1、typedef struct rd_kafka_conf_s rd_kafka_conf_t;
rd_kafka_conf_t是kafka的全局配置结构,通过rd_kafka_conf_new()创建,创建时即进行了默认配置,通过rd_kafka_conf_set()设置参数值,是rd_kafka_new()创建kafka处理句柄的第二个参数,是必须创建的结构。
2、typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
rd_kafka_topic_conf_t主题配置结构,通过rd_kafka_topic_conf_new()创建,创建时即进行了默认配置,通过rd_kafka_topic_conf_set()设置参数值,是必须创建的结构。
3、typedef struct rd_kafka_s rd_kafka_t;
rd_kafka_t 是kafka 处理句柄结构,分为producer类型和consumer类型,由rd_kafka_new()第一个参数类型决定。rd_kafka_t是一个总体结构,conf和topic_conf都是为此结构服务,其中包含rk_brokers链表,rk_topics链表,是必须创建的结构。
4、typedef struct rd_kafka_topic_partition_list_s rd_kafka_topic_partition_list_t;
rd_kafka_topic_partition_list_t 可扩展长度的 主题-分区 链表,通过rd_kafka_topic_partition_list_new()创建,创建时指定长度,通过rd_kafka_topic_partition_list_add()添加 主题-分区对,用于订阅消息。
二、主要接口
1、rd_kafka_conf_t *rd_kafka_conf_new (void)
参数:无
返回值:rd_kafka_conf_t *
创建一个kafka全局配置结构,并进行默认初始化设置,返回其引用指针。
2、rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
const char *name,
const char *value,
char *errstr, size_t errstr_size)
参数:
conf:配置结构
name:配置项名称
value:配置项值
errstr:错误提示
errstr_size:错误提示长度
返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中
name具体的名称及作用见rd_kafka_properties 中_RK_GLOBAL类型的数据定义。调用这个函数后再调用rd_kafka_conf_set_default_topic_conf()会将之前设置的值全部用默认值覆盖掉。因为错误提示会写入errstr中,所以提前给errstr分配512字节空间。
3、rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)
参数:无
返回值:rd_kafka_topic_conf_t *
创建一个主题配置结构,并进行默认初始化设置,返回其引用指针。
4、rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
const char *name,
const char *value,
char *errstr, size_t errstr_size)
参数:
Conf:主题配置结构
Name:主题配置项名称
Value:主题配置项值
Errstr:错误提示
errstr_size:错误提示长度
返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中
name具体的名称及作用见rd_kafka_properties 中_RK_TOPIC类型的数据定义。因为错误提示会写入errstr中,所以提前给errstr分配512字节空间。
5、rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
char *errstr, size_t errstr_size)
参数:
Type:RD_KAFKA_PRODUCER是创建生产者类型,RD_KAFKA_CONSUMER是创建消费者类型
Conf:配置结构
Errstr:错误提示
errstr_size:错误提示长度
返回值:
成功:返回rd_kafka_t *kafka操作句柄
失败:返回NULL,并记录错误信息到errstr
程序中先配置conf和topic_conf,然后调用此接口生成操作句柄。对消费者来讲,订阅主题,轮询接收消息。对生产者来讲,根据主题生成主题操作句柄,并通过主题操作句柄发送消息。
6、void rd_kafka_destroy (rd_kafka_t *rk)
参数:
Rk:kafka操作句柄
返回值:无
释放创建的kafka操作句柄。
6、rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
rd_kafka_topic_conf_t *conf)
参数:
Rk:kafka操作句柄
Topic:主题内容
Conf:主题配置
返回值:
成功:返回rd_kafka_topic_t * 主题操作句柄
失败:返回NULL,记录错误信息到errstr
此接口一般是生产者使用,使用此接口生成的主题操作句柄进行发送消息。
7、void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)
参数:
app_rkt:主题操作句柄
返回值:无
释放创建的主题操作句柄
8、int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
void *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque)
参数:
Rkt:主题操作句柄
Partition:分区号
Msgflags:消息标志,使用RD_KAFKA_MSG_F_COPY标志
Payload:消息体指针
Len:消息体长度
Key:消息选项key值,用作平衡分区,计算分区号的,填NULL
Keylen:key长度,填0
msg_opaque:是作为回调函数的参数,填NULL
10、int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)
参数:
Rk:kafka操作句柄
timeout_ms:毫秒级时间
返回值:处理的事件数
发送完消息后调用此接口,timeout_ms是毫秒级的时间,函数会阻塞timeout_ms 毫秒等待事件处理,调用设置的回调函数。timeout_ms为0是非阻塞状态。
11、rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)
参数:
Rk:kafka操作句柄
timeout_ms:毫秒级时间
返回值:
成功:RD_KAFKA_RESP_ERR_NO_ERROR
失败:RD_KAFKA_RESP_ERR__TIMED_OUT
在摧毁生产者之前调用此接口,确保正在排队和正在进行的消息被处理完成。此函数会调用rd_kafka_poll()并触发回调。
12、int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
参数:
Rk:kafka操作句柄
Brokerlist:broker字符串 如:”172.20.51.38:9092” 不写端口,则采用默认端口9092
多个broker brokerlist = "broker1:10000,broker2"
返回值:成功添加的broker个数
添加一个broker也可以通过 设置rd_kafka_conf_t结构中的 "bootstrap.servers" 配置项
rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr))
13、rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk)
参数:
Rk:kafka操作句柄
返回值:rd_kafka_resp_err_t 枚举
将消息重定向到了消费者队列,可以使用rd_kafka_consumer_poll()进行取消息。
14、rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size)
参数:size是topic结构的个数
返回值:无
原文:https://blog.****.net/icando777/article/details/78904487
Kafka全部数据清空
kafka全部数据清空的步骤为:
- 停止每台机器上的kafka;
- 删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录;
- 删除zookeeper上与kafka相关的znode节点;
- 重启kafka、如果删除topic还在则需要重启zookeeper;
这里以192.168.187.201 node1、192.168.187.202 node2、192.168.187.203 node3三台机器作为kafka的集群。
注意:kafka版本为kafka_2.11-1.1.1
1.1 停止每台机器上的kafka
以root用户分别登录三台机器,使用命令jps 找出kafka的PID,再使用命令 kill kafka进程。
节点node1
节点node2
节点node3
1.2 删除kafka存储目录
在kafka安装目录的config文件夹下server.properties中查看存储目录为:
删除该目录所有数据:
1.3 删除zookeeper上与kafka相关的znode节点
zookeeper上面保存着kafka的所有topic及其消费信息,故需要删除与kafka相关的znode节点:
进入zookeeper的shell界面:
查看与kafka相关的znode节点:
在上面的znode节点中,除了zookeeper作为zk的安全保障措施,其他znode节点都得删除
1.4 重启kafka
分别在node1、node2、node3上面执行如下命令启动kafka:
/opt/app/kafka_2.11-1.1.1/bin/kafka-server-start.sh /opt/app/kafka_2.11-1.1.1/config/server.properties > /dev/null 2>&1 &
jps命令查看node1、node2、node3上面的启动情况:
最后在查看kafka上面是否还有topic存在:
可以看到topic及其相关数据已被清空删除
查看当前所有topic
比如目前需要删除test这一topic,目前kafka_2.11-1.1.1以上版本默认delete.topic.enable=true,即是说使用命令
./kafka-topics.sh --zookeeper node1:2181 --delete --topic test
该命令将会在zookeeper中删除与test这一topic相关的znode节点(包括test详细信息、生产数据、消费数据的节点),并在kafka的存储目录/opt/data/kafka/kafka-logs/下把与test这一topic相关的存储数据目录标记为待删除,稍后会真正删除这些待删除的目录,如下:
使用kafka-topics.sh查看test在zookeeper中相关znode节点信息是否已被删除
在/opt/data/kafka/kafka-logs目录下查看test相关存储目录是否被标记删除
在/opt/data/kafka/kafka-logs目录下查看test相关存储目录已被删除
3. 思考
kafka全部数据清空步骤比较繁琐,借鉴某一topic数据清空的方式,可以通过使用kafka-topics.sh --delete命令逐个删除所有的topic,达到清空kafka全部topic数据的目的,不足的是topic“__consumer_offsets”无法删除,不过不碍事。
librdkafka常用配置参数