Spark SQL: Relational Data Processing in Spark(SparkSQL原理解密,希望对大家有所帮助)
Spark SQL: Relational Data Processing in Spark
Michael Armbrusty, Reynold S. Xiny, Cheng Liany, Yin Huaiy, Davies Liuy, Joseph K. Bradleyy,Xiangrui Mengy, Tomer Kaftanz, Michael J. Franklinyz, Ali Ghodsiy, Matei Zahariay_y
Databricks Inc. _MIT CSAIL zAMPLab, UC Berkeley
Philipse Guo(郭飞 翻译整理)
摘要:
Spark SQL是Apache Spark下一个新模块,它集成了关系型操作和函数编程API于一体,基于我们的Shark经验,Spark SQL不仅可以让Spark开发者利用关系型(数据)处理的优势(比如声明式的查询和最优化的存储等),还可以让SQL开发者调用Spark里面的复杂库类(比如机器学习等),与之前的系统相比,Spark SQL新增了两个主要功能,其一,关系型操作和程序性操作结合更加紧密了,通过声明式的DataFrame API可以很好的和Spark代码结合起来,其二,他提供了可高度拓展的优化器Catalyst,Catalyst利用Scala编程语言的特色,让新增规则,代码生成控制和自定义拓展功能操作更加容易了。有了Catalyst我们就多了很多新特性(比如JSON格式数据的结构推测,机器学习的类型和结合外部数据库组合查询等),即使是很复杂的数据分析需求也变得很得心应手。SparkSQL在保证Spark编程模型不被破坏的前提下提供更丰富的API和各种优化,这不仅仅是SQL-On-Spark的一次革命,同时也是Spark自身的一次升华。
目录和主题描述
H.2数据库管理系统:系统
关键字:
数据库;数据仓库;机器学习;Spark;Hadoop
一:简介
大数据的应用系统需要面对多种数据处理技术,各种数据源和五花八门的数据格式,最早期的系统为此提供一个功能强大的,但是低级的过程程序设计接口,比如MapReduce。像这样的程序略显笨重,需要用户手动调优才能达到一个比较好的性能。因此,各种新系统通过提供大数据下的关系型操作的接口争相给用户一种更好的体验,这些系统利用声明式的查询来提供更丰富的自动优化方案,比如Pig,Hive,Dremel和Shark。
尽管关系型系统的流行昭示着用户更喜欢写声明式的查询,但关系型系统的一些解决方案在面对大数据应用场景下也经常捉襟见肘,首先,用户想要通过自定义代码来ETL处理来自各种数据源的半结构化或非结构化的数据,其次,用户想要进行一些高级分析,比如机器学习和图计算,这些在关系型系统中想要实现非常困难,实际上,我们观察到大多数数据更适合在关系型查询和复杂的程序算法中一起处理,不幸的是,这两种系统目前结合的并不好,一般都是让用户去选择其一。
本篇论文描述了我们在SparkSQL中结合两种模型的努力,一个Apache Spark的一个子模块,SparkSQL是基于最早起的SQL-on-Spark,俗称Shark,为了不让用户选择使用关系型或者过程程序设计API,SparkSQL把二者组合了一起,真正做到的无缝衔接。
SparkSQL通过两个特征把这两种模型结合起来,首先是DataFrame API,它可以对外部的数据源和Spark程序中的数据进行关系型的操作,这和使用广泛的R语言的dataFrame概念类似,不过SparkSQL是延迟运算的模式,在运算之前会进行关系型操作的优化;其次,为了支持更多的数据源和大数据应用的各种场景的算法,SparkSQL引进的一种新型可拓展的优化器Catalyst.使得新增数据源、优化规则和不同领域的数据类型(比如机器学习)更加便利。
DataFrame API在Spark程序中集成了丰富的关系型操作和过程程序操作,DataFrame是一种结构化记录的数据集,这可以通过Spark程序API来操作,也可以通过支持更丰富的优化器的关系型API来使用。DataFrame可以通过Spark程序内的JAVA/Python对象直接创建,这使得在Spark程序中进行关系型操作成为可能。还有其他的Spark相关的组件,比如机器学习库,也可以正常创建和使用DataFrame。一般应用场景下,DataFrame比Spark程序API在易操作性和使用效率上都更胜一筹。例如,DataFrame使用SQL语句可以很轻松对数据进行多种聚合操作,过程中仅仅需要读取数据一次,传统的函数式API要实现这一功能要相对困难些。DataFrame可以自动把数据进行存储列式,而不是松散的JAVA/Python对象。最后DataFrame与已有的R和Python的设计API不同, Spark sql在DataFrame操作中拥有关系型优化器,Catalyst.
为了在SparkSQL中支持更多的数据源和分析场景,我们设计了一种可拓展的查询优化器Catalyst,Catalyst利用Scala编程语言的特色(例如如规则匹配)通过使用Turingcomplete language(见注解1)来构建各种组合规则,同时提供一套通用的框架来对树节点进行操作,比如语法分析,生成执行计划和运行时的代码,有了这个架构我们可以很轻松的新增数据源,包括类似JSON一样的半结构化的数据集,同时把优化的数据(前文有提过列式存储)推到下游,比如hbase;通过自定义函数和自定义特定领域的一些数据类型(如机器学习等)。众所周知,函数式编程语言比较适合开发编译器,因此,基于Scala的优化器开发也相对比较容易,有了Catalyst的SparkSQL如虎添翼,自发布之后,我们看到好多外部的贡献者也把Catalyst加到他们的项目中。
SparkSQL发布于2014年5月,目前也是Spark家族中较成熟和活跃的成员,日前,Apache Spark是大数据处理领域里最活跃的开源项目,仅去年就有400个contributors做过贡献,SparkSQL在超大数据规模环境中也已有应用场景,比如,一家大量互联网公司使用SparkSQL来处理数据流,在8000个节点的集群上进行查询,数据量超过100PB,正常每次操作有数十TB的数据,不仅如此,很多用户采用SparkSQL不仅仅是由于它的SQL查询,更是看中它可以在Spark程序中集成程序设计语言的特性,比如,2/3的Databricks Cloud客户使用Spark的同时也在程序中集成SparkSQL一起使用。在性能方面,我们发现SparkSQL在使用SQL on hadoop的关系型查询中有着不俗的表现,同样的功能用SparkSQL的速度比用Spark代码快10倍,而且内存使用更少。
一般情况我们认为SparkSQL是SparkCore API的一次重大革命,原始的Spark函数式编程API比较普通,程序自动优化的空间很小,Spark SQL在拓宽Spark受众的同时也在提升Spark在优化方面的短板,Spark社区内目前正在把SparkSQL集成到更多的SPARK API中,在机器学习领域,DataFrames利用一种创新的“ML pipeline” API已经成为标准的数据表达方式,我们希望把次推广到更多的领域中去,比如图计算和流。
本篇论文起于Spark的背景和SparkSQL的目标,第二章会对DataFrame API对进一步的讲解,第三章主要介绍Catalyst优化器,第四章,聊聊我们在Catalyst中新增的高级特性,第五章讲SparkSQL的应用,第六章重点会在Catalyst上我们做的一些外部探索,第七章,总结,第八章,涉及到相关工作
2.背景和目标
2.1 Spark概述
Spark是2010年发布的一个通用的集群计算引擎,它包括Scala, Java and Python的API,流处理的库类,图计算和机器学习功能,据我所知,它是目前使用最广泛的多语言集成API系统之一,与DryadLINQ比较相似,同时它也是大数据处理领域中最活跃的开源项目,截止2014年,我们总共有400名贡献者,被多家公司使用。
Spark和最近流行的其他系统比较类似,也是提供一种函数式的编程API,用户可以在Resilient Distributed Datasets (RDDs)中操作他们的数据集,每一个RDD都含有分布在不同集群节点上的JAVA和Python对象,RDDs可以通过算子(如map,filter,reduce等)来操作,编程语言中使用这些算子可以将RDD在节点之间进行传递,例如,下面的Scala代码可以计算一个文本文件中的行数,每行数据都是以“ERRIR”打头。
上面的代码创建了RDD,逐行读取Hdfs文件,然后使用filter算子得到新的RDD errors,然后在此基础上再进行计数
RDDs是容错的,通过RDDs的线性图可以恢复丢失的数据(通过重新运行算子的方式进行重建丢失的数据,如上就是filter算子),RDDs也可以缓存到内存中或写到磁盘里,为下一步的迭代操作做准备。
值得注意的是RDDs是一种懒运算模式,每个RDD代表一个操作数据集的逻辑计划,但是Spark只有等到特定的输出算子才会真正的开始计算,比如count算子,在此之前执行引擎可以进行一些简单的优化,比如管道流优化,以上面的代码为例,Spark会以管道的方式逐行读取HDFS的数据,然后应用到filter算子,最后计算行数,所以过程中没有必要对中间的Lines和errors进行物化存储,这其实很关键,因为引擎可能不知道RDDs中数据的结构(可以是任意的JAVA或者Python对象)或用户自定义函数的专有名词(可以是任意的代码语言)
2.2 Spark之前的关系型操作系统
Shark是我们在Spark上首次尝试使用关系型操作接口,Shark让Hive可以以Spark引擎进行计算,同时进行了一些关系型数据库操作系统的优化,比如列式存储,尽管Shark和Spark程序结合的也很好,但是还有三个主要的挑战,第一,Shark只能查询存储在Hive catalog中的数据,不能对Spark程序中的数据进行关系型查询(比如上面创建的errors)。第二,Spark程序调用Shark的唯一方式就是拼成一个SQL字符串,这在模块化程序中会带来了极大的不便,同时也更易出错。最后,Hive的优化器专用于MapReduce,不易拓展,如果想要新增一些机器学习的数据类型或支持更多的数据源就会变得很困难。
2.3SparkSQL的设计目标
根据Shark的经验,我们想在spark程序中的RDDs和更多的数据源都使用到关系型操作,我们来看下SparkSQL的设计目标:
- 不仅支持Spark程序中(原生的RDDs)的关系型处理,也支持外部的一些编程性良好的API的关系型处理
- 可以利用已有的关系型数据库管理系统的操作技巧优化程序的性能
- 易于新增数据源,包括半结构化和外部数据源,支持联合查询
- 可以拓展一些高级分析算法,比如图计算和机器学习
3.编程接口
Spark sql作为Spark中的一个库来使用(见下图Figure1),它暴露的SQL接口,可以通过JDBC/ODBC或者命令行提供给外界去访问,也可以通过集成到Spark程序中的DataFrame API来访问,我们从DataFrame API开始支持,这样用户可以把程序代码和关系型代码放在一起使用,不过,高级函数本身也可以通过自定义函数的方式对外以SQL暴露出去,用户可以通过BI工具来使用,3.7节我们会讨论自定义函数
3.1DataFrame API
SparkSQL API最抽象的部分就是DataFrame(见注解)了,这是一种带有相同schema的分布式数据行集合,DataFrame可以理解为关系型数据库中的表概念,它的操作也类似于操作Spark原生的分布式数据集合(RDDs),与RDDs不同的是,DataFrame可以记录数据的schema信息,并且还支持多种关系型操作,后期可以做更多的优化。
DataFrames可以通过数据库中表(如外部的数据库)来创建,也可以通过JAVA/Python代码中原生的RDDs来创建(见3.5),DataFrames被创建好后就可以通过多种关系型操作算子进行操作,比如where,groupby,这些都是用DSL(domain-specific language)进行表达的,与R,python里面的data frames比较像,每个DataFrames可以当成是行对象RDD,DataFrames允许用户来调用程序设计Spark APIs,比如map
下面我们来聊聊DataFrame API.的细节部分。
3.2数据模型
SparkSQL的Tables和DataFrames缘于一种基于hive的嵌套模型,SparkSQL支持目前主流的数据类型,包括boolean, integer, double, decimal, string, date, and timestamp和一些复杂数据类型,如structs, arrays, maps and unions.复杂数据类型之间可以再进行嵌套组合成更强大的数据类型,与很多传统的关系型数据库管理系统不同,SparkSQL在关系型查询语句和API操作复杂数据类型时表现非常卓越,SparkSQL还支持用户自定义数据类型,章节4.4.2会提及到。
通过DataFrames,我们可以从多种数据源和各种数据格式中生成精确的目标模型,包括Hive,关系型数据库,JSON和原生的Java/python/Scala产生的原生对象。
3.3DataFrame算子
用户可以使用DSL在DataFrames上进行关系型操作,DataFrames支持所有一般关系型操作的算子,包括projection(select),filter(where),join和aggregations (groupBy).这些算子在DSL都相应的表达式,从而使得Spark可以捕获到这些表达式的结构,例如下面代码计算每个部门女性的人数
在这里,employees就是一个DataFrames,employees("deptId")就是一个描述deptid列的一个表达式,表达式对象通过不同的算子产生新的表达式,包括常见的比较算子(===表示相等,)表示大于,还有一些算术运行符+,-等),同时DataFrames还支持聚合,比如(count("name")),所有这些算子都会生成一个抽象语法树(AST)表达式,然后表达式会传给Catalyst做下一步的优化,这和Spark API生成的原生代码不同,原生代码在运行的时候是不可见的,希望了解API详情的同学可以参考Spark官方文档。
除了可以使用关系型DSL外,DataFrames可以通过关系型数据库中的表注册为临时表,然后用户可以通过SQL进行查询,下面就是一个例子。
SQL有时候在单纯的多聚合计算场景下是比较方面的,并且还可以通过JDBC/ODBC对外提供这些数据,注册好的DataFrames没有做物化存储,所以从SQL到原始的DataFrame表达式可以进行很多优化,值得一提的是,DataFrame也可以做物化存储,3.6节会提到这个
3.4DataFrames对比关系型查询语言
从表面上看,DataFrames提供了和关系型查询(hive,pig)语句一样的操作,但是经过社区的集成后,DataFrames变得更友好易用,比如,用户可以把他们的代码转化为Scala, Java 或Python函数,然后传递DataFrames,生成一个逻辑执行计划,在执行输出操作时,整个执行计划都可以进行优化,同样道理,开发者可以使用程序控制语言,比如if语句和循环语句来完成他们的工作,有个用户曾经说过,DataFrame API的简单易用和声明式的操作和SQL很像,就是不能给中间生成的结果数据进行命名(郭飞解:反讽的说法),意思是这种整体计算和debug操作是有多么的透明和易掌握
DataFrames,为了简化编程,前置了API分析逻辑计划(可以提前知道这个表达式中使用的列是不是在已有的表中,数据类型是否合适),尽管查询结果是延迟计算的,Spark SQL还是可以当用户输入无效的代码立刻把错误报出来,而不需要等到代码执行的时候才发现,与大的关系型SQL语句查询对比,效果会更好。
3.5查询本地数据集
真实世界中的管道流数据来自四面八方,算法也不尽相同,为了更好的和Spark程序代码进行集成,Spark SQL允许用户可以直接使用各种编程语言写的RDD直接进行构建DataFrames,它可以通过反射拿到这些对应的schema信息,在Scala和Java语言中,相关的反射类信息是从语言的类库中获得(从JavaBeans and Scala的类库),Python,语言由于动态类的特性,Spark SQL只需抽样部分反射schema信息的数据集就可以了
例如,下面的Scala代码就是利用一个USER对象的RDD定义了一个DataFrame。
一般来说,Spark SQL创建了一个指向RDD的逻辑数据扫描算子,这个算子进一步转化为物理操作算子,然后就可以直接访问代码中的原生对象了,值得注意的是这和传统的ORM(object-relational mapping)有很大的不同,ORMs在把整个对象转换为另外一个格式时需要付出很大的代价,相反,Spark SQL可以直接访问原生的对象,而且每次查询的时候仅仅需要抽取指定的字段就可以了。
Spark SQL能够直接访问原生的数据集,这使得用户可以在Spark代码中进行关系型算子的优化,不仅如此,它还可以把轻易地集成RDDs和外部的数据集,例如,我们可以把Hive中的一个表和USER对象的RDD进行Join操作
3.6 内存缓存
和之前的Shark一样,Spark SQL可以在内存中列式物化(也可以叫缓存)热数据,与Spark原生的缓存相比,Spark SQL物化就像把数据存在JVM对象里一样简单,列式存储使用列式压缩编码比如字典编码和运行时长编码,这大大减少了内存的占用。缓存在在交互式的查询和机器学习里面的迭代算法运行时显得尤为重要,可以通过调用DataFrame.的cache()
3.7自定义函数
自定义函数(UDFS)对于数据库系统来说一直以来都是一种重要的拓展,例如,Mysql以来UDFs来处理JSON数据,Postgres and other database systems也有通过MADLib的UDFs来实现机器学习算法,不过数据库系统经常需要再单独的程序环中去定义这些UDFs,和主要的查询接口是分离开的,Spark SQL的DataFrame API支持行内定义UDFs,不需要额外的打包或注册到外部的数据库系统中,这也是我们采用它的重要因素。
Spark SQL中, UDFs可以通过Scala,Java 或者Python的函数进行注册,也可以使用Spark自带的一些API来注册,比方说,给定一个机器学习的模型,我们可以这样注册为UDF
一旦注册成功,这个UDF就可以用BI工具的JDBC/ODBC接口来进行访问,除了上面这个操作数值的UDF外,我们还可以通过一个名称来操作整张表,像MADLib一样,同时还可以集成Spark API一起使用,这样SQL开发者就可以使用到高级分析函数了,最后,由于UDF的定义和查询执行都是使用一种主要语言(Scala或者Python),用户可以使用标准工具来debug整个或设计整个程序。
上面的例子演示了多个管道流下的一种通用使用场景,比如,用户既可以使用关系型算子,也可以使用高级分析方法来满足各种的需求,这些如果通过SQL实现都比较困难,DataFrame API可以让用户使用SparkSql时做到无缝衔接
4. Catalyst优化器
为了实现SparkSQL,我们设计了基于Scala函数编程架构一种新型可拓展的优化器Catalyst, Catalyst的拓展设计主要基于两种考虑,其一,我们想要快速的为SparkSQL新增优化技术和新的特性,这样我们可以处理大数据下的各种大场景(比如半结构化的数据和高级分析场景),其次,我们想要外部的开发者们也一起参与到开发优化器的过程中来,例如,通过新增数据源的特殊规则可以把算子运算好的数据落到外部存储介质中去,或者可以生成新的数据类型供后续使用,Catalyst支持基于规则的优化和基于代价的优化
尽管在过去拓展优化器经常被提到,不过这些一般情况下倾向于使用复杂的DSL去实现这个规则,然后编译优化器把规则解释为可执行的代码,这无疑延长的学习曲线,也增加了维护的压力,与此不同的是,Catalyst使用Scala编程语言的标准特性,比如规则匹配,这使得开发者全程可以使用编程语言,同事实现规则也变得比较容易,函数式编程语言适合做编译器,因此我们发现Scala就是这个工作的不二人选,不仅如此,据我所知,Catalyst是第一个被广泛用于生产环境基于Scala语言的优化器。
Catalyst的核心包含一系列用于描述树结构并且操作这些树的库,除了这个架构外,我们还创建特殊的库来处理关系型查询(比如,表达式,逻辑查询计划等),与此同时我们也新增了一系列规则集合,主要用于处理查询执行的不同的阶段:分析阶段,逻辑计划优化阶段,物理计划生成阶段和编译查询代码为Java字节码阶段,对于生成代码这个阶段,我们使用Scala的另外一个特性,关键字识别(quasiquotes),这可以使各种组合表达式中生成运行时代码这一阶段就变得很容易了,最后,Catalyst还提供几个公共拓展的功能点,包括拓展外部数据源和自定义对象等。
--未完待续...
小郭飞飞刀注解1:Turingcomplete language
a programing language is called "Turing complete", if that it can run any program(irrespective of the language) that a Turing machine can run given enough time and memory.
附SparkSQL英文原文地址:
http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf