Hive SQL编译原理

Hive是基于Hadoop的一个数据仓库系统,在各大公司都有广泛的应用。美团数据仓库也是基于Hive搭建,每天执行近万次的Hive ETL计算流程,负责每天数百GB的数据存储和分析。Hive的稳定性和性能对我们的数据分析非常关键。

Hive SQL的编译过程大概分为以下几个部分:

SQL--->AST--->Query Block--->执行操作树--->逻辑计划树(优化)--->物理执行计划(优化)

SQL词法语法解析

Antlr

Hive使用Antlr实现SQL的词法和语法解析。Antlr是一种语言识别的工具,可以用来构造领域语言
这里不详细介绍Antlr,只需要了解使用Antlr构造特定的语言只需要编写一个语法文件,定义词法和语法替换规则即可,Antlr完成了词法分析、语法分析、语义分析、中间代码生成的过程。

Hive中语法规则的定义文件在0.10版本以前是Hive.g一个文件,随着语法规则越来越复杂,由语法规则生成的Java解析类可能超过Java类文件的最大上限,0.11版本将Hive.g拆成了5个文件,词法规则HiveLexer.g和语法规则的4个文件SelectClauseParser.g,FromClauseParser.g,IdentifiersParser.g,HiveParser.g。

抽象语法树 AST Tree

经过词法和语法解析后,如果需要对表达式做进一步的处理,使用 Antlr 的抽象语法树语法Abstract Syntax Tree,在语法分析的同时将输入语句转换成抽象语法树,后续在遍历语法树时完成进一步的处理。

下面的一段语法是Hive SQL中SelectStatement的语法规则,从中可以看出,SelectStatement包含select, from, where, groupby, having, orderby等子句。

(在下面的语法规则中,箭头表示对于原语句的改写,改写后会加入一些特殊词标示特定语法,比如TOK_QUERY标示一个查询块

1
2
3
4
5
6
7
8
9
10
11
12
13
selectStatement
:
selectClause
fromClause
whereClause?
groupByClause?
havingClause?
orderByClause?
clusterByClause?
distributeByClause?
sortByClause?
limitClause? -> ^(TOK_QUERY fromClause ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause? distributeByClause? sortByClause? limitClause?));

样例SQL

为了详细说明SQL编译Map Reduce的过程,这里以一条简单的SQL为例,SQL中包含了一个子查询,最终将数据写入到一张表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
FROM
(
SELECT
p.datekey datekey,
p.userid userid,
c.clienttype
FROM
detail.usersequence_client c
JOIN fact.orderpayment p ON p.orderid = c.orderid
JOIN default.user du ON du.userid = p.userid
WHERE p.datekey = 20131118
) base
INSERT OVERWRITE TABLE test.customer_kpi
SELECT
base.datekey,
base.clienttype,
count(distinct base.userid) buyer_count
GROUP BY base.datekey, base.clienttype

SQL生成AST Tree

Antlr对Hive SQL解析的代码如下,HiveLexerX,HiveParser分别是Antlr对语法文件Hive.g编译后自动生成的词法解析和语法解析类,在这两个类中进行复杂的解析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));    //词法解析,忽略关键词的大小写
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
ctx.setTokenRewriteStream(tokens);
}
HiveParser parser = new HiveParser(tokens); //语法解析
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
r = parser.statement(); //转化为AST Tree
} catch (RecognitionException e) {
e.printStackTrace();
throw new ParseException(parser.errors);
}

最终生成的AST Tree如下图右侧(使用Antlr Works生成,Antlr Works是Antlr提供的编写语法文件的编辑器),图中只是展开了骨架的几个节点,没有完全展开。
子查询1/2,分别对应右侧第1/2两个部分。

高山仰止

这里注意一下内层子查询也会生成一个TOK_DESTINATION节点。请看上面SelectStatement的语法规则,这个节点是在语法改写中特意增加了的一个节点。原因是Hive中所有查询的数据均会保存在HDFS临时的文件中,无论是中间的子查询还是查询最终的结果,Insert语句最终会将数据写入表所在的HDFS目录下。

详细来看,将内存子查询的from子句展开后,得到如下AST Tree,每个表生成一个TOK_TABREF节点,Join条件生成一个“=”节点。其他SQL部分类似,不一一详述。

高山仰止

SQL基本组成单元 Query Block

AST Tree仍然非常复杂,不够结构化,不方便直接翻译为MapReduce程序,AST Tree转化为QueryBlock就是将SQL进一部抽象和结构化。

Query Block

QueryBlock是一条SQL最基本的组成单元,包括三个部分:输入源,计算过程,输出。简单来讲一个QueryBlock就是一个子查询。

下图为Hive中QueryBlock相关对象的类图,解释图中几个重要的属性

  • QB#aliasToSubq(表示QB类的aliasToSubq属性)保存子查询的QB对象,aliasToSubq key值是子查询的别名
  • QB#qbp即QBParseInfo保存一个基本SQL单元中的给个操作部分的AST Tree结构,QBParseInfo#nameToDest这个HashMap保存查询单元的输出,key的形式是inclause-i(由于Hive支持Multi Insert语句,所以可能有多个输出),value是对应的ASTNode节点,即TOK_DESTINATION节点。类QBParseInfo其余HashMap属性分别保存输出和各个操作的ASTNode节点的对应关系。
  • QBParseInfo#JoinExpr保存TOK_JOIN节点。QB#QBJoinTree是对Join语法树的结构化。
  • QB#qbm保存每个输入表的元信息,比如表在HDFS上的路径,保存表数据的文件格式等。
  • QBExpr这个对象是为了表示Union操作。

高山仰止

AST Tree生成QueryBlock

AST Tree生成QueryBlock的过程是一个递归的过程,先序遍历AST Tree,遇到不同的Token节点,保存到相应的属性中,主要包含以下几个过程

  • TOK_QUERY => 创建QB对象,循环递归子节点
  • TOK_FROM => 将表名语法部分保存到QB对象的aliasToTabs等属性中
  • TOK_INSERT => 循环递归子节点
  • TOK_DESTINATION => 将输出目标的语法部分保存在QBParseInfo对象的nameToDest属性中
  • TOK_SELECT => 分别将查询表达式的语法部分保存在destToSelExpr、destToAggregationExprs、destToDistinctFuncExprs三个属性中
  • TOK_WHERE => 将Where部分的语法保存在QBParseInfo对象的destToWhereExpr属性中
    最终样例SQL生成两个QB对象,QB对象的关系如下,QB1是外层查询,QB2是子查询

逻辑操作符 Operator

Operator

Hive最终生成的MapReduce任务,Map阶段和Reduce阶段均由OperatorTree组成。逻辑操作符,就是在Map阶段或者Reduce阶段完成单一特定的操作。

基本的操作符包括TableScanOperator,SelectOperator,FilterOperator,JoinOperator,GroupByOperator,ReduceSinkOperator

从名字就能猜出各个操作符完成的功能,TableScanOperator从MapReduce框架的Map接口原始输入表的数据,控制扫描表的数据行数,标记是从原表中取数据。JoinOperator完成Join操作。FilterOperator完成过滤操作

ReduceSinkOperator将Map端的字段组合序列化为Reduce Key/value, Partition Key,只可能出现在Map阶段,同时也标志着Hive生成的MapReduce程序中Map阶段的结束。

Operator在Map Reduce阶段之间的数据传递都是一个流式的过程。每一个Operator对一行数据完成操作后之后将数据传递给childOperator计算。

Operator类的主要属性和方法如下

  • RowSchema表示Operator的输出字段
  • InputObjInspector outputObjInspector解析输入和输出字段
  • processOp接收父Operator传递的数据,forward将处理好的数据传递给子Operator处理
  • Hive每一行数据经过一个Operator处理之后,会对字段重新编号,colExprMap记录每个表达式经过当前Operator处理前后的名称对应关系,在下一个阶段逻辑优化阶段用来回溯字段名
  • 由于Hive的MapReduce程序是一个动态的程序,即不确定一个MapReduce Job会进行什么运算,可能是Join,也可能是GroupBy,所以Operator将所有运行时需要的参数保存在OperatorDesc中,OperatorDesc在提交任务前序列化到HDFS上,在MapReduce任务执行前从HDFS读取并反序列化。Map阶段OperatorTree在HDFS上的位置在Job.getConf(“hive.exec.plan”) + “/map.xml”

高山仰止

Query Block生成Operator Tree

QueryBlock生成Operator Tree就是遍历上一个过程中生成的QB和QBParseInfo对象的保存语法的属性,包含如下几个步骤:

  • QB#aliasToSubq => 有子查询,递归调用
  • QB#aliasToTabs => TableScanOperator
  • QBParseInfo#joinExpr => QBJoinTree => ReduceSinkOperator + JoinOperator
  • QBParseInfo#destToWhereExpr => FilterOperator
  • QBParseInfo#destToGroupby => ReduceSinkOperator + GroupByOperator
  • QBParseInfo#destToOrderby => ReduceSinkOperator + ExtractOperator
    由于Join/GroupBy/OrderBy均需要在Reduce阶段完成,所以在生成相应操作的Operator之前都会先生成一个ReduceSinkOperator,将字段组合并序列化为Reduce Key/value, Partition Key

接下来详细分析样例SQL生成OperatorTree的过程

先序遍历上一个阶段生成的QB对象

  1. 首先根据子QueryBlock QB2#aliasToTabs {du=dim.user, c=detail.usersequence_client, p=fact.orderpayment}生成TableScanOperator

    • TableScanOperator(“dim.user”) TS[0]
      TableScanOperator(“detail.usersequence_client”) TS[1] TableScanOperator(“fact.orderpayment”) TS[2]
  2. 先序遍历QBParseInfo#joinExpr生成QBJoinTree,类QBJoinTree也是一个树状结构,QBJoinTree保存左右表的ASTNode和这个查询的别名,最终生成的查询树如下

1
2
3
4
5
  base
/ \
p du
/ \
c p

前序遍历QBJoinTree,先生成detail.usersequence_client和fact.orderpayment的Join操作树

高山仰止

图中 TS=TableScanOperator RS=ReduceSinkOperator JOIN=JoinOperator

生成中间表与dim.user的Join操作树

高山仰止

根据QB2 QBParseInfo#destToWhereExpr 生成FilterOperator。此时QB2遍历完成。下图中SelectOperator在某些场景下会根据一些条件判断是否需要解析字段。

高山仰止

图中 FIL= FilterOperator SEL= SelectOperator

根据QB1的QBParseInfo#destToGroupby生成ReduceSinkOperator + GroupByOperator

高山仰止

图中 GBY= GroupByOperator
GBY[12]是HASH聚合,即在内存中通过Hash进行聚合运算

最终都解析完后,会生成一个FileSinkOperator,将数据写入HDFS.

高山仰止

图中FS=FileSinkOperator

逻辑层优化器

大部分逻辑层优化器通过变换OperatorTree,合并操作符,达到减少MapReduce Job,减少shuffle数据量的目的。

高山仰止

表格中①的优化器均是一个Job干尽可能多的事情/合并。②的都是减少shuffle数据量,甚至不做Reduce。

CorrelationOptimizer优化器非常复杂,都能利用查询中的相关性,合并有相关性的Job,参考 Hive Correlation Optimizer

对于样例SQL,有两个优化器对其进行优化。下面分别介绍这两个优化器的作用,并补充一个优化器ReduceSinkDeDuplication的作用

PredicatePushDown优化器

断言判断提前优化器将OperatorTree中的FilterOperator提前到TableScanOperator之后

高山仰止

NonBlockingOpDeDupProc优化器

NonBlockingOpDeDupProc优化器合并SEL-SEL 或者 FIL-FIL 为一个Operator

高山仰止

ReduceSinkDeDuplication优化器

ReduceSinkDeDuplication可以合并线性相连的两个RS。实际上CorrelationOptimizer是ReduceSinkDeDuplication的超集,能合并线性和非线性的操作RS,但是Hive先实现的ReduceSinkDeDuplication

譬如下面这条SQL语句

1
from (select key, value from src group by key, value) s select s.key group by s.key;

经过前面几个阶段之后,会生成如下的OperatorTree,两个Tree是相连的,这里没有画到一起

高山仰止

这时候遍历OperatorTree后能发现前前后两个RS输出的Key值和PartitionKey如下

高山仰止

ReduceSinkDeDuplication优化器检测到:

  • pRS Key完全包含cRS Key,且排序顺序一致;
  • pRS PartitionKey完全包含cRS PartitionKey。符合优化条件,会对执行计划进行优化。

ReduceSinkDeDuplication将childRS和parentheRS与childRS之间的Operator删掉,保留的RS的Key为key,value字段,PartitionKey为key字段。合并后的OperatorTree如下:

-------------本文结束感谢您的阅读-------------