
hive-udaf GroupConcat

2019-11-08 20:48:42

package me.hadoop.hive.udaf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;

@Description(name = "strings_concat",
        value = "_FUNC_(str, separator) - returns the strings of a group concatenated with the given separator")
public class GroupConcat extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 2) {
            throw new UDFArgumentLengthException("Exactly two arguments are expected.");
        }
        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only PRIMITIVE types are accepted, but "
                    + parameters[0].getTypeName() + " was passed as the first parameter.");
        }
        if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(1,
                    "Only PRIMITIVE types are accepted, but "
                    + parameters[1].getTypeName() + " was passed as the second parameter.");
        }

        // At plan time parameters[] holds TypeInfo objects, so the primitive
        // category must come from PrimitiveTypeInfo (casting a TypeInfo to
        // PrimitiveObjectInspector, as the original code did, throws a ClassCastException).
        PrimitiveCategory inputType1 = ((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory();
        switch (inputType1) {
        case STRING:
        case VARCHAR:
        case CHAR:
            break;
        default:
            throw new UDFArgumentException(
                    "GroupConcat only takes STRING/VARCHAR/CHAR as the first parameter, got " + inputType1);
        }

        PrimitiveCategory inputType2 = ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory();
        switch (inputType2) {
        case STRING:
        case VARCHAR:
        case CHAR:
            break;
        default:
            throw new UDFArgumentException(
                    "GroupConcat only takes STRING/VARCHAR/CHAR as the second parameter, got " + inputType2);
        }

        return new GroupConcatEvaluator();
    }

    public static class GroupConcatEvaluator extends GenericUDAFEvaluator {

        // Inspectors for the (value, separator) arguments in PARTIAL1/COMPLETE mode,
        // and for the partial result string in PARTIAL2/FINAL mode.
        private PrimitiveObjectInspector inputOI;
        private PrimitiveObjectInspector separatorOI;
        private PrimitiveObjectInspector partialOI;
        private Text result;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
                separatorOI = (PrimitiveObjectInspector) parameters[1];
            } else {
                partialOI = (PrimitiveObjectInspector) parameters[0];
            }
            result = new Text();
            // Both the partial and the final result are plain strings.
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        static class StringAgg extends AbstractAggregationBuffer {
            StringBuilder stringAgg;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            StringAgg buffer = new StringAgg();
            reset(buffer);
            return buffer;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            // The original appended "" to a never-initialized buffer; allocate it here instead.
            ((StringAgg) agg).stringAgg = new StringBuilder();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            assert (parameters.length == 2);
            String source = "";
            if (parameters[0] != null) {
                source = inputOI.getPrimitiveJavaObject(parameters[0]).toString();
            }
            String separator = "";
            if (parameters[1] != null) {
                separator = separatorOI.getPrimitiveJavaObject(parameters[1]).toString();
            }
            // The separator is appended after every value, so the result keeps
            // a trailing separator (same behavior as the original code).
            ((StringAgg) agg).stringAgg.append(source).append(separator);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                String tstr = partialOI.getPrimitiveJavaObject(partial).toString();
                ((StringAgg) agg).stringAgg.append(tstr);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((StringAgg) agg).stringAgg.toString());
            return result;
        }
    }
}


