Broken Pig UDF

Problem description:

This is a follow-up to the question How can I add row numbers for rows in PIG or HIVE?

The answer provided by Srini works fine, but I have trouble accessing the data after applying the UDF.

The UDF provided by Srini is the following:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RowCounter extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        try {
            DataBag output = mBagFactory.newDefaultBag();
            DataBag bg = (DataBag) input.get(0);
            Iterator<Tuple> it = bg.iterator();
            int count = 1;
            // Append a 1-based row number to each tuple in the bag.
            while (it.hasNext()) {
                Tuple t = it.next();
                t.append(count);
                output.add(t);
                count = count + 1;
            }
            return output;
        } catch (ExecException ee) {
            // error handling goes here
            throw ee;
        }
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema bagSchema = new Schema();
            bagSchema.add(new Schema.FieldSchema("RowCounter", DataType.BAG));
            return new Schema(new Schema.FieldSchema(
                    getSchemaName(this.getClass().getName().toLowerCase(), input),
                    bagSchema, DataType.BAG));
        } catch (Exception e) {
            return null;
        }
    }
}
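The core of `exec` above is just numbering the tuples of a bag. It can be sketched without Pig's runtime classes; here each tuple is modeled as a plain `List<Object>` (a simplifying assumption for illustration, not Pig's `Tuple` API):

```java
import java.util.ArrayList;
import java.util.List;

public class RowCounterSketch {
    // Mimics RowCounter.exec: copy each "tuple", appending a 1-based row number.
    static List<List<Object>> addRowNumbers(List<List<Object>> bag) {
        List<List<Object>> output = new ArrayList<>();
        int count = 1;
        for (List<Object> t : bag) {
            List<Object> copy = new ArrayList<>(t);
            copy.add(count); // same effect as t.append(count) in the UDF
            output.add(copy);
            count++;
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<Object>> bag = new ArrayList<>();
        bag.add(new ArrayList<>(List.of("amy", 34)));
        bag.add(new ArrayList<>(List.of("amy", 56)));
        System.out.println(addRowNumbers(bag));
        // [[amy, 34, 1], [amy, 56, 2]]
    }
}
```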

I wrote a simple Pig test script as follows:

A = load 'input.txt' using PigStorage(' ') as (name:chararray, age:int); 
/* 
--A: {name: chararray,age: int} 
(amy,56) 
(bob,1) 
(bob,9) 
(amy,34) 
(bob,20) 
(amy,78) 
*/ 
B = group A by name; 
C = foreach B { 
    orderedGroup = order A by age; 
    generate myudfs.RowCounter(orderedGroup) as t; 
} 
/* 
--C: {t: {(RowCounter: {})}} 
({(amy,34,1),(amy,56,2),(amy,78,3)}) 
({(bob,1,1),(bob,9,2),(bob,20,3)}) 
*/ 
D = foreach C generate FLATTEN(t); 
/* 
D: {t::RowCounter: {}} 
(amy,34,1) 
(amy,56,2) 
(amy,78,3) 
(bob,1,1) 
(bob,9,2) 
(bob,20,3) 
*/ 
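The group-order-rank pipeline above can be simulated in plain Java to sanity-check the expected output (the `Person` record is a hypothetical stand-in for a Pig tuple, not part of the script):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupRankSketch {
    // A (name, age) pair, standing in for one Pig tuple.
    record Person(String name, int age) {}

    // Group by name, order each group by age, and append a 1-based rank:
    // the same effect as GROUP, ORDER inside FOREACH, then RowCounter.
    static Map<String, List<String>> rank(List<Person> people) {
        Map<String, List<String>> result = new TreeMap<>();
        Map<String, List<Person>> groups =
                people.stream().collect(Collectors.groupingBy(Person::name));
        for (Map.Entry<String, List<Person>> e : groups.entrySet()) {
            List<Person> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.comparingInt(Person::age));
            List<String> rows = new ArrayList<>();
            for (int i = 0; i < sorted.size(); i++) {
                Person p = sorted.get(i);
                rows.add(p.name() + "," + p.age() + "," + (i + 1));
            }
            result.put(e.getKey(), rows);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Person> input = List.of(
                new Person("amy", 56), new Person("bob", 1), new Person("bob", 9),
                new Person("amy", 34), new Person("bob", 20), new Person("amy", 78));
        System.out.println(rank(input));
        // {amy=[amy,34,1, amy,56,2, amy,78,3], bob=[bob,1,1, bob,9,2, bob,20,3]}
    }
}
```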

The question is how to use D in subsequent operations. I have tried several things, but I always get the following error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:575) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:248) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:316) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:332) 
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:284) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:459) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:427) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:407) 
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:261) 
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176) 
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256) 

My guess is that this happens because there is no schema for the tuples inside the bag. If that is the cause, how should I modify the UDF?


OK, I found the solution. – user1591487 2012-08-14 01:18:59

OK, I fixed it by changing outputSchema as follows:

public Schema outputSchema(Schema input) {
    try {
        // The extra counter column that exec appends to each tuple.
        Schema.FieldSchema counter = new Schema.FieldSchema("counter", DataType.INTEGER);

        // Copy the schema of the tuples inside the input bag and append the counter.
        Schema tupleSchema = new Schema(input.getField(0).schema.getField(0).schema.getFields());
        tupleSchema.add(counter);

        Schema.FieldSchema tupleFs =
                new Schema.FieldSchema("with_counter", tupleSchema, DataType.TUPLE);

        Schema bagSchema = new Schema(tupleFs);
        return new Schema(new Schema.FieldSchema("row_counter", bagSchema, DataType.BAG));
    } catch (Exception e) {
        return null;
    }
}
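The fix works because the output schema is now built from the input bag's inner tuple schema plus the counter field, so downstream operators know the flattened columns. The construction can be sketched with plain field-name lists (an illustration only, not Pig's `Schema` API):

```java
import java.util.ArrayList;
import java.util.List;

public class SchemaSketch {
    // Build the output field list: copy the inner tuple's fields
    // (e.g. name, age) and append the new counter field, mirroring
    // tupleSchema.add(counter) in the corrected outputSchema.
    static List<String> outputFields(List<String> innerTupleFields) {
        List<String> out = new ArrayList<>(innerTupleFields);
        out.add("counter:int");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(outputFields(List.of("name:chararray", "age:int")));
        // [name:chararray, age:int, counter:int]
    }
}
```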

That solved the problem.