hive: 自定义UDTF

在hive项目中, 有client和server通信的log日志体系如下,中间的网络传输使用的json格式,所以在server端接收时需要使用阿里的fastJSON来解析为日志聚合体LogAgg类;然后使用自定义表生成函数UDTF, 分别取出Error表, Event表,StartUp表的数据, 数据转储到各表中,完成日志的初步ETL。
hive: 自定义UDTF
所以,这个UDTF如何定义?
类似于上篇的自定义UDF函数, 这里也需要继承一个hive类 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF, 并实现三个方法即可(该类的描述如下:为单个输入行生成可变数量的输出行, 类似于explode(array)…)

自定义UDTF:函数的结构

hive: 自定义UDTF
hive: 自定义UDTF
初始化判断: initialize方法

类型判断 ois[0].getCategory() != ObjectInspector.Category.PRIMITIVE (((PrimitiveObjectInspector) ois[0])).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT、STRING、LONG
参数–(映射)–转换器 准备转换器数组: converters = new ObjectInspectorConverters.Converter[4] ObjectInspectorConverters.getConverter(ois[0], PrimitiveObjectInspectorFactory.javaIntObjectInspector、 javaLongObjectInspector、 javaStringObjectInspector );

在hive中被调用的方法:process, 解析参数生成新的数据

解析参数 返回结果
String servertimestr = (String) converters[0].convert(args[0]) forward(arr)

自定义UDTF:伪代码

public abstract class BaseForkUDTF<T> extends GenericUDTF {  //initialize(),  process(),  close()
	//属性	
	private Class<T> clazz;
	private ObjectInspectorConverters.Converter[] converters ;
	List<String> fieldNames =   new ArrayList<String>() ; //字段名称列表	
	List<ObjectInspector> ois = new ArrayList<ObjectInspector>()  ;//检查器列表	

	//构造
	//通过构造函数抽取子类的泛型化超类部分
	public BaseForkUDTF(){
		ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass();
		clazz = (Class) type.getActualTypeArguments()[0];
	}

	//重写方法1
 	StructObjectInspector initialize(ObjectInspector[] args){
		//判断: 参数类型
	  	 //参数 <--->参数转换器
		BeanInfo bi = Introspector.getBeanInfo(clazz) ;
		PropertyDescriptor[] pps = bi.getPropertyDescriptors();//得到所有属性

		for(PropertyDescriptor pp :pps){
			//获取属性名,  get !=null, set !=null 											if(type == Long.class || type == long.class){
					fieldNames.add(name) ;
					ois.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector) ;
				}
				else if(type == int.class || type ==Integer.class){
					fieldNames.add(name);
					ois.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
				}
				else if(type == String.class){
					fieldNames.add(name);											ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
				}
		}
		return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames , ois) ;	
	}
	
	//重写方法2:		
	void process(Object[] args) {
		//用转换器:取出参数的值	
		String servertimestr = (String) converters[0].convert(args[0]);
		Long clienttimems = (Long) converters[1].convert(args[1]);
		//取出String clientip ,  String json
		
		//日志分析
		json = json.replace("\\\"" , "\"") ;
		AppLogAggEntity agg = JSONObject.parseObject(json , AppLogAggEntity.class) ;//消息传递: obj-->json-->obj
		alignTime(agg , servertimestr , clienttimems) ;// 对齐: client时间(时区:    服务器时间utc-----client时间cst)
		//===================定义回调函数:  空实现=================================
		extraProcess(clientip) ;//解析ip:  ==>china, 山东, A市

		//.=================== 日志叉分--->还原为error, event, startup.....===================
		//定义abstract函数:  子类有各自的实现----(error:  从agg中取出错误日志...)
		List<T> logs = getLogs(agg) ;
		for(Object log : logs){
			Object[] arr = new Object[fieldNames.size()] ;
			int i = 0 ;

			//此对象有几个字段====>取出: 对应的属性值
			for(String fname : fieldNames){
				PropertyDescriptor pp = new PropertyDescriptor(fname , clazz) ;
				Method get = pp.getReadMethod() ;
				if(get != null){
					Object retValue = get.invoke(log) ;
					arr[i] = retValue ;
				}
				i ++ ;
			}
			//转发对象,就是输出一行
			forward(arr);
		}		
	}//end process()
}