hive: 自定义UDTF
在hive项目中, 有client和server通信的log日志体系如下,中间的网络传输使用的json格式,所以在server端接收时需要使用阿里的fastJSON来解析为日志聚合体LogAgg类;然后使用自定义表生成函数UDTF, 分别取出Error表, Event表,StartUp表的数据, 数据转储到各表中,完成日志的初步ETL。
所以,这个UDTF如何定义?
类似于上篇的自定义UDF函数, 这里也需要继承一个hive类 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF, 并实现三个方法即可(该类的描述如下:为单个输入行生成可变数量的输出行, 类似于explode(array)…)
自定义UDTF:函数的结构
初始化判断: initialize方法
类型判断 | ois[0].getCategory() != ObjectInspector.Category.PRIMITIVE | (((PrimitiveObjectInspector) ois[0])).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT、STRING、LONG |
---|---|---|
参数–(映射)–转换器 | 准备转换器数组: converters = new ObjectInspectorConverters.Converter[4] | ObjectInspectorConverters.getConverter(ois[0], PrimitiveObjectInspectorFactory.javaIntObjectInspector、 javaLongObjectInspector、 javaStringObjectInspector ); |
在hive中被调用的方法:process, 解析参数生成新的数据
解析参数 | 返回结果 |
---|---|
String servertimestr = (String) converters[0].convert(args[0]) | forward(arr) |
自定义UDTF:伪代码
public abstract class BaseForkUDTF<T> extends GenericUDTF { //initialize(), process(), close()
//属性
private Class<T> clazz;
private ObjectInspectorConverters.Converter[] converters ;
List<String> fieldNames = new ArrayList<String>() ; //字段名称列表
List<ObjectInspector> ois = new ArrayList<ObjectInspector>() ;//检查器列表
//构造
//通过构造函数抽取子类的泛型化超类部分
public BaseForkUDTF(){
ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass();
clazz = (Class) type.getActualTypeArguments()[0];
}
//重写方法1
StructObjectInspector initialize(ObjectInspector[] args){
//判断: 参数类型
//参数 <--->参数转换器
BeanInfo bi = Introspector.getBeanInfo(clazz) ;
PropertyDescriptor[] pps = bi.getPropertyDescriptors();//得到所有属性
for(PropertyDescriptor pp :pps){
//获取属性名, get !=null, set !=null if(type == Long.class || type == long.class){
fieldNames.add(name) ;
ois.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector) ;
}
else if(type == int.class || type ==Integer.class){
fieldNames.add(name);
ois.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
}
else if(type == String.class){
fieldNames.add(name); ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
}
}
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames , ois) ;
}
//重写方法2:
void process(Object[] args) {
//用转换器:取出参数的值
String servertimestr = (String) converters[0].convert(args[0]);
Long clienttimems = (Long) converters[1].convert(args[1]);
//取出String clientip , String json
//日志分析
json = json.replace("\\\"" , "\"") ;
AppLogAggEntity agg = JSONObject.parseObject(json , AppLogAggEntity.class) ;//消息传递: obj-->json-->obj
alignTime(agg , servertimestr , clienttimems) ;// 对齐: client时间(时区: 服务器时间utc-----client时间cst)
//===================定义回调函数: 空实现=================================
extraProcess(clientip) ;//解析ip: ==>china, 山东, A市
//.=================== 日志叉分--->还原为error, event, startup.....===================
//定义abstract函数: 子类有各自的实现----(error: 从agg中取出错误日志...)
List<T> logs = getLogs(agg) ;
for(Object log : logs){
Object[] arr = new Object[fieldNames.size()] ;
int i = 0 ;
//此对象有几个字段====>取出: 对应的属性值
for(String fname : fieldNames){
PropertyDescriptor pp = new PropertyDescriptor(fname , clazz) ;
Method get = pp.getReadMethod() ;
if(get != null){
Object retValue = get.invoke(log) ;
arr[i] = retValue ;
}
i ++ ;
}
//转发对象,就是输出一行
forward(arr);
}
}//end process()
}