pig 笔记
Pig是一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig
Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口。
pig关系操作符:
关系名上面有个student
还需要dump g;
产生结果 :one is age,another is a bag;
对多个关系进行分组,合并之后的分组。
----------------------------------------------------------------------------------------------------------------------------------------------------------
所有命令和脚本都在Pig 0.12.0 & Hadoop 2.2.0下测试通过。
准备两个数据文件:
1)student.txt 结构为(班级号,学号,成绩),字段间逗号分隔。
C01,N0101,82
C01,N0102,59
C01,N0103,65
C02,N0201,81
C02,N0202,82
C02,N0203,79
C03,N0301,56
C03,N0302,92
C03,N0306,72
2)teacher.txt 结构为(班级号,教师),字段间逗号分隔。
C01,Zhang
C02,Sun
C03,Wang
C04,Dong
加载和存储(Load,Store)
执行以下命令
records = load'hdfs://localhost:9000/input/student.txt' using PigStorage(',') as(classNo:chararray, studNo:chararray, score:int);
dump records;
store records into ' hdfs://localhost:9000/input/student_out' using PigStorage(':');
然后查看hdfs://localhost:9000/input/student_out目录下的part-m-00000文件,其内容如下:
C01:N0101:82
C01:N0102:59
C01:N0103:65
C02:N0201:81
C02:N0202:82
C02:N0203:79
C03:N0301:56
C03:N0302:92
C03:N0306:72
其中的load是加载操作,store是存储操作。他们分别可以指定其分隔符,比如上例中的逗号和分号。
筛选(Filter)
执行以下命令:
records_c01 = filter records byclassNo=='C01';
dump records_c01;
结果如下:
(C01,N0101,82)
(C01,N0102,59)
(C01,N0103,65)
注意:判断是否相等要用两个等号。
Foreach Generate
Foreach对关系中的每一个记录循环,然后按指定模式生成一个新的关系。
执行以下命令:
score_c01 = foreach records_c01generate 'Teacher',$1,score;
dump score_c01;
结果如下:
(Teacher,N0101,82)
(Teacher,N0102,59)
(Teacher,N0103,65)
生成的新的关系中包括三个字段,第一个字段是常量,第二个字段是学号(我们是通过索引号引用的),第三个字段是分数(我们通过字段名引用的)。
分组(group)
执行以下命令:
grouped_records = group recordsby classNo parallel 2;
dump grouped_records;
结果如下:
(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)})
(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)})
(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)})
其中的Paraller 2表示启用2个Reduce操作。
如何统计每个班级及格和优秀的学生人数呢?执行以下两个命令:
result = foreach grouped_records {
fail =filter records by score < 60;
excellent =filter records by score >=90;
generategroup, COUNT(fail) as fail, COUNT(excellent) as excellent;
};
dump result;
结果如下:
(C01,1,0)
(C02,0,0)
(C03,1,1)
题外话:
flatten操作,可以将数据格式扁平化。我们分别通过tuple和bag来看看flatten的作用:
1) Flatten对tuple的作用
执行以下命令:
a= foreach records generate $0,($1,$2);
dumpa;
输出结果如下:
(C01,(N0101,82))
(C01,(N0102,59))
(C01,(N0103,65))
(C02,(N0201,81))
(C02,(N0202,82))
(C02,(N0203,79))
(C03,(N0301,56))
(C03,(N0302,92))
(C03,(N0306,72))
然后,执行:
b = foreach a generate $0,flatten($1);
dump b;
结果如下:
(C01,N0101,82)
(C01,N0102,59)
(C01,N0103,65)
(C02,N0201,81)
(C02,N0202,82)
(C02,N0203,79)
(C03,N0301,56)
(C03,N0302,92)
(C03,N0306,72)
由此看见,flatten作用于tuple时,将flatten对应的字段(tuple)中的字段扁平化为关系中的字段。(不知道该如何解释比较好)
2) Flatten对bag的作用
执行以下命令
c = foreach records generate $0,{($1),($1,$2)};
dump c;
结果如下:
(C01,{(N0101),(N0101,82)})
(C01,{(N0102),(N0102,59)})
(C01,{(N0103),(N0103,65)})
(C02,{(N0201),(N0201,81)})
(C02,{(N0202),(N0202,82)})
(C02,{(N0203),(N0203,79)})
(C03,{(N0301),(N0301,56)})
(C03,{(N0302),(N0302,92)})
(C03,{(N0306),(N0306,72)})
接下来执行:
d = foreach c generate $0,flatten($1);
dump d;
结果如下:
(C01,N0101)
(C01,N0101,82)
(C01,N0102)
(C01,N0102,59)
(C01,N0103)
(C01,N0103,65)
(C02,N0201)
(C02,N0201,81)
(C02,N0202)
(C02,N0202,82)
(C02,N0203)
(C02,N0203,79)
(C03,N0301)
(C03,N0301,56)
(C03,N0302)
(C03,N0302,92)
(C03,N0306)
(C03,N0306,72)
可以看出,flatten作用于bag时,会消除嵌套关系,生成类似于笛卡尔乘积的结果。(不好表达,读者可以细细体会)。
Stream操作
可以将Python程序嵌入到Pig中使用。
建立一个Python文件pass.py,内容如下:
#! /usr/bin/envpython
import sys
for line insys.stdin:
(c,n,s) = line.split()
if int(s) >= 60:
print "%s\t%s\t%s"%(c,n,s)
执行以下命令:
define pass `pass.py` SHIP('/home/user/pass.py');
records_pass = stream records through pass as(classNo:chararray, studNo:chararray, score:int);
dump records_pass;
结果如下:
(C01,N0101,82)
(C01,N0103,65)
(C02,N0201,81)
(C02,N0202,82)
(C02,N0203,79)
(C03,N0302,92)
(C03,N0306,72)
可以看出,统计结果为所有及格的记录(>=60)。
其中,ship用于将python程序提交到Hadoop集群中去。
请注意第一个命令中的`pass.py`,不是用单引号括起来的,是用键盘1左边的那个键上的字符括起来的。(不知道这个字符怎么称呼,只知道是一种标注符号)
Join
先执行以下两条命令:
r_student = load'hdfs://localhost:9000/input/student.txt' using PigStorage(',') as (classNo:chararray, studNo: chararray, score: int);
r_teacher2 = load'hdfs://localhost:9000/input/teacher.txt' using PigStorage(',') as (classNo:chararray, teacher: chararray);
回到本文开头,我们有两个数据文件,分别为学生(班级,学号,成绩);老师(班级,姓名)。
执行以下命令:
r_joined = join r_student by classNo,r_teacher by classNo;
dump r_joined;
(C01,N0103,65,C01,Zhang)
(C01,N0102,59,C01,Zhang)
(C01,N0101,82,C01,Zhang)
(C02,N0203,79,C02,Sun)
(C02,N0202,82,C02,Sun)
(C02,N0201,81,C02,Sun)
(C03,N0306,72,C03,Wang)
(C03,N0302,92,C03,Wang)
(C03,N0301,56,C03,Wang)
类似于SQL中的内连接Inner Join。当然你也可以使用外连接,比如:
r_joined = join r_student by classNo left outer,r_teacher by classNo;
dump r_joined;
注意:left outer/right outer要写在第一个关系名的后面。以下语法是错误的:
r_joined = join r_student by classNo, r_teacher by classNo leftouter; //错误
COGROUP
Join的操作结果是平面的(一组元组),而COGROUP的结果是有嵌套结构的。
运行以下命令:
r1 = cogroup r_student by classNo,r_teacher by classNo;
dump r1;
结果如下:
(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)},{(C01,Zhang)})
(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)},{(C02,Sun)})
(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)},{(C03,Wang)})
(C04,{},{(C04,Dong)})
由结果可以看出:
1) cogroup和join操作类似。
2) 生成的关系有3个字段。第一个字段为连接字段;第二个字段是一个包,值为关系1中的满足匹配关系的所有元组;第三个字段也是一个包,值为关系2中的满足匹配关系的所有元组。
3) 类似于Join的外连接。比如结果中的第四个记录,第二个字段值为空包,因为关系1中没有满足条件的记录。实际上第一条语句和以下语句等同:
r1= cogroup r_student by classNo outer,r_teacher by classNo outer;
如果你希望关系1或2中没有匹配记录时不在结果中出现,则可以分别在关系中使用inner而关键字进行排除。
执行以下语句:
r1 = cogroup r_student by classNo inner,r_teacher byclassNo outer;
dump r1;
结果为:
(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)},{(C01,Zhang)})
(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)},{(C02,Sun)})
(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)},{(C03,Wang)})
如先前我们讲到的flatten,执行以下命令:
r2 = foreach r1 generate flatten($1),flatten($2);
dump r2;
结果如下:
(C01,N0103,65,C01,Zhang)
(C01,N0102,59,C01,Zhang)
(C01,N0101,82,C01,Zhang)
(C02,N0203,79,C02,Sun)
(C02,N0202,82,C02,Sun)
(C02,N0201,81,C02,Sun)
(C03,N0306,72,C03,Wang)
(C03,N0302,92,C03,Wang)
(C03,N0301,56,C03,Wang)
Cross
执行以下命令:
r = cross r_student,r_teacher;
dump r;
结果如下:
(C03,N0306,72,C04,Dong)
(C03,N0306,72,C03,Wang)
(C03,N0306,72,C02,Sun)
(C03,N0306,72,C01,Zhang)
(C03,N0302,92,C04,Dong)
(C03,N0302,92,C03,Wang)
(C03,N0302,92,C02,Sun)
(C03,N0302,92,C01,Zhang)
(C03,N0301,56,C04,Dong)
(C03,N0301,56,C03,Wang)
(C03,N0301,56,C02,Sun)
(C03,N0301,56,C01,Zhang)
(C02,N0203,79,C04,Dong)
(C02,N0203,79,C03,Wang)
(C02,N0203,79,C02,Sun)
(C02,N0203,79,C01,Zhang)
(C02,N0202,82,C04,Dong)
(C02,N0202,82,C03,Wang)
(C02,N0202,82,C02,Sun)
(C02,N0202,82,C01,Zhang)
(C02,N0201,81,C04,Dong)
(C02,N0201,81,C03,Wang)
(C02,N0201,81,C02,Sun)
(C02,N0201,81,C01,Zhang)
(C01,N0103,65,C04,Dong)
(C01,N0103,65,C03,Wang)
(C01,N0103,65,C02,Sun)
(C01,N0103,65,C01,Zhang)
(C01,N0102,59,C04,Dong)
(C01,N0102,59,C03,Wang)
(C01,N0102,59,C02,Sun)
(C01,N0102,59,C01,Zhang)
(C01,N0101,82,C04,Dong)
(C01,N0101,82,C03,Wang)
(C01,N0101,82,C02,Sun)
(C01,N0101,82,C01,Zhang)
由此可以看出,cross类似于笛卡尔乘积。一般情况下不建议直接使用cross,而应该事前对数据集进行筛选,提高效率。
排序(Order)
执行以下命令:
r = order r_student by score desc, classNo asc;
dump r;
结果如下:
(C03,N0302,92)
(C01,N0101,82)
(C02,N0202,82)
(C02,N0201,81)
(C02,N0203,79)
(C03,N0306,72)
(C01,N0103,65)
(C01,N0102,59)
(C03,N0301,56)
联合(Union)
执行以下语句:
r_union = union r_student, r_teacher;
dump r_union;
结果如下:
(C01,N0101,82)
(C01,N0102,59)
(C01,N0103,65)
(C02,N0201,81)
(C02,N0202,82)
(C02,N0203,79)
(C03,N0301,56)
(C03,N0302,92)
(C03,N0306,72)
(C01,Zhang)
(C02,Sun)
(C03,Wang)
(C04,Dong)
可以看出:
1) union是取两个记录集的并集。
2) 关系r_union的schema为未知(unknown),这是因为被union的两个关系的schema是不一样的。如果两个关系的schema是一致的,则union后的关系将和被union的关系的schema一致。
----------------------------------------------------------------------------------------------------------------------------------------------------------
从实例出发
%default file test.txt
A = load '$file' as (date, web, name, food);
B = load '$file' as (date, web, name, food);
C= cogroup A by $0, B by $1;
describe C;
illustrate C;
dump C;
cogroup命令中$0和$1,两个列的内容如果不一样,就是分别生成两个批次的group,先按A值分组,在按B对应的值分组。按A的值分组时,B对应的为空,则group中有一个空组{};但如果内容一样,如C= cogroup A by $1, B by $1;就是生成一个批次的group,其中包含A和B两个表中所有的等于该值的元组。
运行以下命令:
r1 = cogroup r_student by classNo,r_teacher by classNo;
dump r1;
结果如下:
(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)},{(C01,Zhang)})
(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)},{(C02,Sun)})
(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)},{(C03,Wang)})
(C04,{},{(C04,Dong)})
由结果可以看出:
1) cogroup和join操作类似。
2) 生成的关系有3个字段。第一个字段为连接字段;第二个字段是一个包,值为关系1中的满足匹配关系的所有元组;第三个字段也是一个包,值为关系2中的满足匹配关系的所有元组。
3) 类似于Join的外连接。比如结果中的第四个记录,第二个字段值为空包,因为关系1中没有满足条件的记录。实际上第一条语句和以下语句等同:
r1= cogroup r_student by classNo outer,r_teacher by classNo outer;
如果你希望关系1或2中没有匹配记录时不在结果中出现,则可以分别在关系中使用inner而关键字进行排除。
执行以下语句:
r1 = cogroup r_student by classNo inner,r_teacher byclassNo outer;
dump r1;
结果为:
(C01,{(C01,N0103,65),(C01,N0102,59),(C01,N0101,82)},{(C01,Zhang)})
(C02,{(C02,N0203,79),(C02,N0202,82),(C02,N0201,81)},{(C02,Sun)})
(C03,{(C03,N0306,72),(C03,N0302,92),(C03,N0301,56)},{(C03,Wang)})
r2 = foreach r1 generate flatten($1),flatten($2);
dump r2;
结果如下:
(C01,N0103,65,C01,Zhang)
(C01,N0102,59,C01,Zhang)
(C01,N0101,82,C01,Zhang)
(C02,N0203,79,C02,Sun)
(C02,N0202,82,C02,Sun)
(C02,N0201,81,C02,Sun)
(C03,N0306,72,C03,Wang)
(C03,N0302,92,C03,Wang)
(C03,N0301,56,C03,Wang)
sample_data = limit industry_existed_Data 20;
--STORE sample_data INTO '/user/wizad/tmp/industry_existed_Data' USING PigStorage(',');
--merge with history data
cogroupIndustryExistCurrentByGuid = COGROUP industry_existed_Data by guid, industry_current_data by guid;
mydata = sample cogroupIndustryExistCurrentByGuid 0.1;
dump mydata;
describe cogroupIndustryExistCurrentByGuid;
--dump cogroupIndustryExistCurrentByGuid;
--STORE mycogroupdata INTO '/user/wizad/tmp/cogroupIndustryExistCurrentByGuid' USING PigStorage(',');
look_for_cogroup = FOREACH cogroupIndustryExistCurrentByGuid GENERATE $0,$2;
describe look_for_cogroup;
IndustryStorageDataTmp = FOREACH cogroupIndustryExistCurrentByGuid GENERATE FLATTEN($2);
IndustryStorageData = DISTINCT IndustryStorageDataTmp;
describe IndustryStorageData;
{
group: chararray,
industry_existed_Data:{industryId: chararray,guid: chararray,sex: chararray,log_type: chararray},
industry_current_data: {joined_ad_campaign_data::industryId: chararray,joined_Orgin_sex_data::distinct_origin_historical_sex::guid: chararray,joined_Orgin_sex_data::social_sex::sex: chararray,joined_Orgin_sex_data::distinct_origin_historical_sex::log_type: chararray}
}
look_for_cogroup:
{
group: chararray,
industry_current_data: {joined_ad_campaign_data::industryId: chararray,joined_Orgin_sex_data::distinct_origin_historical_sex::guid: chararray,joined_Orgin_sex_data::social_sex::sex: chararray,joined_Orgin_sex_data::distinct_origin_historical_sex::log_type: chararray}
}
IndustryStorageData:
{
industry_current_data::joined_ad_campaign_data::industryId: chararray,
industry_current_data::joined_Orgin_sex_data::distinct_origin_historical_sex::guid: chararray,
industry_current_data::joined_Orgin_sex_data::social_sex::sex: chararray,
industry_current_data::joined_Orgin_sex_data::distinct_origin_historical_sex::log_type: chararray
}
((a50a17bde79ac018,),{(74,863010025134441,a50a17bde79ac018,863010025134441,)})
((a51779f736cd3f54,),{(74,862949029595753,a51779f736cd3f54,862949029595753,)})
((c7ae5867-3b77-4987-b082-ed3867b5c384,),{(74,353627055387065,c7ae5867-3b77-4987-b082-ed3867b5c384,353627055387065,)})
新建了一个文件test.txt,随便造了几条测试数据(tab键分隔,是pig默认的分隔方式,若要以其他分隔,load的时候load 'test.txt' using PigStorage(',') as .... 就可以以逗号分隔):
xiaojun 28 上海
yangna 24 兰州
在pig grant shell中输入中文是没法解析的,会报错,比如在shell下面输入:
a = load 'test.txt' as (name:chararray,age:int,city:chararray);
b = filter a by city == '上海'; //此句执行会报错。
dump b;
如果需要使用到中文进行数据集的过滤,可以将语句写到一个单独的pig脚本中,例如我们新建一个test.pig文件,把上面两句写入到该文件中去,然后再执行pig -x local test.pig,这时可以成功的执行中文字符的过滤语句。
一般情况下,都会有定期运行的pig脚本,比如每天运行一次的脚本,这类脚本往往在内部都要用到当天的日期作为参数,pig支持参数替换,参数由前缀$字符来标示,例如我们需要load每天的数据进行统计分析,每天的数据在hdfs上是按日期进行进行命名的,我们可以新建一个test.pig脚本:
a = load '$input' as (.....);
store a into '$output';
然后我们在命令行可以这样:pig -param input=/user/tom/input/2014-12-01.dat -param output=/user/tom/output/2014-12-01.dat -f test.pig
test.pig中会把$标示符指定的参数替换为-param指定的参数值。
更常用的是通过动态参数来替换,即shell脚本中经常会使用到的反引号引用的命令。
我们可以把2014-12-01这段通过shell脚本动态输出:
pig -param input=/user/tom/input/`date "+%Y-%m-%d"`.dat -param output=/user/tom/output/`date "+%Y-%m-%d"`.dat -f test.pig
这样就实现了参数的完全动态替换。
假设文件test4.txt有这么两行数据:
1980080113312121212018
1985080113313131313023
规则是前8位为年月日,中间11位为手机号码,后3位表示的是年龄。
我们可以自定义一个加载udf来加载这个文件
将这两个类打成jar包,然后进入pig grunt,执行一下脚本
register besttonePigUDF.jar ;
a= load 'test4.txt' using com.besttone.pig.udf.load.CutLoadFunc('0-8,8-19,19-22') as (date:chararray,phone:chararray,age:chararray);
dump a;
可以看到内容被成功的加载:
(19800801,13312121212,018)
(19850801,13313131313,023)