大数据技术之高频面试题

第一章

  1. 项目涉及技术

1.1 Linux&Shell

1.1.1 Linux常用高级命令

序号

命令

命令解释

1

top

查看内存

2

df -h

查看磁盘存储情况

3

iotop

查看磁盘IO读写(yum install iotop安装)

4

iotop -o

直接查看比较高的磁盘读写程序

5

netstat -tunlp | grep 端口号

查看端口占用情况

6

uptime

查看报告系统运行时长及平均负载

7

ps -aux

查看进程

1.1.2 Shell常用工具及写过的脚本

1awk、sed、cut、sort

2)用Shell写过哪些脚本

(1)集群启动,分发脚本

(2)数仓与mysql的导入导出

(3)数仓层级内部的导入

1.1.3 Shell中提交了一个脚本,进程号已经不知道了,但是需要kill掉这个进程,怎么操作?

ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"

1.1.4 Shell中单引号和双引号区别

1)在/home/hadoop/bin创建一个test.sh文件

[[email protected] bin]$ vim test.sh

文件中添加如下内容

#!/bin/bash

do_date=$1

 

echo '$do_date'

echo "$do_date"

echo "'$do_date'"

echo '"$do_date"'

echo `date`

2)查看执行结果

[[email protected] bin]$ test.sh 2019-02-10

$do_date

2019-02-10

'2019-02-10'

"$do_date"

2019年 05月 02日 星期四 21:02:08 CST

3)总结:

(1)单引号取变量值

(2)双引号取变量值

(3)反引号`,执行引号中命令

(4)双引号内部嵌套单引号,取出变量值

(5)单引号内部嵌套引号,不取出变量值

 

 

1.2 Hadoop

1.2.1 Hadoop常用端口号

 

hadoop2.x

Hadoop3.x

访问HDFS端口

50070   

9870

访问MR执行情况端口

8088    

8088    

历史服务器

19888  

19888  

客户端访问集群端口

9000

8020

1.2.2 Hadoop配置文件以及简单的Hadoop集群搭建

(1)配置文件:

Hadoop2.x core-site.xmlhdfs-site.xmlmapred-site.xmlyarn-site.xml   slaves

Hadoop3.x core-site.xmlhdfs-site.xmlmapred-site.xmlyarn-site.xml   workers

(2)简单的集群搭建过程:

JDK安装

配置SSH免密登录

配置hadoop核心文件: 

格式化namenode

1.2.3 HDFS读流程写流程

大数据技术之高频面试题

大数据技术之高频面试题

 

 

1.2.4 HDFS小文件处理

1)会有什么影响

(1)1个文件块,占用namenode多大内存150字节

1亿个小文件*150字节

1 个文件块 * 150字节

128G能存储多少文件块?   128 * 1024*1024*1024byte/150字节 = 9亿文件块

2)怎么解决

(1)采用har归档方式,将小文件归档

(2)采用CombineTextInputFormat

(3)有小文件场景开启JVM重用;如果没有小文件,不要开启JVM重用,因为会一直占用使用到的task卡槽,直到任务完成才释放

JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间

<property>

    <name>mapreduce.job.jvm.numtasks</name>

    <value>10</value>

    <description>How many tasks to run per jvm,if set to -1 ,there is  no limit</description>

</property>   

1.2.5 Shuffle及优化

1、Shuffle过程

大数据技术之高频面试题

大数据技术之高频面试题

 

 

2、优化

1Map阶段

(1)增大环形缓冲大小。由100m扩大到200m

(2)增大环形缓冲区溢写的比例。由80%扩大到90%

(3)减少对溢写文件的merge次数(10个文件,一次20个merge

4)不影响实际业务的前提下,采用Combiner提前合并,减少 I/O

2Reduce阶段

1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。

2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

3规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)增加每个Reduce去Map中拿数据的并行数

5)集群性能可以的前提下,增大Reduce端存储数据内存的大小。

3IO传输

采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器

压缩:

(1)map输入端主要考虑数据量大小和切片,支持切片的有Bzip2LZO。注意:LZO要想支持切片必须创建索引;

(2)map输出端主要考虑速度,速度快的snappy、LZO;

(3)reduce输出端主要看具体需求,例如作为下一个mr输入需要考虑切片,永久保存考虑压缩率比较大的gzip。

4)整体

(1)NodeManager默认内存8G,需要根据服务器实际配置灵活调整,例如128G内存,配置为100G内存左右,yarn.nodemanager.resource.memory-mb

(2)单任务默认内存8G,需要根据该任务的数据量灵活调整,例如128m数据,配置1G内存,yarn.scheduler.maximum-allocation-mb

(3)mapreduce.map.memory.mb :控制分配给MapTask内存上限,如果超过会kill掉进程(报:Container is running beyond physical memory limits. Current usage:565MB of512MB physical memory used;Killing Container)。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m可以增加MapTask内存,最大可以增加到4-5g

4mapreduce.reduce.memory.mb:控制分配给ReduceTask内存上限。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m可以增加ReduceTask内存大小4-5g

(5)mapreduce.map.java.opts:控制MapTask堆内存大小。(如果内存不够,报:java.lang.OutOfMemoryError

(6)mapreduce.reduce.java.opts:控制ReduceTask堆内存大小。(如果内存不够,报:java.lang.OutOfMemoryError

7)可以增加MapTaskCPU核数增加ReduceTaskCPU核数

8)增加每个ContainerCPU核数和内存大小

9)在hdfs-site.xml文件中配置多目录

10)NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为10台时,此参数设置为60

1.2.6 Yarn工作机制

大数据技术之高频面试题

1.2.7 Yarn调度器

1Hadoop调度器重要分为三类:

FIFO 、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。

Apache默认的资源调度器是容量调度器

CDH默认的资源调度器是公平调度器。

2)区别:

FIFO调度器:支持单队列 、先进先出   生产环境不会用。

容量调度器:支持多队列,保证先进入的任务优先执行。

公平调度器:支持多队列,保证每个任务公平享有队列资源

3)在生产环境下怎么选择?

     大厂:如果对并发度要求比较高,选择公平,要求服务器性能必须OK;

     中小公司,集群服务器资源不太充裕选择容量。

4)在生产环境怎么创建队列?

(1)调度器默认就1个default队列,不能满足生产要求。

    (2)按照框架:hive /spark/ flink 每个框架的任务放入指定的队列(企业用的不是特别多)

(3)按照业务模块:登录注册、购物车、下单、业务部门1、业务部门2

5)创建多队列的好处?

(1)因为担心员工不小心,写递归死循环代码,把所有资源全部耗尽。

(2)实现任务的降级使用,特殊时期保证重要的任务队列资源充足。

业务部门1(重要)=》业务部门2(比较重要)=》下单(一般)=》购物车(一般)=》登录注册(次要)

1.2.8 项目经验基准测试

搭建完Hadoop集群后需要对HDFS读写性能和MR计算能力测试。测试jar包在hadoop的share文件夹下。

1.2.9 Hadoop宕机

1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb单个任务可申请的最多物理内存量,默认是8192MB

2)如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize。。

1.2.10 Hadoop解决数据倾斜方法

1)提前在map进行combine,减少传输的数据量

在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。

如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。

2)导致数据倾斜的key 大量分布在不同的mapper

(1)局部聚合加全局聚合。

第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。

第二次mapreduce,去掉key的随机前缀,进行全局聚合。

思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。

这个方法进行两次mapreduce,性能稍差。

(2)增加Reducer,提升并行度
JobConf.setNumReduceTasks(int)

(3)实现自定义分区

根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

1.2.11 集群资源分配参数(项目中遇到的问题

集群有30台机器,跑mr任务的时候发现5个map任务全都分配到了同一台机器上,这个可能是由于什么原因导致的吗?

解决方案:yarn.scheduler.fair.assignmultiple 这个参数 默认是开的,需要关掉

https://blog.csdn.net/leone911/article/details/51605172

 

 

1.3 Zookeeper

1.3.1 选举机制

半数机制:2n+1,安装奇数台

10服务器:3台

20台服务器:5台

100台服务器:11台

台数多,好处:提高可靠性;坏处:影响通信延时

1.3.2 常用命令

lsgetcreate

1.3.3 Paxos算法(扩展)

注意:暂时先不用看。如果后期准备面今日头条,需要认真准备,其他公司几乎都不问。

Paxos算法一种基于消息传递且具有高度容错特性的一致性算法。

分布式系统中的节点通信存在两种模型:共享内存(Shared memory)和消息传递(Messages passing)。基于消息传递通信模型的分布式系统,不可避免的会发生以下错误:进程可能会慢、被杀死或者重启,消息可能会延迟、丢失、重复,在基础Paxos场景中,先不考虑可能出现消息篡改即拜占庭错误的情况。Paxos算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。

1.3.4 讲一讲什么是CAP法则?Zookeeper符合了这个法则的哪两个?(扩展)

CAP法则:强一致性、高可用性、分区容错性;

Zookeeper符合强一致性、高可用性!

 

1.4 Flume

1.4.1 Flume组成,Put事务,Take事务

1)taildir source

(1)断点续传、多目录

    (2)哪个flume版本产生的?Apache1.7、CDH1.6

(3)没有断点续传功能时怎么做的? 自定义

(4)taildir挂了怎么办?

     不会丢数:断点续传

     重复数据:

(5)怎么处理重复数据?

     不处理:生产环境通常不处理,因为会影响传输效率

     处理

         自身:在taildirsource里面增加自定义事务

         找兄弟:下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis)

(6)taildir source 是否支持递归遍历文件夹读取文件?

     不支持。  自定义  递归遍历文件夹 +读取文件

2)file channel /memory channel/kafka channel  

(1)file channel

          数据存储于磁盘,优势:可靠性高;劣势:传输速度低

          默认容量:100万event

注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

(2)memory channel

          数据存储于内存,优势:传输速度快;劣势:可靠性差

          默认容量:100个event

(3)kafka channel

          数据存储于Kafka,基于磁盘;

          优势:可靠性高;

          传输速度快 kafka channel》memory channel+kafka sink  原因省去了sink阶段

(4)kafka channel哪个版本产生的?

          flume1.6 版本产生=》并没有火;因为有bug

          topic-start 数据内容

          topic-event 数据内容    ture  和false 很遗憾,都不起作用。

          增加了额外清洗的工作量。

          flume1.7解决了这个问题,开始火了。

(5)生产环境如何选择

          如果下一级是kafka,优先选择kafka channel

          如果是金融、对钱要求准确的公司,选择file channel

          如果就是普通的日志,通常可以选择memory channel

          每天丢几百万数据   pb级   亿万富翁,掉1块钱会捡?

3)HDFS sink    

     (1)时间(1小时-2小时) or 大小128m、event个数(0禁止)

具体参数:hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0

4)事务

SourceChannel是Put事务

Channel到Sink是Take事务

1.4.2 Flume拦截器

1拦截器注意事项

项目自定义了:ETL拦截器。

采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点性能会低一些

2)自定义拦截器步骤

(1)实现 Interceptor

(2)重写四个方法

  • initialize 初始化
  • public Event intercept(Event event) 处理单个Event
  • public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
  • close 方法

(3)静态内部类,实现Interceptor.Builder

3)拦截器可以不用吗?

     可以不用;需要在下一级hive的dwd层和sparksteaming里面处理

     优势:只处理一次,轻度处理;劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。

1.4.3 Flume Channel选择器

大数据技术之高频面试题 

 

1.4.4 Flume监控器

1)采用Ganglia监控器,监控到flume尝试提交的次数远远大于最终成功的次数,说明flume运行比较差。

2)解决办法?

(1)自身:增加内存flume-env.sh   4-6g

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc

(2)找朋友:增加服务器台数

搞活动 618  =》增加服务器=》用完在退出

日志服务器配置:8-16g内存、磁盘8T

1.4.5 Flume采集数据会丢失吗?(防止数据丢失的机制)

如果是FileChannel不会,Channel存储可以存储File中,数据传输身有事务

如果是MemoryChannel有可能丢。

1.5 Kafka

https://www.jianshu.com/p/2b4ca1fae5d8

1.5.1 Kafka架构

生产者、Broker、消费者、ZK

注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息。

大数据技术之高频面试题

大数据技术之高频面试题

 

 

1.5.2 Kafka的机器数量

Kafka机器数量=2*(峰值生产速度*副本数/100)+ 1

1.5.3 副本设定

一般我们设置成2个或3个,很多企业设置为2个

副本的优势:提高可靠性;副本劣势:增加了网络IO传输

1.5.4 Kafka压测

Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈

1.5.5 Kafka日志保存时间

默认保存7天;生产环境建议3天

1.5.6 Kafka中数据量计算

每天总数据量100g,每天产生1亿日志, 10000万/24/60/60=1150条/秒钟

平均每秒钟:1150

低谷钟:50

高峰每秒钟:1150*(2-20=2300条-23000条

每条日志大小0.5k-2k(取1k

每秒多少数据量:2.0M-20MB

1.5.7 Kafka的硬盘大小

每天的数据量100g*2个副本*3天/70%

1.5.8 Kafka监控

公司自己开发的监控器;

开源的监控器:KafkaManager、KafkaMonitorKafkaEagle

1.5.9 Kakfa分区数

1)创建一个只有1个分区的topic

2)测试这个topic的producer吞吐量和consumer吞吐量。 

3)假设他们的值分别是Tp和Tc,单位可以是MB/s。

4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)

例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s

分区数=100 / 20 =5分区

https://blog.csdn.net/weixin_42641909/article/details/89294698

分区数一般设置为:3-10

1.5.10 多少个Topic

通常情况:多少个日志类型就多少个Topic也有日志类型进行合并的。

1.5.11 Kafka的ISR副本同步队列

ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。

任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。

1.5.12 Kafka分区分配策略

在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。

Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

例如我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。

C1-0 将消费 0, 1, 2, 3 分区

C2-0 将消费 4, 5, 6 分区

C2-1 将消费 7, 8, 9 分区

第一步:将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。

1.5.13 Kafka挂掉

1)Flume记录

2)日志有记录

3)短期没事

1.5.14 Kafka丢不丢数据

Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。

Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。

Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。

1.5.15 Kafka数据重复

幂等性+ack-1+事务

Kafka数据重复,可以再下一级:SparkStreamingredis或者hive中dwd层去重去重的手段:分组、按照id开窗只取第一个值;

1.5.16 Kafka消息数据积压,Kafka消费能力不足怎么处理? 

1如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

1.5.17 Kafka参数优化

1)Broker参数配置(server.properties

1、日志保留策略配置

# 保留三天,也可以更短 (log.cleaner.delete.retention.ms)

log.retention.hours=72

 

2、Replica相关配置

default.replication.factor:1 默认副本1

 

3、网络通信延时

replica.socket.timeout.ms:30000 #当集群之间网络不稳定时,调大该参数

replica.lag.time.max.ms= 600000# 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数

2)Producer优化(producer.properties

compression.type:none

#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。

GZIP、Snappy和LZ4。从2.1.0开始,kafka正式支持Zstandard算法(简写zstd)。它是Facebook开源的一个压缩算法,能够提供超高的压缩比。对于kafka测试而言,在吞吐方面:LZ4>Snappy> zstd、GZIP;在压缩比方面:zstd>lz4>gzip>snappy。具体到物理资源,使用snappy算法占用的网络带宽资源最多,zstd最少,这是合理的,毕竟zstd就是要提供超高的压缩比;在CPU使用率方面,各个算法表现得差不多,只是在压缩时snappy使用的CPU较多一些,而在解压缩时gzip算法则可能使用更多的CPU。

3Kafka内存调整(kafka-server-start.sh

默认内存1个G生产环境尽量不要超过6个G。

export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

1.5.18 Kafka高效读写数据

1Kafka本身是分布式集群,同时采用分区技术,并发度高。

2)顺序写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。

3)零复制技术

大数据技术之高频面试题

大数据技术之高频面试题

 

1.5.19 Kafka单条日志传输大小

kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要对kafka进行以下配置:server.properties

replica.fetch.max.bytes: 1048576  broker可复制的消息的最大字节数, 默认为1M

message.max.bytes: 1000012   kafka 会接收单个消息size的最大限制, 默认为1M左右

注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。

1.5.20 Kafka过期数据清理

保证数据没有被引用(没人消费他)

日志清理保存的策略只有delete和compact两种

log.cleanup.policy=delete启用删除策略

log.cleanup.policy=compact启用压缩策略

https://www.jianshu.com/p/fa6adeae8eb5

1.5.21 Kafka可以按照时间消费数据

Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp);

1.5.22 Kafka消费者角度考虑是拉取数据还是推送数据

拉取数据

1.5.23 Kafka中的数据是有序的吗

单分区内有序;多分区,分区与分区间无序;

 

1.6 Hive

补充

1.6.1 Hive的架构

大数据技术之高频面试题

 

1.用户接口:Client

CLI(command-line interface)、JDBC/ODBC(jdbc访问hive)、WEBUI(浏览器访问hive)

2.元数据:Metastore

元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;

默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore

3.Hadoop

使用HDFS进行存储,使用MapReduce进行计算。

4.驱动器:Driver

(1)解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。

(2)编译器(Physical Plan):将AST编译生成逻辑执行计划。

(3)优化器(Query Optimizer):对逻辑执行计划进行优化。

(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR。

大数据技术之高频面试题

 

Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。

 

1.6.2 Hive和数据库比较

Hive 和数据库除了拥有类似的查询语言,再无类似之处。

1数据存储位置

Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。

2数据更新

Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,

3执行延迟

Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。

4数据规模

Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。

1.6.3 内部表和外部表

元数据、原始数据

1)删除数据时:

内部表:元数据、原始数据,全删除

外部表:元数据 只删除

2)在公司生产环境下,什么时候创建内部表,什么时候创建外部表?

在公司中绝大多数场景都是外部表。

自己使用的临时表,才会创建内部表;

1.6.4 4个By区别

1Order By:全局排序,只有一个Reducer;

2Sort By:分区内有序;

3Distrbute By:类似MR中Partition,进行分区,结合sort by使用。

4 Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。

在生产环境中Order By用的比较少,容易导致OOM。

在生产环境中Sort By+ Distrbute By用的多。

1.6.5 系统函数记20个

1)date_add、date_sub函数(加减日期

2next_day函数(周指标相关)

3)date_format函数(根据格式整理日期

4last_day函数(求当月最后一天日期

5)collect_set函数

6)get_json_object解析json函数

7)NVL(表达式1,表达式2)

如果表达式1为空值,NVL返回值为表达式2的值,否则返回表达式1的值。

1.6.6 自定义UDF、UDTF函数

1)在项目中是否自定义过UDF、UDTF函数以及用他们处理了什么问题及自定义步骤?

(1)用UDF函数解析公共字段;用UDTF函数解析事件字段。

(2)自定义UDF:继承UDF重写evaluate方法

(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close

2)为什么要自定义UDF/UDTF

因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。

1.6.7 窗口函数

1)Rank

(1)RANK() 排序相同时会重复,总数不会变

(2)DENSE_RANK() 排序相同时会重复,总数会减少

(3)ROW_NUMBER() 会根据顺序计算

2) OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化

(1)CURRENT ROW:当前行

(2)n PRECEDING:往前n行数据

(3) n FOLLOWING:往后n行数据

(4)UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点

(5) LAG(col,n):往前第n行数据

(6)LEAD(col,n):往后第n行数据

(7) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。

3)手写TopN

https://www.cnblogs.com/liuwchao/articles/11535282.html

 

1.6.8 Hive优化

https://www.cnblogs.com/swordfall/p/11037539.html

1MapJoin

如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。默认是打开的,不要关闭。

2)行列过滤

列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。

行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤 

3)列式存储

4)采用分区技术  

5合理设置Map数

mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B

mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB

通过调整max可以起到调整map数的作用,减小max可以增加map数,增大max可以减少map数。 max(0,min(块大小,Long的最大值))  

需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。

如果设置:

org.apache.hadoop.hive.ql.io.HiveInputFormat  上述参数是有效果的

6合理设置Reduce数

Reduce个数并不是越多越好

(1)过多的启动和初始化Reduce也会消耗时间和资源;

(2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;

7小文件如何产生的?

(1)动态分区插入数据,产生大量的小文件,从而导致map数量剧增;

(2)reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);

(3)数据源本身就包含大量的小文件。

8小文件解决方案

https://blog.csdn.net/shudaqi2010/article/details/90342417

(1)在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。

set hive.input.format=org.apache.hadoop.hive.al.io.CombineHiveInputFormat

(2)merge

// 输出合并小文件

SET hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件

SET hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件

SET hive.merge.size.per.task = 268435456; -- 默认256M

SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于16m该值时,启动一个独立的map-reduce任务进行文件merge

  1. 开启JVM重用

JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。

Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。默认值是1

 

<property>

  <name>mapreduce.job.jvm.numtasks</name>

 

  <value>10</value>

  <description>How many tasks to run per jvm. If set to -1, there is no limit. </description>

 

</property>

 

这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

set mapreduce.job.jvm.numtasks=10

9开启map端combiner(不影响最终业务逻辑)

set hive.map.aggr=true;

  1. 压缩(选择快的)

关于压缩:

https://blog.csdn.net/qq262593421/article/details/101685223

设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率)

set hive.exec.compress.intermediate=true --启用中间数据压缩

set mapreduce.map.output.compress=true --启用最终数据压缩

set mapreduce.map.outout.compress.codec=…; --设置压缩方式  snappy

  1. 采用tez引擎或者spark引擎

tez 和 spark都是基于内存,速度比mr都要快,mr更稳定。

  1. 数据量特别大,不在乎时间,mr更合理,清洗很多mr
  2. 数据量不大,需要很快出结果,简单计算,用tez,临时的需求
  3. 数据比较大,对时效有一定要求,报表,spark

1.6.9 Hive解决数据倾斜方法

https://blog.csdn.net/ccorg/article/details/60147863

1)数据倾斜长啥样?

大数据技术之高频面试题

 

2)怎么产生的数据倾斜?

1)不同数据类型关联产生数据倾斜  

情形:比如用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时。

后果:处理此特殊值的reduce耗时;只有一个reduce任务。默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方式:把数字类型转换成字符串类型

select * from users a

left outer join logs b

on a.usr_id = cast(b.user_id as string)

2)控制空值分布

在生产环境经常会用大量空值数据进入到一个reduce中去,导致数据倾斜。

解决办法:

自定义分区,将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个Reducer。

注意:对于异常值如果不需要的话,最好是提前在where条件里过滤掉,这样可以使计算量大大减少

 最常见的数据倾斜是场景是key分布不均,个别的key特别多,其他key比较少

3)解决数据倾斜的方法?

(1)group by

如果需要做过滤distinct,假设可以用group by能够实现一样的效果,用group by

注:group by 优于distinct group

解决方式:采用sum() group by的方式来替换count(distinct )完成计算。

(2)mapjoin没有shuffle,没有倾斜。

(3)开启数据倾斜时负载均衡

set hive.groupby.skewindata=true;

思想就是先随机分发并处理,再按照key group by来分发处理。

操作当选项设定为true,生成的查询计划会有两个MRJob。

第一个MRJob中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;

第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的原始GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

点评它使计算变成了两个mapreduce,先在第一个中在shuffle过程partition时随机给 key打标记,使每个key随机均匀分布到各个reduce上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上。

所以需要第二次的mapreduce这次就回归正常shuffle但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次mr中随机分布到各个节点完成。

1.6.10 Hive里边字段的分隔符用的什么?为什么用\t?有遇到过字段里边有\t的情况吗,怎么处理的? 

hive 默认的字段分隔符为ascii码的控制符\001(^A),建表的时候用fields terminated by '\001'。注意:如果采用\t或者\001等为分隔符,需要要求前端埋点和javaEE后台传递过来的数据必须不能出现该分隔符,通过代码规范约束。一旦传输过来的数据含有分隔符,需要在前一级数据中转义或者替换(ETL)。

1.6.11 Tez引擎优点

Tez可以将多个有依赖的作业转换为一个作业这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能

Mr/tez/spark区别:

Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。

Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘  DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。

Tez引擎:完全基于内存。  注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。

1.6.12 MySQL元数据备份

1)MySQL之元数据备份(项目中遇到的问题

元数据备份(重点,如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个复本)

 Keepalived或者mycat

2)MySQL utf8超过字节数问题

MySQL的utf8编码最多存储3个字节,当数据中存在表情号、特色符号时会占用超过3个字节数的字节,那么会出现错误 Incorrect string value: '\xF0\x9F\x91\x91\xE5\xB0...'

解决办法:将utf8修改为utf8mb4

首先修改库的基字符集和数据库排序规则

大数据技术之高频面试题

 

再使用 SHOW VARIABLES LIKE '%char%'; 命令查看参数

大数据技术之高频面试题

 

确保这几个参数的value值为utf8mb4 如果不是使用set命令修改

如:set character_set_server = utf8mb4;

1.6.13 UnionUnion all区别

1)union会将联合的结果集去重,效率较union all

2)union all不会对结果集去重,所以效率高

1.6.14 hive的sql是如何转化为mapreduce?

了解了MapReduce实现SQL基本操作之后,我们来看看Hive是如何将SQL转化为MapReduce任务的,整个编译过程分为六个阶段:

  1. Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree (做语法解析,语法检查,转化为ast tree)
  1. 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
  2. 遍历QueryBlock,翻译为执行操作树OperatorTree  
  3. 逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量把逻辑计划进行优化,得到优化后的逻辑计划
  4. 遍历OperatorTree,翻译为MapReduce任务得到物理计划

物理层优化器进行MapReduce任务的变换,生成最终的执行计划

 

 

1.7 Sqoop

1.7.1 Sqoop参数

/opt/module/sqoop/bin/sqoop import \

--connect \

--username \

--password \

--target-dir \

--delete-target-dir \

--num-mappers \

--fields-terminated-by   \

--query   "$2" ' and $CONDITIONS;'

1.7.2 Sqoop导入导出Null存储一致性问题

Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。

1.7.3 Sqoop数据导出一致性问题

场景1:如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。

官网:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

Since Sqoop breaks down export process into multiple transactions, it is possible that a failed export job may result in partial data being committed to the database. This can further lead to subsequent jobs failing due to insert collisions in some cases, or lead to duplicated data in others. You can overcome this problem by specifying a staging table via the --staging-table option which acts as an auxiliary table that is used to stage exported data. The staged data is finally moved to the destination table in a single transaction.

–staging-table方式

sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string '\N'

1.7.4 Sqoop底层运行的任务是什么

只有Map阶段,没有Reduce阶段任务。默认是4个MapTask

1.7.5 Sqoop一天导入多少数据

100万日活=》10万订单,1人10条,每天1g左右业务数据

Sqoop每天将1G的数据量导入到数仓。

1.7.6 Sqoop数据导出的时候一次执行多长时间

每天晚上00:30开始执行,Sqoop任务一般情况40 -50分钟的都有取决于数据量(11:116:18等活动在1个小时左右)

1.7.7 Sqoop在导入数据的时候数据倾斜

https://blog.csdn.net/lizhiguo18/article/details/103969906

Sqoop 抽数的并行化主要涉及到两个参数:num-mappers:启动N个map来并行导入数据,默认4个;split-by:按照某一列来切分表的工作单元。

通过ROWNUM() 生成一个严格均匀分布的字段,然后指定为分割字段

1.7.8 Sqoop数据导出Parquet(项目中遇到的问题

Ads层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式

(1)创建临时表,把Parquet中表数据导入到临时表,把临时表导出到目标表用于可视化

(2)Sqoop里面有参数,可以直接把Parquet转换为text

(3)ads层建表的时候就不要建Parquet表

1.8 Azkaban 

1.8.1 每天集群运行多少指标?

每天跑100多个指标,有活动时跑200个左右。

1.8.2 任务挂了怎么办?

运行成功或者失败都会发邮件、发钉钉、集成自动打电话(项目中遇到的问题

最主要的解决方案就是重新跑。

 

 

1.9 HBase

1.9.1 HBase存储结构

大数据技术之高频面试题

 

1.9.2 RowKey设计原则

1rowkey长度原则

2rowkey散列原则

3rowkey唯一原则

1.9.3 RowKey如何设计

1)生成随机数、hash、散列值

2)字符串反转

1.10 Scala

1.10.1 开发环境

要求掌握必要的scala开发环境搭建技能。

1.10.2 变量和数据类型

掌握var和val的区别

掌握数值类型(Byte、Short、Int、Long、Float、Double、Char)之间的转换关系

1.10.3 流程控制

掌握if-else、for、while等必要的流程控制结构,掌握如何实现break、continue的功能。

1.10.4 函数式编程

掌握高阶函数、匿名函数、函数柯里化、函数参数以及函数至简原则。

1.10.5 面向对象

掌握Scala与Java继承方面的区别、单例对象(伴生对象)、特质的用法及功能。

1.10.6 集合

掌握常用集合的使用、集合常用的计算函数。

1.10.7 模式匹配

掌握模式匹配的用法

1.10.8 异常

掌握异常常用操作即可

1.10.9 隐式转换

掌握隐式方法、隐式参数、隐式类,以及隐式解析机制

1.10.10 泛型

掌握泛型语法

 

1.11 Spark Core & SQL

1.11.1 Spark有几种部署方式?请分别简要论述

1)Local:运行在一台机器上,通常是练手或者测试环境。

2)Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。

3)Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

4)Mesos:国内大环境比较少用。

1.11.2 Spark任务使用什么进行提交,JavaEE界面还是脚本

Shell 脚本。

1.11.3 Spark提交作业参数(重点)

参考答案:

https://blog.csdn.net/gamer_gyt/article/details/79135118

1)在提交任务时的几个重要参数

executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个

num-executors —— 启动executors的数量,默认为2

executor-memory —— executor内存大小,默认1G

driver-cores —— driver使用内核数,默认为1

driver-memory —— driver内存大小,默认512M

2)边给一个提交任务的样式

spark-submit \

  --master local[5]  \

  --driver-cores 2   \

  --driver-memory 8g \

  --executor-cores 4 \

  --num-executors 10 \

  --executor-memory 8g \

  --class PackageName.ClassName XXXX.jar \

  --name "Spark Job Name" \

  InputPath      \

  OutputPath

1.11.4 简述Spark的架构与作业提交流程(画图讲解,注明各个部分的作用)(重点)

 

大数据技术之高频面试题

 

大数据技术之高频面试题

 

 

1.11.5 如何理解Spark中的血统概念(RDD)(笔试重点)

RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性以及划分任务时候起到重要作用。

1.11.6 简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数? (笔试重点)

Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。

1.11.7 请列举Spark的transformation算子(不少于8个),并简述功能(重点)

1)map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成.

2)mapPartitions(func):类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

3)reduceByKey(func,[numTask]):在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

 5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):

对相同K,把V合并成一个集合。

1.createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

2.mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

3.mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

根据自身情况选择比较熟悉的算子加以介绍。

1.11.8 请列举Spark的action算子(不少于6个),并简述功能(重点)

1)reduce:

2)collect:

3)first:

4)take:

5)aggregate:

6)countByKey:

7)foreach:

8)saveAsTextFile:

1.11.9 请列举会引起Shuffle过程的Spark算子,并简述功能。

reduceBykey:

groupByKey:

…ByKey:

1.11.10 简述Spark的两种核心Shuffle(HashShuffle与SortShuffle)的工作流程(包括未优化的HashShuffle、优化的HashShuffle、普通的SortShuffle与bypass的SortShuffle)(重点)

未经优化的HashShuffle:

大数据技术之高频面试题

 

优化后的Shuffle:

大数据技术之高频面试题

 

普通的SortShuffle:

大数据技术之高频面试题

当 shuffle read task 的 数 量 小 于 等 于 spark.shuffle.sort

bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。

大数据技术之高频面试题

 

1.11.11 Spark常用算子reduceByKey与groupByKey的区别,哪一种更具优势?(重点)

reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。

groupByKey:按照key进行分组,直接进行shuffle。

开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

1.11.12 Repartition和Coalesce关系与区别

1关系:

两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)

2区别:

repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle

一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce

1.11.13 分别简述Spark中的缓存机制(cache和persist)与checkpoint机制,并指出两者的区别与联系

都是做RDD持久化的

cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。

checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。

1.11.14 简述Spark*享变量(广播变量和累加器)的基本原理与用途。(重点)

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。

共享变量出现的原因:

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。

Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

1.11.15 当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?

使用foreachPartition代替foreach,在foreachPartition获取数据库的连接。

1.11.16 如何使用Spark实现TopN的获取(描述思路或使用伪代码)(重点)

方法1:

(1)按照key对数据进行聚合(groupByKey)

(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。

方法2:

(1)取出所有的key

(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序

方法3:

(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区

(2)对每个分区运用spark的排序算子进行排序

1.11.17 京东:调优之前与调优之后性能的详细对比(例如调整map个数,map个数之前多少、之后多少,有什么提升)

这里举个例子。比如我们有几百个文件,会有几百个map出现,读取之后进行join操作,会非常的慢。这个时候我们可以进行coalesce操作,比如240个map,我们合成60个map,也就是窄依赖。这样再shuffle,过程产生的文件数会大大减少。提高join的时间性能。

1.11.18 简述SparkSQL中RDD、DataFrame、DataSet三者的区别与联系? (笔试重点)

1RDD

优点:

编译时类型安全 

编译时就能检查出类型错误

面向对象的编程风格 

直接通过类名点的方式来操作数据

缺点:

序列化和反序列化的性能开销 

无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化

GC的性能开销,频繁的创建和销毁对象, 势必会增加GC

2DataFrame

DataFrame引入了schema和off-heap

schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。

3DataSet

DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。

当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。

三者之间的转换:

大数据技术之高频面试题

 

1.11.19 append和overwrite的区别

 append在原有分区上进行追加,overwrite在原有分区上进行全量刷新

1.11.20 coalesce和repartition的区别

 coalesce和repartition都用于改变分区,coalesce用于缩小分区且不会进行shuffle,repartition用于增大分区(提供并行度)会进行shuffle,在spark中减少文件个数会使用coalesce来减少分区来到这个目的。但是如果数据量过大,分区数过少会出现OOM所以coalesce缩小分区个数也需合理

1.11.21 cache缓存级别

DataFrame的cache默认采用 MEMORY_AND_DISK 这和RDD 的默认方式不一样RDD cache 默认采用MEMORY_ONLY

1.11.22 释放缓存和缓存

 缓存:(1)dataFrame.cache  (2)sparkSession.catalog.cacheTable(“tableName”)

 释放缓存:(1)dataFrame.unpersist  (2)sparkSession.catalog.uncacheTable(“tableName”)

1.11.23 Spark Shuffle默认并行度

参数spark.sql.shuffle.partitions 决定 默认并行度200

1.11.24 kryo序列化

kryo序列化比java序列化更快更紧凑,但spark默认的序列化是java序列化并不是spark序列化,因为spark并不支持所有序列化类型,而且每次使用都必须进行注册。注册只针对于RDD。在DataFrames和DataSet当中自动实现了kryo序列化。

1.11.25 创建临时表和全局临时表

DataFrame.createTempView() 创建普通临时表

DataFrame.createGlobalTempView()   DataFrame.createOrReplaceTempView() 创建全局临时表

1.11.26 BroadCast join 广播join

原理:先将小表数据查询出来聚合到driver端,再广播到各个executor端,使表与表join时

进行本地join,避免进行网络传输产生shuffle。

使用场景:大表join小表 只能广播小表

1.11.27 控制Spark reduce缓存 调优shuffle

spark.reducer.maxSizeInFilght  此参数为reduce task能够拉取多少数据量的一个参数默认48MB,当集群资源足够时,增大此参数可减少reduce拉取数据量的次数,从而达到优化shuffle的效果,一般调大为96MB,资源够大可继续往上跳。

 

spark.shuffle.file.buffer  此参数为每个shuffle文件输出流的内存缓冲区大小,调大此参数可以减少在创建shuffle文件时进行磁盘搜索和系统调用的次数,默认参数为32k 一般调大为64k。

1.11.28 注册UDF函数

SparkSession.udf.register 方法进行注册

1.11.29 SparkSQL中join操作与left join操作的区别?

join和sql中的inner join操作很相似,返回结果是前面一个集合和后面一个集合中匹配成功的,过滤掉关联不上的。

leftJoin类似于SQL中的左外关联left outer join,返回结果以第一个RDD为主,关联不上的记录为空。

部分场景下可以使用left semi join替代left join:

因为 left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,性能更高,而 left join 则会一直遍历。但是left semi join 中最后 select 的结果中只许出现左表中的列名,因为右表只有 join key 参与关联计算了

1.11.30 sparkSQL写的sql是如何转化为rdd的?

1. 语法解析,生成未解析的逻辑计划

2. 得到解析后的逻辑计划,得到多条执行路径

3.优化器选择一条最优的逻辑计划

4.把逻辑转化为物理计划,翻译为算子

5.运行的spark底层,得到job,生成dag,切分stage,划分taskSet

1.12 Spark Streaming

1.12.1 Spark Streaming第一次运行不丢失数据

kafka参数 auto.offset.reset 参数设置成earliest 从最初始偏移量开始消费数据

1.12.2 Spark Streaming精准一次消费

  1. 手动维护偏移量
  2. 处理完业务数据后,再进行提交偏移量操作

极端情况下,如在提交偏移量时断网或停电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事物保证精准一次消费

1.12.3 Spark Streaming控制每秒消费数据的速度

通过spark.streaming.kafka.maxRatePerPartition参数来设置Spark Streaming从kafka分区每秒拉取的条数

1.12.4 Spark Streaming背压机制

把spark.streaming.backpressure.enabled 参数设置为ture,开启背压机制后Spark Streaming会根据延迟动态去kafka消费数据,上限由spark.streaming.kafka.maxRatePerPartition参数控制,所以两个参数一般会一起使用

1.12.5 Spark Streaming 一个stage耗时

Spark Streaming stage耗时由最慢的task决定,所以数据倾斜时某个task运行慢会导致整个Spark Streaming都运行非常慢。

1.12.6 Spark Streaming 优雅关闭

把spark.streaming.stopGracefullyOnShutdown参数设置成ture,Spark会在JVM关闭时正常关闭StreamingContext,而不是立马关闭

Kill 命令:yarn application -kill     后面跟 applicationid

1.12.7 Spark Streaming 默认分区个数

Spark Streaming默认分区个数与所对接的kafka topic分区个数一致,Spark Streaming里一般不会使用repartition算子增大分区,因为repartition会进行shuffle增加耗时

1.12.8 SparkStreaming有哪几种方式消费Kafka中的数据,它们之间的区别是什么? 

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

优点如下: 

简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

一次且仅一次的事务机制

、对比:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

在实际生产环境中大都用Direct方式

1.12.9 简述SparkStreaming窗口函数的原理(重点)

 窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。

图中time1就是SparkStreaming计算批次大小,虚线框以及实线大框就是窗口的大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。

1.13 数据倾斜

公司一:总用户量1000万,5台64G内存的服务器。

公司二:总用户量10亿,1000台64G内存的服务器。

1.公司一的数据分析师在做join的时候发生了数据倾斜,会导致有几百万用户的相关数据集中到了一台服务器上,几百万的用户数据,说大也不大,正常字段量的数据的话64G还是能轻松处理掉的。

2.公司二的数据分析师在做join的时候也发生了数据倾斜,可能会有1个亿的用户相关数据集中到了一台机器上了(相信我,这很常见)。这时候一台机器就很难搞定了,最后会很难算出结果。

1.13.1 数据倾斜表现

1)hadoop中的数据倾斜表现:

  1. 有一个多几个Reduce卡住,卡在99.99%,一直不能结束。
  2. 各种container报错OOM
  3. 异常的Reducer读写的数据量极大,至少远远超过其它正常的Reducer
  4. 伴随着数据倾斜,会出现任务被kill等各种诡异的表现。

2)hive中数据倾斜

一般都发生在Sql中group byjoin on上,而且和数据逻辑绑定比较深。

3)Spark中的数据倾斜

Spark中的数据倾斜,包括Spark Streaming和Spark Sql,表现主要有下面几种:

  1. Executor lost,OOM,Shuffle过程出错;
  2. Driver OOM;
  3. 单个Executor执行时间特别久,整体任务卡在某个阶段不能结束;
  4. 正常运行的任务突然失败;

1.13.2 数据倾斜产生原因

我们以Spark和Hive的使用场景为例。

他们在做数据运算的时候会涉及到,count distinctgroup byjoin on等操作,这些都会触发Shuffle动作。一旦触发Shuffle,所有相同key的值就会被拉到一个或几个Reducer节点上,容易发生单点计算问题,导致数据倾斜。

一般来说,数据倾斜原因有以下几方面:

1)key分布不均匀;

大数据技术之高频面试题

 

2)建表时考虑不周

我们举一个例子,就说数据默认值的设计吧,假设我们有两张表:

user(用户信息表):userid,register_ip

ip(IP表):ip,register_user_cnt

这可能是两个不同的人开发的数据表。如果我们的数据规范不太完善的话,会出现一种情况:

user表中的register_ip字段,如果获取不到这个信息,我们默认为null;

但是在ip表中,我们在统计这个值的时候,为了方便,我们把获取不到ip的用户,统一认为他们的ip为0。

两边其实都没有错的,但是一旦我们做关联了,这个任务会在做关联的阶段,也就是sql的on的阶段卡死。

3)业务数据激增

比如订单场景,我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。

然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。

1.13.3 解决数据倾斜思路

很多数据倾斜的问题,都可以用和平台无关的方式解决,比如更好的数据预处理异常值的过滤等。因此,解决数据倾斜的重点在于对数据设计和业务的理解,这两个搞清楚了,数据倾斜就解决了大部分了。

1)业务逻辑

我们从业务逻辑的层面上来优化数据倾斜,比如上面的两个城市做推广活动导致那两个城市数据量激增的例子,我们可以单独对这两个城市来做count,单独做时可用两次MR,第一次打散计算,第二次再最终聚合计算。完成后和其它城市做整合。

2)程序层面

比如说在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个Reduce任务。

我们可以先group by,再在外面包一层count,就可以了。比如计算按用户名去重后的总用户量:

(1)优化前 只有一个reduce,先去重再count负担比较大:

select name,count(distinct name)from user;

(2)优化后

// 设置该任务的每个job的reducer个数为3个。Hive默认-1,自动推断。

set mapred.reduce.tasks=3;

// 启动两个job,一个负责子查询(可以有多个reduce),另一个负责count(1):

select count(1) from (select name from user group by name) tmp;

3)调参方面

Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。

4)从业务和数据上解决数据倾斜

很多数据倾斜都是在数据的使用上造成的。我们举几个场景,并分别给出它们的解决方案。

  1. 有损的方法:找到异常数据,比如ip为0的数据,过滤掉
  2. 无损的方法:对分布不均匀的数据,单独计算
  3. 先对key做一层hash,先将数据随机打散让它的并行度变大,再汇集
  4. 数据预处理

1.13.4 定位导致数据倾斜代码

Spark数据倾斜只会发生在shuffle过程中。

这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinctgroupByKeyreduceByKeyaggregateByKeyjoincogrouprepartition等。

出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

1.13.4.1 某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中:

如果是用yarn-client模式提交,那么在提交的机器本地是直接可以看到log,可以在log中找到当前运行到了第几个stage;

如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。

此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

看task运行时间和数据量

task运行时间

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。

task数据量

此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

推断倾斜代码

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。

精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以如下单词计数来举例。

val conf = new SparkConf()val sc = new SparkContext(conf)val lines = sc.textFile("hdfs://...")val words = lines.flatMap(_.split(" "))val pairs = words.map((_, 1))val wordCounts = pairs.reduceByKey(_ + _)wordCounts.collect().foreach(println(_))

在整个代码中只有一个reduceByKey是会发生shuffle的算子,也就是说这个算子为界限划分出了前后两个stage:

stage0,主要是执行从textFilemap操作,以及shuffle write操作(对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内)。

stage1,主要是执行从reduceByKeycollect操作,以及stage1的各个task一开始运行,就会首先执行shuffle read操作(会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加)

stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。

123456789

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。

比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中,定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是是该算子导致了数据倾斜问题。

此时,如果某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

1.13.4.2 某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

1.13.5 查看导致数据倾斜的key分布情况

先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)

val sampledWordCounts = sampledPairs.countByKey()

sampledWordCounts.foreach(println(_))

1.13.6 Spark 数据倾斜的解决方案

1.13.6.1 使用Hive ETL预处理数据

1.13.6.1.1 适用场景

导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

1.13.6.1.2 实现思路

此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

1.13.6.1.3 方案实现原理

这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

1.13.6.1.4 方案优缺点

优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

1.13.6.1.5 方案实践经验

在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

1.13.6.1.6 项目实践经验

在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。

1.13.6.2 过滤少数导致倾斜的key

1.13.6.2.1 方案适用场景

如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

1.13.6.2.2 方案实现思路

如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。

比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。

如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

1.13.6.2.3 方案实现原理

将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

1.13.6.2.4 方案优缺点

优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

1.13.6.2.5 方案实践经验

在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

1.13.6.3 提高shuffle操作的并行度

1.13.6.3.1 方案适用场景

如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

1.13.6.3.2 方案实现思路

在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,默认是200,对于很多场景来说都有点过小。

1.13.6.3.3 方案实现原理

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。

而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

大数据技术之高频面试题

 

1.13.6.3.4 方案优缺点

优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

1.13.6.3.5 方案实践经验

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

1.13.6.4 两阶段聚合(局部聚合+全局聚合)

1.13.6.4.1 方案适用场景

对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

1.13.6.4.2 方案实现思

这个方案的核心实现思路就是进行两阶段聚合:

第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。

接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。

然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

示例代码如下:

// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });
  
// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
  
// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });
  
// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

1.13.6.4.3 方案实现原理

将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

大数据技术之高频面试题

 

1.13.6.4.4 方案优缺点

优点
对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

缺点
仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

1.13.6.5 将reduce join转为map join

1.13.6.5.1 方案适用场景

在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

1.13.6.5.2 方案实现思路

不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量,广播给其他Executor节点;

接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

示例如下:

// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
  
// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 获取当前RDD数据的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 从rdd1数据Map中,根据key获取到可以join到的数据。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });
  
// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。

1.13.6.5.3 方案实现原理

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。

但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

大数据技术之高频面试题

 

1.13.6.5.4 方案优缺点

优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

1.13.6.6 采样倾斜key并分拆join操作

1.13.6.6.1 方案适用场景

两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。

如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

1.13.6.6.2 方案实现思路

对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。

然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀;

而不会导致倾斜的大部分key形成另外一个RDD。

接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀;

不会导致倾斜的大部分key也形成另外一个RDD。

再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。

而另外两个普通的RDD就照常join即可。

最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

示例如下:

// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
  
// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。

// 每行数据变为<key,1>
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }
        });
        
// 按key累加行数
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
        
// 反转key和value,变为<value,key>
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });

// 以行数排序key,取最多行数的key
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
  
// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
        
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            }
        });
  
// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
              
        });
 
// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });
 
// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
 
// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

1.13.6.6.3 方案实现原理

对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

大数据技术之高频面试题

 

1.13.6.6.4 方案优缺点

优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

1.13.6.7 使用随机前缀和扩容RDD进行join

1.13.6.7.1 方案适用场景

如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

1.13.6.7.2 方案实现思路

该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。

然后将该RDD的每条数据都打上一个n以内的随机前缀。

同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。

最后将两个处理后的RDD进行join即可。

示例代码如下:

// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });
  
// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });
  
// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

1.13.6.7.3 方案实现原理

将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。

该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;

而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

1.13.6.7.4 方案优缺点

优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

1.13.6.7.5 方案实践经验

曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

1.13.6.8 多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。

比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一HiveETL预处理和过滤少数导致倾斜的k,预处理一部分数据,并过滤一部分数据来缓解;

其次可以对某些shuffle操作提升并行度,优化其性能;

最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。

大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

1.13.7 Spark数据倾斜处理小结

大数据技术之高频面试题

 

 

1.14 Flink基础

1.14.1 简单介绍一下 Flink

Flink 是一个框架和分布式处理引擎,用于对*和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink提供了诸多高抽象层的API以便用户编写分布式任务:

DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。

DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。

Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

此外,Flink 还针对特定的应用领域提供了领域库,例如: Flink ML,Flink 的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。 Gelly,Flink 的图计算库,提供了图计算的相关API及多种图计算算法实现。根据官网的介绍,Flink 的特性包含:

1.14.2 Flink相比传统的Spark Streaming区别?

这个问题是一个非常宏观的问题,因为两个框架的不同点非常之多。但是在面试时有非常重要的一点一定要回答出来:Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型。

下面我们就分几个方面介绍两个框架的主要区别:

1. 架构模型Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。

2. 任务调度Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。

3. 时间机制Spark Streaming 支持的时间机制有限,只支持处理时间。 Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。

4. 容错机制对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink 则使用两阶段提交协议来解决这个问题。

1.14.3 Flink的组件栈有哪些?

根据 Flink 官网描述,Flink 是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。

 大数据技术之高频面试题

自下而上,每一层分别代表:Deploy 层:该层主要涉及了Flink的部署模式,在上图中我们可以看出,Flink 支持包括local、Standalone、Cluster、Cloud等多种部署模式。Runtime 层:Runtime层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。API层:API 层主要实现了面向流(Stream)处理和批(Batch)处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API,后续版本,Flink有计划将DataStream和DataSet API进行统一。Libraries层:该层称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。

1.14.4 Flink 的运行必须依赖 Hadoop组件吗?

Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。

1.14.5 你们的Flink集群规模多大?

大家注意,这个问题看起来是问你实际应用中的Flink集群规模,其实还隐藏着另一个问题:Flink可以支持多少节点的集群规模?在回答这个问题时候,可以将自己生产环节中的集群规模、节点、内存情况说明,同时说明部署模式(一般是Flink on Yarn),除此之外,用户也可以同时在小集群(少于5个节点)和拥有 TB 级别状态的上千个节点上运行 Flink 任务。

1.14.6 Flink的基础编程模型了解吗?

大数据技术之高频面试题

 

上图是来自Flink官网的运行流程图。通过上图我们可以得知,Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个Sink接收器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。执行时,Flink程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。

1.14.7 Flink集群有哪些角色?各自有什么作用?

大数据技术之高频面试题

 

Flink 程序在运行时主要有 TaskManager,JobManager,Client三种角色。其中JobManager扮演着集群中的管理者Master的角色,它是整个集群的协调者,负责接收Flink Job,协调检查点,Failover 故障恢复等,同时管理Flink集群中从节点TaskManager。TaskManager是实际负责执行计算的Worker,在其上执行Flink Job的一组Task,每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。

1.14.8 说说 Flink 资源管理中 Task Slot 的概念

大数据技术之高频面试题

 

在Flink架构角色中我们提到,TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。

1.14.9 说说 Flink 的常用算子?

Flink 最常用的常用算子包括:Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。Filter:过滤掉指定条件的数据。KeyBy:按照指定的key进行分组。Reduce:用来进行结果汇总合并。Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)

1.14.10 说说你知道的Flink分区策略?

什么要搞懂什么是分区策略。分区策略是用来决定数据如何发送至下游。目前 Flink 支持了8中分区策略的实现。

 

上图是整个Flink实现的分区策略继承图:GlobalPartitioner 数据会被分发到下游算子的第一个实例中进行处理。ShufflePartitioner 数据会被随机分发到下游算子的每一个实例中进行处理。RebalancePartitioner 数据会被循环发送到下游的每一个实例中进行处理。RescalePartitioner 这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为A和B。下游并行度为4,编号为1,2,3,4。那么A则把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游并行度为4,编号为A,B,C,D。下游并行度为2,编号为1,2。那么A和B则把数据发送给1,C和D则把数据发送给2。BroadcastPartitioner 广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做Jion的场景。ForwardPartitioner ForwardPartitioner 用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner用来做数据的控制台打印。KeyGroupStreamPartitioner Hash分区器。会将数据按 Key 的 Hash 值输出到下游算子实例中。CustomPartitionerWrapper 用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。例如:

static classCustomPartitionerimplementsPartitioner<String> {
      @Override
      publicintpartition(String key, int numPartitions) {
          switch (key){
              case "1":
                  return 1;
              case "2":
                  return 2;
              case "3":
                  return 3;
              default:
                  return 4;
          }
      }
  }

1.14.11 Flink的并行度了解吗?Flink的并行度设置是怎样的?

Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度:

操作算子层面(Operator Level)

执行环境层面(Execution Environment Level)

客户端层面(Client Level)

系统层面(System Level)

需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。

1.14.12 Flink的Slot和parallelism有什么区别?

官网上十分经典的图:

大数据技术之高频面试题

 

slot是指taskmanager的并发执行能力,假设我们将 taskmanager.numberOfTaskSlots 配置为3 那么每一个 taskmanager 中分配3个 TaskSlot, 3个 taskmanager 一共有9个TaskSlot。

 

大数据技术之高频面试题

parallelism是指taskmanager实际使用的并发能力。假设我们把 parallelism.default 设置为1,那么9个 TaskSlot 只能用1个,有8个空闲。

1.14.13 Flink有没有重启策略?说说有哪几种?

Flink 实现了多种重启策略。

固定延迟重启策略(Fixed Delay Restart Strategy)

故障率重启策略(Failure Rate Restart Strategy)

没有重启策略(No Restart Strategy)

Fallback重启策略(Fallback Restart Strategy)

1.14.14 用过Flink中的分布式缓存吗?如何使用?

Flink实现的分布式缓存和Hadoop有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点中,防止task重复拉取。

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

1.14.15 说说Flink中的广播变量,使用时需要注意什么?

我们知道Flink是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。那么Flink中的广播变量就是为了解决这种情况。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。

1.14.16 说说Flink中的窗口?

来一张官网经典的图:

大数据技术之高频面试题

 

Flink 支持两种划分窗口的方式,按照time和count。如果根据时间划分窗口,那么它就是一个time-window 如果根据数据划分窗口,那么它就是一个count-window。flink支持窗口的两个重要属性(size和interval)如果size=interval,那么就会形成tumbling-window(无重叠数据) 如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size< interval, 那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。通过组合可以得出四种基本窗口:

time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))

time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))

count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)

count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

1.14.17 说说Flink中的状态存储?

Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。

1.14.18 Flink中的时间有哪几类

Flink 中的时间和其他流式计算系统的时间一样分为三类:事件时间,摄入时间,处理时间三种。如果以 EventTime 为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime。如果以 IngesingtTime 为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的systemTime为准。如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的systemTime 为准。

1.14.19 Flink 中水印是什么概念,起到什么作用?

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。

1.14.20 Flink Table & SQL 熟悉吗?TableEnvironment这个类有什么作用

TableEnvironment是Table API和SQL集成的核心概念。这个类主要用来:

在内部catalog中注册表

注册外部catalog

执行SQL查询

注册用户定义(标量,表或聚合)函数

将DataStream或DataSet转换为表

持有对ExecutionEnvironment或StreamExecutionEnvironment的引用

1.14.21 Flink SQL的实现原理是什么?是如何实现 SQL 解析的呢?

首先大家要知道 Flink 的SQL解析是基于Apache Calcite这个开源框架。

大数据技术之高频面试题

 

基于此,一次完整的SQL解析过程如下:

用户使用对外提供Stream SQL的语法开发业务应用

用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划

采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划

对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行

1.15 Flink中级

1.15.1 Flink是如何支持批流一体的?

大数据技术之高频面试题

 

本道面试题考察的其实就是一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了DataSet API 和 DataStream API。

1.15.2 Flink是如何做到高效的数据交换的?

在一个Flink Job中,数据需要在不同的task中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲buffer中收集records,然后再发送。Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。

1.15.3 Flink是如何做容错的?

Flink 实现容错主要靠强大的CheckPoint机制和State机制。Checkpoint 负责定时制作分布式快照、对程序中的状态进行备份;State 用来存储计算过程中的中间状态。

1.15.4 Flink 分布式快照的原理是什么?

Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。

大数据技术之高频面试题

 

核心思想是在 input source 端插入 barrier,控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。

1.15.5 Flink是如何保证Exactly-once语义的?

Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。 分为以下几个步骤:

开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面

预提交(preCommit)将内存中缓存的数据写入文件并关闭

正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟

丢弃(abort)丢弃临时文件

若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。

1.15.6 Flink 的 kafka 连接器有什么特别的地方?

Flink源码中有一个独立的connector模块,所有的其他connector都依赖于此模块,Flink 在1.9版本发布的全新kafka连接器,摒弃了之前连接不同版本的kafka集群需要依赖不同版本的connector这种做法,只需要依赖一个connector即可。

1.15.7 说说 Flink的内存管理是如何做的?

Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。理论上Flink的内存管理分为三部分:

Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改

Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。

User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。

1.15.8 说说 Flink的序列化如何做的?

Java本身自带的序列化和反序列化的功能,但是辅助信息占用空间比较大,在序列化对象时记录了过多的类信息。Apache Flink摒弃了Java原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。TypeInformation 支持以下几种类型:

BasicTypeInfo: 任意Java 基本类型或 String 类型

BasicArrayTypeInfo: 任意Java基本类型数组或 String 数组

WritableTypeInfo: 任意 Hadoop Writable 接口的实现类

TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现

CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)

PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法

GenericTypeInfo: 任意无法匹配之前几种类型的类

针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。

1.15.9 Flink中的Window出现了数据倾斜,你有什么解决办法?

window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:

在数据进入窗口前做预聚合

重新设计窗口聚合的key

1.15.10 Flink中在使用聚合函数 GroupBy、Distinct、KeyBy 等函数时出现数据热点该如何解决?

数据倾斜和数据热点是所有大数据框架绕不过去的问题。处理这类问题主要从3个方面入手:

在业务上规避这类问题

例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。

Key的设计上

把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。

参数设置

Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。

1.15.11 Flink任务延迟高,想解决这个问题,你会如何入手?

在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。

1.15.12 Flink是如何处理反压的?

Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。

1.15.13 Flink的反压和Strom有哪些不同?

Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到 Zookeeper ,Zookeeper 上的 watch 会通知该拓扑的所有 Worker 都进入反压状态,最后 Spout 停止发送 tuple。Flink中的反压使用了高效有界的分布式阻塞队列,下游消费变慢会导致发送端阻塞。二者最大的区别是Flink是逐级反压,而Storm是直接从源头降速。

1.15.14 Operator Chains(算子链)这个概念你了解吗?

为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。

1.15.15 Flink什么情况下才会把Operator chain在一起形成算子链?

两个operator chain在一起的的条件:

上下游的并行度一致

下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)

上下游节点都在同一个 slot group 中(下面会解释 slot group)

下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)

上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

两个节点间数据分区方式是 forward(参考理解数据流的分区)

用户没有禁用 chain

1.15.16 说说Flink1.9的新特性?

支持hive读写,支持UDF

Flink SQL TopN和GroupBy等优化

Checkpoint跟savepoint针对实际业务场景做了优化

Flink state查询

1.15.17 消费kafka数据的时候,如何处理脏数据?

可以在处理前加一个fliter算子,将不符合规则的数据过滤出去。

1.16 Flink高级

1.16.1 Flink Job的提交流程 

用户提交的Flink Job会被转化成一个DAG任务运行,分别是:StreamGraph、JobGraph、ExecutionGraph,Flink中JobManager与TaskManager,JobManager与Client的交互是基于Akka工具包的,是通过消息驱动。整个Flink Job的提交还包含着ActorSystem的创建,JobManager的启动,TaskManager的启动和注册。

1.16.2 Flink所谓"三层图"结构是哪几个"图"?

一个Flink任务的DAG生成计算图大致经历以下三个过程:

StreamGraph 最接近代码所表达的逻辑层面的计算拓扑结构,按照用户代码的执行顺序向StreamExecutionEnvironment添加StreamTransformation构成流式图。

JobGraph 从StreamGraph生成,将可以串联合并的节点进行合并,设置节点之间的边,安排资源共享slot槽位和放置相关联的节点,上传任务所需的文件,设置检查点配置等。相当于经过部分初始化和优化处理的任务图。

ExecutionGraph 由JobGraph转换而来,包含了任务具体执行所需的内容,是最贴近底层实现的执行图。

1.16.3 JobManger在集群中扮演了什么角色?

JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。

1.16.4 JobManger在集群启动过程中起到什么作用?

JobManager的职责主要是接收Flink作业,调度Task,收集作业状态和管理TaskManager。它包含一个Actor,并且做如下操作:

RegisterTaskManager: 它由想要注册到JobManager的TaskManager发送。注册成功会通过AcknowledgeRegistration消息进行Ack。

SubmitJob: 由提交作业到系统的Client发送。提交的信息是JobGraph形式的作业描述信息。

CancelJob: 请求取消指定id的作业。成功会返回CancellationSuccess,否则返回CancellationFailure。

UpdateTaskExecutionState: 由TaskManager发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回true,否则返回false。

RequestNextInputSplit: TaskManager上的Task请求下一个输入split,成功则返回NextInputSplit,否则返回null。

JobStatusChanged: 它意味着作业的状态(RUNNING, CANCELING, FINISHED,等)发生变化。这个消息由ExecutionGraph发送。

1.16.5 TaskManager在集群中扮演了什么角色?

TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。

1.16.6 TaskManager在集群启动过程中起到什么作用?

TaskManager的启动流程较为简单: 启动类:org.apache.flink.runtime.taskmanager.TaskManager 核心启动方法 : selectNetworkInterfaceAndRunTaskManager 启动后直接向JobManager注册自己,注册完成后,进行部分模块的初始化。

1.16.7 Flink 计算资源的调度是如何实现的?

TaskManager中最细粒度的资源是Task slot,代表了一个固定大小的资源子集,每个TaskManager会将其所占有的资源平分给它的slot。

通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。

而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输,也能共享一些数据结构,一定程度上减少了每个task的消耗。 每个slot可以接受单个task,也可以接受多个连续task组成的pipeline,如下图所示,FlatMap函数占用一个taskslot,而key Agg函数和sink函数共用一个taskslot:

大数据技术之高频面试题

 

1.16.8 简述Flink的数据抽象及数据交换过程?

Flink 为了避免JVM的固有缺陷例如java对象存储密度低,FGC影响吞吐和响应等,实现了自主管理内存。MemorySegment就是Flink的内存抽象。默认情况下,一个MemorySegment可以被看做是一个32kb大的内存块的抽象。这块内存既可以是JVM里的一个byte[],也可以是堆外内存(DirectByteBuffer)。在MemorySegment这个抽象之上,Flink在数据从operator内的数据对象在向TaskManager上转移,预备被发给下个节点的过程中,使用的抽象或者说内存对象是Buffer。对接从Java对象转为Buffer的中间对象是另一个抽象StreamRecord。

1.16.9 Flink 中的分布式快照机制是如何实现的?

Flink的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。 这些快照充当一致性checkpoint,系统可以在发生故障时回滚。 Flink用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。 它受到分布式快照的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。

大数据技术之高频面试题

 

barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。 一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。

1.16.10 简单说说FlinkSQL的是如何实现的?

Flink 将 SQL 校验、SQL 解析以及 SQL 优化交给了Apache Calcite。Calcite 在其他很多开源项目里也都应用到了,譬如 Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架构中处于核心的地位,如下图所示。

大数据技术之高频面试题

 

构建抽象语法树的事情交给了 Calcite 去做。SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的抽象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的抽象语法树。然后依次被转换成逻辑执行计划和物理执行计划。在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。

 

2章 项目架构

2.1 提高自信

云上数据仓库解决方案:https://www.aliyun.com/solution/datavexpo/datawarehouse

 

大数据技术之高频面试题

 

大数据技术之高频面试题

2.2 数仓概念

数据仓库的输入数据源和输出系统分别是什么?

输入系统埋点产生用户行为数据、JavaEE后台产生的业务数据个别公司有爬虫数据。

输出系统:报表系统、用户画像系统、推荐系统

2.3 系统数据流程设计

大数据技术之高频面试题

 

2.4 框架版本选型

1Apache:运维麻烦,组件间兼容性需要自己调研。一般大厂使用,技术实力雄厚,有专业的运维人员

2CDH6.3.2:国内使用最多的版本,但 CM不开源,但其实对中、小公司使用来说没有影响(建议使用)10000美金一个节点   CDP7.0

3HDP:开源,可以进行二次开发,但是没有CDH稳定国内使用较少

大数据技术之高频面试题

 

2.5 服务器选型

服务器使用物理机还是云主机?

1)机器成本考虑:

(1)物理机:以128G内存20核物理CPU,40线程,8THDD和2TSSD硬盘,单台报价4W出头,惠普品牌。一般物理机寿命5年左右

(2)云主机,以阿里云为例,差不多相同配置,每年5W

2)运维成本考虑:

(1)物理机:需要有专业的运维人员(1万*13个月)、电费(商业用户)、安装空调

(2)云主机:很多运维工作都由阿里云已经完成,运维相对较轻松

3)企业选择

(1)金融有钱公司和阿里没有直接冲突的公司选择阿里云(上海)

(2)中小公司、为了融资上市,选择阿里云,拉倒融资后买物理机。

(3)有长期打算,资金比较足,选择物理机。

2.6 集群规模

大数据技术之高频面试题

 

20核物理CPU  40线程  * 7 = 280线程

内存128g * 7= 896g

128m =》1g内存    =》87g数据 、700g内存

根据数据规模大家集群

1

2

3

4

5

6

7

8

9

10

nn

nn

dn

dn

dn

dn

dn

dn

dn

dn

 

 

rm

rm

nm

nm

nm

nm

nm

nm

 

 

nm

nm

 

 

 

 

 

 

 

 

 

 

 

 

 

zk

zk

zk

 

 

 

 

 

 

 

kafka

kafka

kafka

 

 

 

 

 

 

 

Flume

Flume

flume

 

 

Hbase

Hbase

Hbase

 

 

 

 

 

hive

hive

 

 

 

 

 

 

 

 

mysql

mysql

 

 

 

 

 

 

 

 

spark

spark

 

 

 

 

 

 

 

 

 

 

 

 

 

ES

ES

 

 

 

2.7 人员配置参考

2.7.1 整体架构

属于研发/技术部/数据部我们属于数据组其他还有后端项目组,前端组、测试组、UI组。其他的还有产品部、运营、人事部、财务部、行政部等。

大数据开发工程师=>大数据组组长=》项目经理=>部门经理=》技术总监CTO

2.7.2 你们部门的职级等级,晋升规则

职级分初级,中级,高级。晋升规则不一定,看公司效益和职位空缺。

京东:T1、T2应届生;T3 14k左右   T4 18K左右  T5  24k-28k左右

阿里:p5p6p7p8

2.7.3 人员配置参考

小型公司(3人左右):组长1人,剩余组员无明确分工,并且可能兼顾javaEE和前端。

中小型公司(3~6人左右):组长1人,离线2人左右,实时1人左右(离线一般多于实时),组长兼顾和javaEE、前端。

中型公司(5~10人左右):组长1人,离线3~5人左右(离线处理、数仓),实时2人左右,组长和技术大牛兼顾和javaEE、前端。

中大型公司(10~20人左右):组长1人,离线5~10人(离线处理、数仓),实时5人左右,JavaEE1人左右(负责对接JavaEE业务),前端1人(有或者没有人单独负责前端)。(发展比较良好的中大型公司可能大数据部门已经细化拆分,分成多个大数据组,分别负责不同业务)

上面只是参考配置,因为公司之间差异很大,例如ofo大数据部门只有5个人左右,因此根据所选公司规模确定一个合理范围,在面试前必须将这个人员配置考虑清楚,回答时要非常确定。

IOS多少人 安卓多少人 前端多少人  JavaEE多少人  测试多少人

(IOS、安卓) 1-2个人   前端1-3个人; JavaEE一般是大数据的1-1.5倍,测试:有的有,有的没有。1个左右。  产品经理1个、产品助理1-2个,运营1-3

公司划分:

0-50 小公司

50-500 中等

500-1000 大公司

1000以上 大厂 领军的存在

你写项目,肯定是有一家公司的,公司的基本信息你得了解清楚,包括(老板是谁,业务,盈利情况,规模,地点,通勤,部门,这个组多少人....),该如何查,百度,天眼查之类的公司....

说话的时候,一定非常自信(给人的感觉我做过,我能搞定),不要很冲,基本的礼仪,不要过度吹捧,最好的平等,不要胆怯,不要猥琐...