Kafka Manual: Design (Part 2)
4.6 Message Delivery Semantics
Now that we understand a little about how producers and consumers work, let’s discuss the semantic guarantees Kafka provides between producer and consumer. Clearly there are multiple possible message delivery guarantees that could be provided:
- At most once—Messages may be lost but are never redelivered.
- At least once—Messages are never lost but may be redelivered.
- Exactly once—this is what people actually want; each message is delivered once and only once.
Many systems claim to provide “exactly once” delivery semantics, but it is important to read the fine print; most of these claims are misleading (i.e. they don’t translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost).
Kafka’s semantics are straightforward. When publishing a message we have a notion of the message being “committed” to the log. Once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains “alive”. The definition of alive, as well as a description of which types of failures we attempt to handle, will be described in more detail in the next section. For now let’s assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.
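This ambiguity can be sketched with a toy in-memory broker (all names here are illustrative, not Kafka APIs): the write can be committed even though the acknowledgement back to the producer is lost, so a retrying producer may duplicate the message.

```python
class TinyBroker:
    """Toy log: a write can be committed even when the ack to the producer is lost."""
    def __init__(self):
        self.log = []

    def append(self, msg, ack_lost=False):
        self.log.append(msg)                   # the write IS committed...
        if ack_lost:
            raise ConnectionError("ack lost")  # ...but the producer never learns it


def send_with_retry(broker, msg, failures):
    """Retry until an ack arrives; the producer cannot tell a lost ack
    from an uncommitted write, so a retry after a lost ack duplicates."""
    while True:
        try:
            broker.append(msg, ack_lost=failures.pop(0) if failures else False)
            return
        except ConnectionError:
            continue  # ambiguous error: was the message committed or not?


broker = TinyBroker()
send_with_retry(broker, "m1", failures=[True])  # first ack lost -> retry
print(broker.log)  # ['m1', 'm1']: committed twice
```

This is exactly the duplicate that an idempotent “primary key” on the produce request, as described next, would suppress.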
These are not the strongest possible semantics for publishers. Although we cannot be sure of what happened in the case of a network error, it is possible to allow the producer to generate a sort of “primary key” that makes retrying the produce request idempotent. This feature is not trivial for a replicated system because of course it must work even (or especially) in the case of a server failure. With this feature it would suffice for the producer to retry until it receives acknowledgement of a successfully committed message, at which point we would guarantee the message had been published exactly once. We hope to add this in a future Kafka version.
Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message being committed this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously, or that it wants to wait only until the leader (but not necessarily the followers) has the message.
Now let’s describe the semantics from the point of view of the consumer. All replicas have the exact same log with the same offsets. The consumer controls its position in this log. If the consumer never crashed it could just store this position in memory, but if the consumer fails and we want this topic partition to be taken over by another process, the new process will need to choose an appropriate position from which to start processing. Let’s say the consumer reads some messages—it has several options for processing the messages and updating its position.
- It can read the messages, then save its position in the log, and finally process the messages. In this case there is a possibility that the consumer process crashes after saving its position but before saving the output of its message processing. In this case the process that took over processing would start at the saved position even though a few messages prior to that position had not been processed. This corresponds to “at-most-once” semantics, as in the case of a consumer failure messages may not be processed.
- It can read the messages, process the messages, and finally save its position. In this case there is a possibility that the consumer process crashes after processing messages but before saving its position. In this case when the new process takes over, the first few messages it receives will already have been processed. This corresponds to “at-least-once” semantics in the case of consumer failure. In many cases messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself).
- So what about exactly once semantics (i.e. the thing you actually want)? The limitation here is not actually a feature of the messaging system but rather the need to co-ordinate the consumer’s position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase commit between the storage for the consumer position and the storage of the consumer’s output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, our Hadoop ETL that populates data in HDFS stores its offsets in HDFS with the data it reads, so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication.
- So effectively Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system, but Kafka provides the offset which makes implementing this straightforward.
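The difference between the two commit orderings above can be simulated in a few lines. This is a minimal sketch with hypothetical names (`consume`, `crash_at`), not a Kafka API: a first consume loop crashes partway through, and a takeover process resumes from the last saved position.

```python
def consume(messages, commit_before_processing, crash_at=None):
    """Replay one consume loop that crashes after handling message `crash_at`,
    then a takeover that resumes from the last saved position."""
    processed, offset = [], 0
    for i, msg in enumerate(messages):
        if commit_before_processing:       # at-most-once
            offset = i + 1                 # save position first...
            if i == crash_at:
                break                      # ...crash before doing the work
            processed.append(msg)
        else:                              # at-least-once
            processed.append(msg)          # do the work first...
            if i == crash_at:
                break                      # ...crash before saving position
            offset = i + 1
    for msg in messages[offset:]:          # new process takes over here
        processed.append(msg)
    return processed


msgs = ["a", "b", "c"]
print(consume(msgs, commit_before_processing=True, crash_at=1))   # ['a', 'c']: "b" lost
print(consume(msgs, commit_before_processing=False, crash_at=1))  # ['a', 'b', 'b', 'c']: "b" redone
```

Saving the position first loses message "b"; processing first redelivers it, which is harmless only when the downstream update is idempotent.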
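The offsets-with-output idea behind exactly-once can likewise be sketched. Here a plain dict stands in for the destination store (HDFS in the text’s example); because the output and the consumer position change in one atomic update, replaying a batch after a crash is harmless.

```python
def process_batch(messages, store):
    """Apply messages and record the new offset in the same atomic update.

    `store` holds both the output and the consumer position, so they can
    only change together: a rerun after a crash either sees the old offset
    and redoes the work, or the new offset and skips it -- never half of each.
    """
    offset = store["offset"]
    new_output = store["output"] + messages[offset:]
    # single atomic swap: output and offset move together
    store.update({"output": new_output, "offset": len(messages)})


store = {"output": [], "offset": 0}
msgs = ["a", "b"]
process_batch(msgs, store)
process_batch(msgs, store)  # replay after a "crash": no duplicates, nothing lost
print(store)  # {'output': ['a', 'b'], 'offset': 2}
```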
4.7 Replication
Kafka replicates the log for each topic’s partitions across a configurable number of servers (you can set this replication factor on a topic-by-topic basis). This allows automatic failover to these replicas when a server in the cluster fails, so messages remain available in the presence of failures.
Other messaging systems provide some replication-related features, but, in our (totally biased) opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides: slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka is meant to be used with replication by default—in fact we implement un-replicated topics as replicated topics where the replication factor is one.
The unit of replication is the topic partition. Under non-failure conditions, each partition in Kafka has a single leader and zero or more followers. The total number of replicas including the leader constitutes the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader’s log—all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).
Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing the follower to naturally batch together log entries they are applying to their log.
As with most distributed systems, automatically handling failures requires having a precise definition of what it means for a node to be “alive”. For Kafka, node liveness has two conditions:
- A node must be able to maintain its session with ZooKeeper (via ZooKeeper’s heartbeat mechanism)
- If it is a slave it must replicate the writes happening on the leader and not fall “too far” behind
In distributed systems terminology we only attempt to handle a “fail/recover” model of failures where nodes suddenly cease working and then later recover (perhaps without knowing that they have died). Kafka does not handle so-called “Byzantine” failures in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).
A message is considered “committed” when all in-sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for the tradeoff between latency and durability. This preference is controlled by the acks setting that the producer uses.
The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in-sync replica alive, at all times.
Kafka will remain available in the presence of node failures after a short fail-over period, but may not remain available in the presence of network partitions.
Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)
A replicated log models the process of coming into consensus on the order of a series of values (generally numbering the log entries 0, 1, 2, …). There are many ways to implement this, but the simplest and fastest is with a leader who chooses the ordering of values provided to it. As long as the leader remains alive, all followers need only copy the values and ordering the leader chooses.
Of course if leaders didn’t fail we wouldn’t need followers! When the leader does die we need to choose a new leader from among the followers. But followers themselves may fall behind or crash, so we must ensure we choose an up-to-date follower. The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let’s explore it anyway to understand the tradeoffs. Let’s say we have 2f+1 replicas. If f+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least f+1 replicas, then, with no more than f failures, the leader is guaranteed to have all committed messages. This is because among any f+1 replicas, there must be at least one replica that contains all committed messages. That replica’s log will be the most complete and therefore will be selected as the new leader. There are many remaining details that each algorithm must handle (such as precisely defining what makes a log more complete, ensuring log consistency during leader failure, or changing the set of servers in the replica set) but we will ignore these for now.
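The overlap argument above can be checked exhaustively for a small case. This sketch (illustrative code, not part of Kafka) verifies that for f = 2, i.e. 2f+1 = 5 replicas, every possible commit-acknowledging set of f+1 replicas intersects every possible leader-election set of f+1 replicas, so the new leader always holds the committed message.

```python
from itertools import combinations

f = 2                               # failures to tolerate
replicas = set(range(2 * f + 1))    # 2f+1 = 5 replicas
quorum = f + 1                      # acks needed to commit; logs consulted to elect

# every ack-set and every election-set share at least one replica,
# so at least one electable follower has every committed message
overlap_always = all(set(acks) & set(voters)
                     for acks in combinations(replicas, quorum)
                     for voters in combinations(replicas, quorum))
print(overlap_always)  # True
```

This holds because two subsets of size f+1 drawn from 2f+1 elements must intersect (their sizes sum to 2f+2 > 2f+1).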
This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster slave, not the slower one.
There are a rich variety of algorithms in this family including ZooKeeper’s Zab, Raft, and Viewstamped Replication. The most similar academic publication we are aware of to Kafka’s actual implementation is PacificA from Microsoft.
The downside of majority vote is that it doesn’t take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience, having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large-volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as ZooKeeper but are less common for primary data storage. For example, in HDFS the namenode’s high-availability feature is built on a majority-vote-based journal, but this more expensive approach is not used for the data itself.
Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. This is an important factor for Kafka’s usage model, where there are many partitions and ensuring leadership balance is important. With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.
For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice, to tolerate f failures, both the majority vote and the ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure a majority quorum needs three replicas and one acknowledgement, and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach. However, we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.
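The replica-count comparison in this paragraph reduces to two small formulas; a throwaway helper (not a Kafka API) makes the table explicit:

```python
def replicas_needed(f, scheme):
    """Replicas required to tolerate f failures without losing committed messages."""
    return 2 * f + 1 if scheme == "majority" else f + 1  # scheme == "isr"


for f in (1, 2):
    print(f"f={f}: majority={replicas_needed(f, 'majority')}, isr={replicas_needed(f, 'isr')}")
# f=1: majority=3, isr=2
# f=2: majority=5, isr=3
```

Both schemes wait on f+1 acknowledgements (leader plus f followers in the ISR case), but ISR needs f fewer replicas in total, which is where the throughput and disk-space savings come from.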
Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of “stable storage” that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees, as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again, even if it lost unflushed data in its crash.
Unclean leader election: What if they all die?
However, a practical system needs to do something reasonable when all the replicas die. If you are unlucky enough to have this occur, it is important to consider what will happen. There are two behaviors that could be implemented:
- Wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data).
- Choose the first replica (not necessarily in the ISR) that comes back to life as the leader.
This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. By default, Kafka chooses the second strategy and favors choosing a potentially inconsistent replica when all replicas in the ISR are dead. This behavior can be disabled using the configuration property unclean.leader.election.enable, to support use cases where downtime is preferable to inconsistency.
This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example, in a majority voting scheme, if a majority of servers suffer a permanent failure, then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.
Availability and Durability Guarantees
Two topic-level configurations can be used to prefer message durability over availability:
- Disable unclean leader election – if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
- Specify a minimum ISR size – the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency, since the message is guaranteed to be written to more replicas, which reduces the probability that it will be lost. However, it reduces availability, since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
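As a concrete sketch, the two preferences above map onto the following settings (the property names are real Kafka configuration keys; the minimum-ISR value of 2 is just an example, and min.insync.replicas is the property implementing the "minimum ISR size" described in the text):

```properties
# topic/broker side: prefer durability over availability
unclean.leader.election.enable=false   # wait for an ISR replica instead of electing a stale one
min.insync.replicas=2                  # example value: reject writes once the ISR shrinks below 2

# producer side: only count a send as successful once the full ISR has it
acks=all
```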
Replica Management
It is also important to optimize the leadership election process, as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead, we elect one of the brokers as the “controller”. This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions on a failed broker. The result is that we are able to batch together many of the required leadership change notifications, which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.
4.8 Log Compaction
So far we have described only the simpler approach to data retention, where old log data is discarded after a fixed period of time or when the log reaches some predetermined size. This works well for temporal event data such as logging, where each record stands alone. However an important class of data streams are the log of changes to keyed, mutable data (for example, the changes to a database table).
Let’s discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
123 => [email protected] . . . 123 => [email protected] . . . 123 => [email protected]
Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key (e.g. [email protected]). By doing this we guarantee that the log contains a full snapshot of the final value for every key, not just keys that changed recently. This means downstream consumers can restore their own state off this topic without us having to retain a complete log of all changes.
Let’s start by looking at a few use cases where this is useful, then we’ll see how it can be used.
- Database change subscription. It is often necessary to have a data set in multiple data systems, and often one of these systems is a database of some kind (either a RDBMS or perhaps a new-fangled key-value store). For example you might have a database, a cache, a search cluster, and a Hadoop cluster. Each change to the database will need to be reflected in the cache, the search cluster, and eventually in Hadoop. In the case that one is only handling the real-time updates you only need the recent log. But if you want to be able to reload the cache or restore a failed search node you may need a complete data set.
- Event sourcing. This is a style of application design which co-locates query processing with application design and uses a log of changes as the primary store for the application.
- Journaling for high-availability. A process that does local computation can be made fault-tolerant by logging out changes that it makes to its local state, so another process can reload these changes and carry on if it should fail. A concrete example of this is handling counts, aggregations, and other “group by”-like processing in a stream query system. Samza, a real-time stream-processing framework, uses this feature for exactly this purpose.
The general idea is quite simple. If we had infinite log retention, and we logged each change in the above cases, then we would have captured the state of the system at each time from when it first began. Using this complete log, we could restore to any point in time by replaying the first N records in the log. This hypothetical complete log is not very practical for systems that update a single record many times, as the log will grow without bound even for a stable dataset. The simple log retention mechanism which throws away old updates will bound space, but the log is no longer a way to restore the current state—now restoring from the beginning of the log no longer recreates the current state, as old updates may not be captured at all.
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
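The core of this idea fits in a few lines. This is a toy sketch of the retention rule, not Kafka's implementation: scan the log, remember the latest offset per key, and keep only those records (with their original offsets).

```python
def compact(log):
    """Keep only the latest record for each key; surviving records
    keep the offset they were originally written at."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)   # later occurrences of a key win
    return sorted(latest.values())      # surviving records, in offset order


# hypothetical keyed updates, like the user-id 123 email example above
log = [("123", "mail-v1"), ("456", "other-v1"), ("123", "mail-v2"), ("123", "mail-v3")]
print(compact(log))  # [(1, 'other-v1'), (3, 'mail-v3')]
```

Only the final value per key survives, which is exactly the "at least the last state for each key" guarantee.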
This retention policy can be set per-topic, so a single cluster can have some topics where retention is enforced by size or time and other topics where retention is enforced by compaction.
This functionality is inspired by one of LinkedIn’s oldest and most successful pieces of infrastructure—a database changelog caching service called Databus. Unlike most log-structured storage systems, Kafka is built for subscription and organizes data for fast linear reads and writes. Unlike Databus, Kafka acts as a source-of-truth store, so it is useful even in situations where the upstream data source would not otherwise be replayable.
Log Compaction Basics
The head of the log is identical to a traditional Kafka log. It has dense, sequential offsets and retains all messages. Log compaction adds an option for handling the tail of the log. The picture above shows a log with a compacted tail. Note that the messages in the tail of the log retain the original offset assigned when they were first written—that never changes. Note also that all offsets remain valid positions in the log, even if the message with that offset has been compacted away; in this case this position is indistinguishable from the next highest offset that does appear in the log. For example, in the picture above the offsets 36, 37, and 38 are all equivalent positions, and a read beginning at any of these offsets would return a message set beginning with 38.
Compaction also allows for deletes. A message with a key and a null payload will be treated as a delete from the log. This delete marker will cause any prior message with that key to be removed (as would any new message with that key), but delete markers are special in that they will themselves be cleaned out of the log after a period of time to free up space. The point in time at which deletes are no longer retained is marked as the “delete retention point” in the above diagram.
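The tombstone behavior extends the toy compaction sketch from above: a None payload deletes all earlier records for its key, and the marker itself is dropped once the delete-retention period has passed. Again this is illustrative code, not Kafka's cleaner.

```python
def compact_with_tombstones(log, retain_tombstones=True):
    """Latest record per key; a None value is a delete marker (tombstone).

    A tombstone removes every earlier record for its key, and is itself
    dropped once the delete-retention period has passed."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    records = sorted(latest.values())
    if not retain_tombstones:  # past the "delete retention point"
        records = [(o, v) for o, v in records if v is not None]
    return records


log = [("a", "1"), ("b", "2"), ("a", None)]               # delete key "a"
print(compact_with_tombstones(log))                        # [(1, '2'), (2, None)]
print(compact_with_tombstones(log, retain_tombstones=False))  # [(1, '2')]
```

While retained, the tombstone lets a consumer replaying the topic learn that key "a" was deleted; after the retention point only the live keys remain.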
The compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to use no more than a configurable amount of I/O throughput to avoid impacting producers and consumers. The actual process of compacting a log segment looks something like this:
What guarantees does log compaction provide?
Log compaction guarantees the following:
- Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic’s min.compaction.lag.ms can be used to guarantee the minimum length of time that must pass after a message is written before it can be compacted. I.e. it provides a lower bound on how long each message will remain in the (uncompacted) head.
- Ordering of messages is always maintained. Compaction will never re-order messages, just remove some.
- The offset for a message never changes. It is the permanent identifier for a position in the log.
- 消息的位移永远不会变, 这是消息在日志中的永久性标志
- Any consumer progressing from the start of the log will see at least the final state of all records in the order they were written. All delete markers for deleted records will be seen, provided the consumer reaches the head of the log in a time period less than the topic's delete.retention.ms setting (the default is 24 hours). This is important as delete marker removal happens concurrently with read, and thus it is important that we do not remove any delete marker prior to the consumer seeing it. 任何从日志开头开始消费的消费者, 都至少会按写入顺序看到所有记录的最终状态. 另外, 只要消费者在小于主题 delete.retention.ms 设置(默认是 24 小时)的时间内到达日志头部, 就能看到所有已删除记录的删除标志. 这一点很重要, 因为删除标志的清除和读取是并发进行的, 所以我们不能在消费者看到某个删除标志之前就把它清除掉.
Log Compaction Details 日志压缩细节
- It chooses the log that has the highest ratio of log head to log tail
- 它选择日志头部相对日志尾部比例最高的日志
- It creates a succinct summary of the last offset for each key in the head of the log
- 它在日志头部为每个键的最后位移创建一个简洁的摘要
- It recopies the log from beginning to end removing keys which have a later occurrence in the log. New, clean segments are swapped into the log immediately so the additional disk space required is just one additional log segment (not a fully copy of the log).
- 它从头到尾重新拷贝日志, 删除那些键在日志后面还有更新记录的消息. 新的干净日志段会立刻被交换进日志, 所以需要的额外磁盘空间只是一个日志段(而不是日志的完整拷贝).
- The summary of the log head is essentially just a space-compact hash table. It uses exactly 24 bytes per entry. As a result with 8GB of cleaner buffer one cleaner iteration can clean around 366GB of log head (assuming 1k messages).
- 日志头部的摘要本质上是一个空间紧凑的哈希表, 每个条目正好使用 24 个字节. 因此, 使用 8GB 的整理缓冲区, 一次整理迭代大约能清理 366GB 的日志头部(假设消息大小为 1k).
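The recopy-with-summary pass described above can be sketched roughly like this (a toy model with invented names; it ignores segment files, I/O throttling and the 24-byte entry encoding):

```python
def compact_segment(head, tail):
    """Toy model of one cleaner pass.

    head, tail: lists of (offset, key, value) tuples, where every
    offset in head is greater than every offset in tail.

    Step 1: build a succinct summary of the last offset per key in
    the head (this stands in for the space-compact hash table).
    Step 2: recopy the tail, dropping records whose key has a later
    occurrence, i.e. appears in the head summary.
    """
    last_offset = {}
    for offset, key, _ in head:
        last_offset[key] = offset
    return [(o, k, v) for o, k, v in tail
            if k not in last_offset or o >= last_offset[k]]

head = [(5, "k1", "c")]
tail = [(0, "k1", "a"), (1, "k2", "b")]
# k1 at offset 0 is superseded by the record at offset 5, so it is dropped:
assert compact_segment(head, tail) == [(1, "k2", "b")]
```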
Configuring The Log Cleaner 配置日志整理器
log.cleanup.policy=compact
The log cleaner can be configured to retain a minimum amount of the uncompacted “head” of the log. This is enabled by setting the compaction time lag.
日志整理器可以配置保留一定数量未被压缩的日志头部, 这可以通过设置压缩延迟时间参数启用:
log.cleaner.min.compaction.lag.ms
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag. 这个参数可以防止比最小消息年龄更新的消息被压缩. 如果没有设置, 除了最后一个段(即当前正在写入的段)以外, 所有日志段都可以被压缩. 活跃段即使其所有消息都早于最小压缩延迟时间, 也不会被压缩.
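For example, a broker-side cleaner setup might look like the following fragment (the values are purely illustrative, not recommendations):

```properties
# server.properties (illustrative values)
log.cleanup.policy=compact
# messages stay in the uncompacted head for at least 1 hour
log.cleaner.min.compaction.lag.ms=3600000
# throttle the cleaner to roughly 50 MB/s of combined read/write I/O
log.cleaner.io.max.bytes.per.second=52428800
```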
Further cleaner configurations are described here.
更多的整理器的配置可以参考这里
4.9 Quotas 配额
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per group of clients sharing a quota.
从0.9版本开始, kafka集群对生产和消费请求进行限额配置, 配置主要是根据客户端分组按字节速率进行限定的.
Why are quotas necessary? 为什么需要配额?
It is possible for producers and consumers to produce/consume very high volumes of data and thus monopolize broker resources, cause network saturation and generally DOS other clients and the brokers themselves. Having quotas protects against these issues and is all the more important in large multi-tenant clusters where a small set of badly behaved clients can degrade user experience for the well behaved ones. In fact, when running Kafka as a service this even makes it possible to enforce API limits according to an agreed upon contract.
生产者和消费者有可能生产/消费非常大量的数据, 从而独占服务器资源, 导致网络饱和, 并对其它客户端和服务器本身造成类似 DOS 的影响. 有了配额就可以防范这些问题, 这在大型多租户集群中尤为重要, 因为一小部分行为不良的客户端可能会降低其它正常客户端的用户体验. 实际上, 当把 kafka 作为一项服务运行时, 甚至可以根据约定的契约对 API 使用进行强制限制.
Client groups 客户端分组
The identity of Kafka clients is the user principal, which represents an authenticated user in a secure cluster. In a cluster that supports unauthenticated clients, user principal is a grouping of unauthenticated users chosen by the broker using a configurable PrincipalBuilder. Client-id is a logical grouping of clients with a meaningful name chosen by the client application. The tuple (user, client-id) defines a secure logical group of clients that share both user principal and client-id. Quotas can be applied to (user, client-id), user or client-id groups. For a given connection, the most specific quota matching the connection is applied. All connections of a quota group share the quota configured for the group. For example, if (user=”test-user”, client-id=”test-client”) has a produce quota of 10MB/sec, this is shared across all producer instances of user “test-user” with the client-id “test-client”.
kafka 客户端的身份是用户主体(user principal), 它代表安全集群中一个已认证的用户. 在支持未认证客户端的集群中, 用户主体是服务器通过可配置的 PrincipalBuilder 选定的一组未认证用户. client-id 是客户端应用自行选定的, 具有有意义名称的客户端逻辑分组. 元组 (user, client-id) 定义了共享用户主体和 client-id 的一组安全逻辑客户端. 配额可以应用于 (user, client-id), user 或 client-id 分组. 对一个给定的连接, 会使用最匹配这个连接的配额. 一个配额分组的所有连接共享这个分组配置的配额. 例如: 如果 (user=”test-user”, client-id=”test-client”) 有 10MB/s 的生产配额, 这个配额会被用户 “test-user” 且客户端 ID 为 “test-client” 的所有生产者实例所共享.
Quota Configuration 限额配置
Quota configuration may be defined for (user, client-id), user and client-id groups. It is possible to override the default quota at any of the quota levels that needs a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. User and (user, client-id) quota overrides are written to ZooKeeper under /config/users and client-id quota overrides are written under /config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See here for details. Default quotas for each group may also be updated dynamically using the same mechanism.
配额可以按照 (user, client-id), user 或 client-id 进行分组配置. 如果某个分组需要更高(甚至更低)的配额, 可以在任意配额级别覆盖默认配额, 这个机制类似于对每个主题日志配置的覆盖. user 和 (user, client-id) 的配额覆盖会写入 ZooKeeper 的 /config/users 下, client-id 的配额覆盖写入 /config/clients 下. 这些覆盖配置会被所有服务器读取并立即生效, 这让我们无需滚动重启整个集群就能修改配额. 详细内容参考这里. 每个分组的默认配额也可以用同样的机制动态修改.
The order of precedence for quota configuration is:
限额的配置顺序如下:
- /config/users/<user>/clients/<client-id>
- /config/users/<user>/clients/<default>
- /config/users/<user>
- /config/users/<default>/clients/<client-id>
- /config/users/<default>/clients/<default>
- /config/users/<default>
- /config/clients/<client-id>
- /config/clients/<default>
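The precedence list above behaves like a most-specific-first lookup. A small Python sketch (invented names, not broker code) makes the resolution order concrete:

```python
def resolve_quota(quotas, user, client_id):
    """Walk the quota hierarchy from most to least specific and return
    the first configured value (toy model of the precedence rules).

    quotas: dict mapping ZooKeeper-style paths to byte-rate values.
    """
    search_order = [
        "/config/users/%s/clients/%s" % (user, client_id),
        "/config/users/%s/clients/<default>" % user,
        "/config/users/%s" % user,
        "/config/users/<default>/clients/%s" % client_id,
        "/config/users/<default>/clients/<default>",
        "/config/users/<default>",
        "/config/clients/%s" % client_id,
        "/config/clients/<default>",
    ]
    for path in search_order:
        if path in quotas:
            return quotas[path]
    return None

quotas = {
    "/config/users/<default>": 1048576,   # 1 MB/s fallback for all users
    "/config/users/alice": 2097152,       # per-user override for alice
}
# alice gets her user-level override; bob falls back to the default:
assert resolve_quota(quotas, "alice", "app") == 2097152
assert resolve_quota(quotas, "bob", "app") == 1048576
```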
Broker properties (quota.producer.default, quota.consumer.default) can also be used to set defaults for client-id groups. These properties are being deprecated and will be removed in a later release. Default quotas for client-id can be set in Zookeeper similar to the other quota overrides and defaults. 服务器属性 (quota.producer.default, quota.consumer.default) 也可以用来为 client-id 分组设置默认配额. 这些属性已不鼓励使用, 会在后续版本中删除. client-id 的默认配额可以像其它配额覆盖和默认值一样, 直接在 Zookeeper 中设置.
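As an example of the dynamic override mechanism, quotas can be set with the kafka-configs.sh tool (the entity names and byte rates here are illustrative; the command syntax follows the Kafka documentation of this release line):

```shell
# Set a 10 MB/s produce and 20 MB/s fetch quota for the
# (user=test-user, client-id=test-client) group:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' \
    --entity-type users --entity-name test-user \
    --entity-type clients --entity-name test-client

# Describe the resulting overrides for the user:
bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
    --entity-type users --entity-name test-user
```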
Enforcement 实施
By default, each unique client group receives a fixed quota in bytes/sec as configured by the cluster. This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
默认情况下, 每个唯一的客户端分组会得到一个集群配置的固定配额(字节/秒). 这个配额是按每台服务器定义的: 每个客户端在被限流之前, 在每台服务器上最多能发布/获取 X 字节/秒. 我们认为按服务器定义配额, 比为每个客户端设定固定的集群总带宽要好, 因为后者需要一种机制在所有服务器之间共享客户端配额的使用情况, 而把这种机制做对可能比配额本身的实现还要难!
How does a broker react when it detects a quota violation? In our solution, the broker does not return an error rather it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under its quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also keeps them from having to implement any special backoff and retry behavior which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
服务器在检测到配额超限时会怎么处理? 在我们的方案中, 服务器不会返回错误, 而是尝试减慢超出配额的客户端: 它会计算出让违规客户端回到配额之内所需的延迟时间, 并把响应延迟这么长时间. 这种方式让配额超限对客户端保持透明(客户端侧的度量指标除外), 也让客户端不需要实现任何特殊的补偿和重试逻辑, 否则这些逻辑很容易出错. 事实上, 客户端的不良行为(不带补偿的重试)反而会加剧配额本来要解决的问题.
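The delay computation can be approximated as follows (a simplified model of the broker's behaviour, not the actual Kafka code; the function name is invented): the response is delayed just long enough that the observed bytes, spread over the window plus the delay, average out to exactly the quota.

```python
def throttle_delay_ms(bytes_observed, quota_bytes_per_sec, window_ms):
    """How long to delay the response so the client's average rate
    falls back under its quota (toy model)."""
    allowed = quota_bytes_per_sec * window_ms / 1000.0
    if bytes_observed <= allowed:
        return 0.0  # under quota, respond immediately
    # Solve bytes_observed / (window + delay) == quota for delay:
    return (bytes_observed - allowed) * 1000.0 / quota_bytes_per_sec

# A client that sent 2 MB in a 1-second window against a 1 MB/s quota
# is delayed by 1 second, bringing its average back to 1 MB/s:
assert throttle_delay_ms(2_000_000, 1_000_000, 1000) == 1000.0
assert throttle_delay_ms(500_000, 1_000_000, 1000) == 0.0
```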
Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly. Typically, having large measurement windows (e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays, which is not great in terms of user experience.
客户端的字节速率通过多个小时间窗口(例如 30 个 1 秒的窗口)来测量, 以便快速检测和纠正配额超限. 通常, 使用较大的测量窗口(例如 10 个 30 秒的窗口)会导致大的流量突增, 随后是长时间的延迟, 这在用户体验上并不好.
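A minimal sketch of the small-window measurement (an invented class, not Kafka's metrics code): bytes are recorded into the newest window, old windows are evicted as time advances, and the rate is the average over all retained windows.

```python
from collections import deque

class RateSampler:
    """Measure byte rate over many small windows, e.g. 30 windows of
    1 second each, so quota violations are detected quickly."""

    def __init__(self, num_windows=30, window_sec=1.0):
        self.window_sec = window_sec
        self.windows = deque([0] * num_windows, maxlen=num_windows)

    def record(self, nbytes):
        # add bytes to the current (newest) window
        self.windows[-1] += nbytes

    def tick(self):
        # advance to a new window, evicting the oldest
        self.windows.append(0)

    def rate(self):
        # average bytes/sec across all retained windows
        return sum(self.windows) / (len(self.windows) * self.window_sec)

sampler = RateSampler(num_windows=3, window_sec=1.0)
sampler.record(300)
sampler.tick()
sampler.record(600)
# 900 bytes over 3 one-second windows -> 300 bytes/sec on average:
assert sampler.rate() == 300.0
```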