关于简单介绍Mapreduce,Hbase,Kafka,Zookeeper

1.1. zookeeper是干什么的?

Zookeeper 是 分布式协调服务,

分布式应用程序可以基于它实现同步服务,配置维护和命名服务等

1.2. zookeeper节点类型

Znode有两种类型:

短暂(ephemeral)(断开连接自己删除)

持久(persistent)(断开连接不删除)

Znode有四种形式的目录节点(默认是persistent 

PERSISTENT

PERSISTENT_SEQUENTIAL(持久序列/test0000000019 

EPHEMERAL

EPHEMERAL_SEQUENTIAL

创建znode时设置顺序标识,znode名称后会附加一个值

顺序号是一个单调递增的计数器,由父节点维护

在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

Zookpeeper的基本架构

关于简单介绍Mapreduce,Hbase,Kafka,Zookeeper
1 每个Server在内存中存储了一份数据;
2 Zookeeper启动时,将从实例中选举一个leader(Paxos协议);
3 Leader负责处理数据更新等操作(Zab协议);
4 一个更新操作成功,当且仅当大多数Server在内存中成功修改
数据。
关于简单介绍Mapreduce,Hbase,Kafka,Zookeeper


1.3. zookeeper选举机制

1.3.1. zookeeper的选举机制(全新集群)

以一个简单的例子来说明整个选举的过程.
假设有五台服务器组成的zookeeper集群,它们的id1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的.假设这些服务器依序启动,来看看会发生什么.
1) 服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态
2) 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.
3) 服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.
4) 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.
5) 服务器5启动,4一样,当小弟.

1.3.2. 非全新集群的选举机制(数据恢复)

那么,初始化的时候,是按照上述的说明进行选举的,但是当zookeeper运行了一段时间之后,有机器down掉,重新选举时,选举过程就相对复杂了。

需要加入数据idleader id和逻辑时钟。

数据id:数据新的id就大,数据每次更新都会更新id

Leader id:就是我们配置的myid中的值,每个机器一个。

逻辑时钟:这个值从0开始递增,每次选举对应一个值,也就是说:  如果在同一次选举中,那么这个值应该是一致的 ;  逻辑时钟值越大,说明这一次选举leader的进程更新.

选举的标准就变成:

1、逻辑时钟小的选举结果被忽略,重新投票

2、统一逻辑时钟后,数据id大的胜出

3、数据id相同的情况下,leader id大的胜出

根据这个规则选出leader

1.4. 说说共享锁

共享锁在同一个进程中很容易实现,

但是在跨进程或者在不同 Server 之间就不好实现了。

Zookeeper 却很容易实现这个功能,

实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL(短暂序列化目录节点,

然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点 是不是 就是自己创建的目录节点?

如果正是自己创建的,那么它就获得了这个锁,

如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,

一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,

释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

应用于选主


请介绍一下MapReduce的工作原理。

【解】MapReduce是一个分布式计算框架,用于大规模数据集的并行运算。简单地说,MapReduce就是”任务的分解与结果的汇总”:将一个大的数据处理任务划分成许多个子任务,并将这些子任务分配给各个节点并行处理,然后通过整合各个节点的中间结果,得到最终结果。

MapReduce是主从架构,在master上跑的是JobTracker/ResourceManager,负责资源分配与任务调度;而各个slave上跑的是TaskTracker/NodeManager,负责执行任务,并定期向master汇报最新状态与执行进度。

对于一个MR任务,它的输入、输出以及中间结果都是<key, value>键值对:

  • Map:<k1, v1> ——> list(<k2, v2>)
  • Reduce:<k2, list(v2)> ——> list(<k3, v3>)

MR程序的执行过程主要分为三步:Map阶段、Shuffle阶段、Reduce阶段,如下图:

关于简单介绍Mapreduce,Hbase,Kafka,Zookeeper

  1. Map阶段

    • 分片(Split):map阶段的输入通常是HDFS上文件,在运行Mapper前,FileInputFormat会将输入文件分割成多个split ——1个split至少包含1个HDFS的Block(默认为64M);然后每一个分片运行一个map进行处理。

    • 执行(Map):对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对。

      • Partitioner:对 map 函数的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。默认是对key哈希后再以reduce task数量取模,默认的取模方式只是为了避免数据倾斜。然后该key/value对以及partitionIdx的结果都会被写入环形缓冲区。
    • 溢写(Spill):map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,将缓冲的数据写出到磁盘。

      • Sort:在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx, key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce)
      • Combiner:如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。
    • 合并(Merge):溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行 sort & combine 操作,最后合并成了一个已分区且已排序的文件。

  2. Shuffle阶段:广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程

    • Copy过程:Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。

    • Merge过程:Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时,将数据溢写到磁盘(与map端类似,溢写过程会执行 sort & combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。这个过程也会不停地执行 sort & combine 操作。

  3. Reduce阶段:Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个键值对调用reduce()方法,并将结果写到HDFS。

你能简单描述一下HBase吗?能画出它的架构图吗?

HBase是一个面向列的 NoSQL 分布式数据库,它利用HDFS作为底层存储系统。那么,HBase相对于传统的关系型数据库有什么不同呢?

  • HBase是schema-free的,它的列是可以动态增加的(仅仅定义列族),并且为空的列不占物理存储空间。
  • HBase是基于列存储的,每个列族都由几个文件保存,不同的列族的文件是分离的。
  • HBase自动切分数据,使得数据存储自动具有很好的横向扩展性。
  • HBase没有任何事务,提供了高并发读写操作的支持。

HBase中的Table是一个稀疏的、多维度的、排序的映射表,这张表的索引是[RowKey, ColumnFamily, ColumnQualifier, Timestamp],其中Timestamp表示版本,默认获取最新版本。HBase是通过RowKey来检索数据的,RowKey是Table设计的核心,它按照ASCII有序排序,因此应尽量避免顺序写入。RowKey设计应该注意三点:

  • 唯一原则:在HBase中rowkey可以看成是表的主键,必须保证其唯一性。
  • 散列原则:由于rowkey是按字典有序的,故应避免rowkey连续有序而导致在某一台RegionServer上堆积的现象。例如可以拼接随机数、将时间戳倒序等。
  • 长度原则:设计时RowKey要尽量短,这样可以提高有效数据的比例,节省存储空间,也可以提高查询的性能。

下面是HBase的整体架构图:

关于简单介绍Mapreduce,Hbase,Kafka,Zookeeper


2、你说了解kafka,能简单描述一下Kafka吗?能画出它的架构图吗?

Kafka是一个高吞吐、易扩展的分布式发布-订阅消息系统,它能够将消息持久化到磁盘,用于批量的消费。Kafka中有以下几个概念:

  • Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
  • Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Broker:Kafa集群中包含一台或多台服务器,这种服务器被称为broker。
  • Producer:生产者,向Kafka的一个topic发布消息。
  • Consumers:消费者,从kafka的某个topic读取消息。

关于简单介绍Mapreduce,Hbase,Kafka,Zookeeper

Kafka一些重要设计思想

  • Consumergroup:各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。
  • 消息状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
  • 消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
  • 消息有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
  • 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
  • push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
  • Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
  • 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
  • 同步异步:Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
  • 分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
  • 离线数据装载:Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。

消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition Logs(分区日志)组成

我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。

Kafka集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。

kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。如果你的副本数量设置为3,那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。

Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。Producer在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader作为读写用。

4.2消息可靠性

在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况:

  • 一个消息发送失败
  • 一个消息被发送多次
  • 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次

有许多系统声称它们实现了exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息,但是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理取过来的消息时失败了。
从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。
从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。

4.3 备份机制

备份机制是Kafka0.8版本的新特性,备份机制的出现大大提高了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步