Hive Built-in Functions & User-Defined Functions (UDF, UDAF, UDTF): Overview and Examples (with Complete Code), plus Running Hive from IDEA

1.1 Built-in Functions

1. List the built-in functions:

hive (default)> show functions;

2. Show the usage of a built-in function:

hive (default)> desc function upper;

3. Show the detailed usage of a built-in function:

hive (default)> desc function extended upper;


1.2 User-Defined Functions

1) Hive ships with a number of functions, such as max/min, but the set is limited (roughly two to three hundred). You can conveniently extend it with your own UDFs.

2) When the built-in functions cannot satisfy your business logic, consider writing a user-defined function (UDF: user-defined function).

3) User-defined functions fall into three categories:

(1) UDF (User-Defined Function)

One row in, one row out.

(2) UDAF (User-Defined Aggregation Function)

Aggregate function: many rows in, one row out, similar to count/max/min.

(3) UDTF (User-Defined Table-Generating Function)

One row in, many rows out, e.g. lateral view explode().
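A quick HiveQL illustration of the three categories, using only built-in functions (a sketch; the emp table and the ename, sal, deptno, and hobbies columns are just illustrative):

hive (default)> select upper(ename) from emp;                      -- UDF: one row in, one row out
hive (default)> select deptno, max(sal) from emp group by deptno;  -- UDAF: many rows in, one row out
hive (default)> select explode(split(hobbies, ',')) from emp;      -- UDTF: one row in, many rows out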

4) Official documentation:

https://cwiki.apache.org/confluence/display/Hive/HivePlugins

5) Programming steps:

(1) Extend org.apache.hadoop.hive.ql.exec.UDF.

(2) Implement an evaluate method; evaluate supports overloading.

(3) Create the function in the Hive CLI:

a) Add the jar: add jar linux_jar_path

b) Create the function:

create [temporary] function [dbname.]function_name as class_name;

(4) Drop the function in the Hive CLI:

drop [temporary] function [if exists] [dbname.]function_name;

6) Notes

A UDF must have a return type. It may return null, but the return type cannot be void.

1.3 Writing a Custom UDF

Case 1: Convert uppercase letters to lowercase

1. Create a Maven project named Hive.

2. Add the dependency:

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

3. Create a class:

package com.allen.hive;

import org.apache.hadoop.hive.ql.exec.UDF;

public class Lower extends UDF {

    public String evaluate(final String s) {
        if (s == null) {
            return null;
        }
        return s.toLowerCase();
    }
}

4. Package it into a jar and upload it to the server at /opt/jar/udf.jar.

You can upload it to whichever directory you like with rz, WinSCP, or any other tool.


5. Add the jar to Hive's classpath:

hive (default)> add jar /opt/jar/udf.jar;

6. Create a temporary function bound to the Java class you just wrote:

hive (default)> create temporary function mylower as "com.allen.hive.Lower";

7. Now you can use the custom function in HQL:

hive (default)> select ename, mylower(ename) lowername from emp;


The remaining cases are not shown step by step with screenshots; the code is provided below so you can try them yourself.

Case 2: Convert data into the desired format (a date-string reformatting UDF)

package com.allen.hive;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.hive.ql.exec.UDF;

// 1. Define a class that extends UDF and add an evaluate method whose parameters and
//    return type match the function's input and output.
// 2. Package the project into a jar and put it on Hive's classpath, or run inside Hive:
//    add jar /opt/jar/myudf.jar
// 3. Create a function in Hive that points to the new class:
//    create function mydateparse as 'com.allen.hive.MyDateParser';
// 4. Usage: select mydateparse(log_time) from apache_log limit 10;
public class MyDateParser extends UDF {

    // A custom Hive UDF extends UDF and defines an evaluate method.
    // The parameters of evaluate match the number and types of the arguments the Hive
    // function receives, and its return value matches the function's return type.
    // Example input:  [29/April/2016:17:38:20 +0800]
    // Example output: 2016-04-29 17:38:20
    public String evaluate(String s) {
        SimpleDateFormat format = new SimpleDateFormat("dd/MMMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
        if (s.indexOf("[") > -1) {
            s = s.replace("[", "");
        }
        if (s.indexOf("]") > -1) {
            s = s.replace("]", "");
        }
        try {
            // Parse the input string into a Date
            Date date = format.parse(s);
            SimpleDateFormat rformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return rformat.format(date);
        } catch (ParseException e) {
            e.printStackTrace();
            return "";
        }
    }
}

The deployment steps are the same as in Case 1.
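Registering and calling this UDF would look like the following sketch (the jar path, the apache_log table, and its log_time column are illustrative assumptions):

hive (default)> add jar /opt/jar/udf.jar;
hive (default)> create temporary function mydateparse as "com.allen.hive.MyDateParser";
hive (default)> select mydateparse(log_time) from apache_log limit 10;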

Case 3: Split one field into multiple fields (a UDTF)

package com.allen.hive;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyRequestParser extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 1) {
            throw new UDFArgumentException("Exactly one argument is expected");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("rool1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("rool2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("rool3");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // Register the output column names and types as this UDTF's return struct
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void close() throws HiveException {
    }

    // process() is where the input is handled and the output rows are emitted
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        // Strip the surrounding double quotes (\" is the escaped quote character)
        input = input.replace("\"", "");
        String[] result = input.split(" ");
        // If the input does not split into exactly three parts, emit "--" for all three fields
        if (result.length != 3) {
            result = new String[]{"--", "--", "--"};
        }
        forward(result);
    }
}

The deployment steps are the same as in Case 1.
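A UDTF can be called directly in the select list or through lateral view; as a sketch (the access_log table and its request column are illustrative assumptions):

hive (default)> create temporary function myrequestparser as "com.allen.hive.MyRequestParser";
hive (default)> select myrequestparser(request) as (rool1, rool2, rool3) from access_log;
hive (default)> select t.rool1, t.rool2, t.rool3 from access_log lateral view myrequestparser(request) t as rool1, rool2, rool3;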

Case 4: A maximum-value aggregate function (UDAF)

package com.allen.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;

public class MaxFlowUDAF extends UDAF {

    public static class MaxNumberUDAFEvaluator implements UDAFEvaluator {

        private IntWritable result;

        public void init() {
            result = null;
        }

        // iterate() is called once for every value being aggregated,
        // so the aggregation rule is defined here
        public boolean iterate(IntWritable value) {
            if (value == null) {
                return false;
            }
            if (result == null) {
                result = new IntWritable(value.get());
            } else {
                // The goal is the maximum flow value: compare and keep the larger one in result
                result.set(Math.max(result.get(), value.get()));
            }
            return true;
        }

        // Called when Hive needs a partial aggregation result; return the current result
        public IntWritable terminatePartial() {
            return result;
        }

        // Called to merge a partial aggregation into this one; reuse the iterate rule above
        public boolean merge(IntWritable other) {
            return iterate(other);
        }

        // Called when Hive needs the final aggregation result
        public IntWritable terminate() {
            return result;
        }
    }
}

The deployment steps are the same as in Case 1.
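A usage sketch (the flow_log table and its flow column are illustrative assumptions):

hive (default)> create temporary function maxflow as "com.allen.hive.MaxFlowUDAF";
hive (default)> select maxflow(flow) from flow_log;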

Case 5: Top-N (a sorting UDAF)

package com.allen.hive;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class TopnUDAF extends UDAF {

    public static class State {
        ArrayList<Double> a; // holds the current top-n values
        int n;               // the n passed to the function
    }

    public static class Evaluator implements UDAFEvaluator {

        private State state;

        public Evaluator() {
            init();
        }

        // Initialize the Evaluator
        public void init() {
            if (state == null) {
                state = new State();
            }
            state.a = new ArrayList<Double>();
            state.n = 0;
        }

        /**
         * iterate() is called once per row on the map side; its parameters are the
         * arguments passed when the function is called.
         * @param o the value being aggregated
         * @param n the n of top-n
         */
        public boolean iterate(Double o, int n) {
            // Sort direction of the top-n: false keeps the n largest values, true the n smallest
            boolean ascending = false;
            state.n = n;

            if (o != null) {
                // Insert unconditionally while the list holds fewer than n elements
                boolean doInsert = state.a.size() < n;
                // Once the list already holds n elements, insert only if the new value
                // beats the current last (worst) element
                if (!doInsert) {
                    Double last = state.a.get(state.a.size() - 1);
                    if (ascending) {
                        doInsert = o < last;
                    } else {
                        doInsert = o > last;
                    }
                }
                if (doInsert) {
                    // Insert o at its sorted position
                    binaryInsert(state.a, o, ascending);
                    if (state.a.size() > n) {
                        state.a.remove(state.a.size() - 1);
                    }
                }
            }
            return true;
        }

        // Insert value into the list at the position that keeps it sorted in the given order
        static <T extends Comparable<T>> void binaryInsert(List<T> list, T value, boolean ascending) {
            // Find the insertion position according to the sort order
            int position = Collections.binarySearch(list, value, getComparator(ascending, (T) null));
            if (position < 0) {
                position = (-position) - 1;
            }
            list.add(position, value);
        }

        // Build a comparator for the requested sort order
        static <T extends Comparable<T>> Comparator<T> getComparator(boolean ascending, T dummy) {
            Comparator<T> comp;
            if (ascending) {
                comp = new Comparator<T>() {
                    public int compare(T o1, T o2) {
                        return o1.compareTo(o2);
                    }
                };
            } else {
                comp = new Comparator<T>() {
                    public int compare(T o1, T o2) {
                        return o2.compareTo(o1);
                    }
                };
            }
            return comp;
        }

        // The partial result produced when one map task finishes; it is sent to merge()
        public State terminatePartial() {
            if (state.a.size() > 0) {
                return state;
            } else {
                return null;
            }
        }

        /**
         * On the reduce side, merge() combines the map-side outputs, i.e. the values
         * returned by terminatePartial(); it is called once per map-side partial result.
         * @param o the State object returned by one map task's terminatePartial()
         */
        public boolean merge(State o) {
            // Sort direction of the top-n: false keeps the n largest values, true the n smallest
            boolean ascending = false;
            if (o != null) {
                state.n = o.n;
                state.a = sortedMerge(o.a, state.a, ascending, o.n);
            }
            return true;
        }

        static <T extends Comparable<T>> ArrayList<T> sortedMerge(List<T> a1, List<T> a2, boolean ascending, int n) {
            Comparator<T> comparator = getComparator(ascending, (T) null);

            int n1 = a1.size();
            int n2 = a2.size();

            int p1 = 0; // current position in a1
            int p2 = 0; // current position in a2

            // The result list, holding at most n elements
            ArrayList<T> output = new ArrayList<T>(n);
            // Merge a1 and a2 into output, keeping at most n elements
            while (output.size() < n && (p1 < n1 || p2 < n2)) {
                if (p1 < n1) {
                    // Take from a1 when a2 is exhausted or a1's element sorts first
                    // (<= also resolves ties so the merge always makes progress)
                    if (p2 == n2 || comparator.compare(a1.get(p1), a2.get(p2)) <= 0) {
                        output.add(a1.get(p1++));
                    }
                }
                if (output.size() == n) {
                    break;
                }
                if (p2 < n2) {
                    if (p1 == n1 || comparator.compare(a2.get(p2), a1.get(p1)) < 0) {
                        output.add(a2.get(p2++));
                    }
                }
            }
            return output;
        }

        // The final aggregation result
        public ArrayList<Double> terminate() {
            if (state.a.size() > 0) {
                return state.a;
            } else {
                return null;
            }
        }
    }
}

The deployment steps are the same as in Case 1.
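A usage sketch (the sales table and its amount column are illustrative assumptions; the second argument is the n of top-n):

hive (default)> create temporary function topn as "com.allen.hive.TopnUDAF";
hive (default)> select topn(amount, 3) from sales;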

Appendix: pom.xml configuration

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

 

    <groupId>com.allen.hive</groupId>

    <artifactId>Hive_Test</artifactId>

    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>3.8.1</version>

            <scope>test</scope>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->

        <dependency>

            <groupId>org.apache.hive</groupId>

            <artifactId>hive-exec</artifactId>

            <version>2.1.0</version>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-contrib -->

        <dependency>

            <groupId>org.apache.hive</groupId>

            <artifactId>hive-contrib</artifactId>

            <version>2.1.0</version>

 

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->

        <dependency>

            <groupId>org.apache.hive</groupId>

            <artifactId>hive-jdbc</artifactId>

            <version>2.1.0</version>

 

        </dependency>

    </dependencies>

 

</project>

1.4 Connecting to Hive from IDEA and running a simple select test

package com.allen.hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveTest {

    public static void main(String[] args) throws Exception {
        // Load the HiveServer2 JDBC driver and open a connection
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection("jdbc:hive2://node4:10000", "root", "123qwe");

        try {
            // Run a simple query and print the single count value
            Statement st = conn.createStatement();
            ResultSet ret = st.executeQuery("select count(*) from log_table");
            if (ret.next()) {
                System.out.println(ret.getInt(1));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            conn.close();
        }
    }
}

Because the connection uses the hive2 (HiveServer2) JDBC protocol, start HiveServer2 first in the CLI with the hiveserver2 command so that port 10000 is listening, and only then run the program; otherwise it fails with a "connection refused" error.

The result is shown below and matches the output of running the same query from the Hive CLI (screenshots omitted here).

A problem encountered when running the program:

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Cause: no log4j2 configuration file was found on the classpath.

Solution:

Adding a log4j.properties file does not help; you need a log4j2.xml instead.

Create log4j2.xml under your project's src resources directory; Eclipse and IDEA will then place it on the classpath (under WEB-INF/classes for a web project).

The log4j2 configuration:

<?xml version="1.0" encoding="UTF-8"?>

<Configuration>

    <Appenders>

        <Console name="STDOUT" target="SYSTEM_OUT">

            <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>

        </Console>

        <RollingFile name="RollingFile" fileName="logs/strutslog1.log"

                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">

            <PatternLayout>

                <Pattern>%d{MM-dd-yyyy} %p %c{1.} [%t] -%M-%L- %m%n</Pattern>

            </PatternLayout>

            <Policies>

                <TimeBasedTriggeringPolicy />

                <SizeBasedTriggeringPolicy size="1 KB"/>

            </Policies>

            <DefaultRolloverStrategy fileIndex="max" max="2"/>

        </RollingFile>

    </Appenders>

    <Loggers>

        <Logger name="com.opensymphony.xwork2" level="WAN"/>

        <Logger name="org.apache.struts2" level="WAN"/>

        <Root level="warn">

            <AppenderRef ref="STDOUT"/>

        </Root>

    </Loggers>

 

</Configuration>