2011-06-22 40 views
32

COLLECT_SET() in Hive, keeping duplicates?

Is there a way to keep duplicates in a collected set in Hive, or to simulate the kind of aggregate collection that Hive provides using some other method? I want to aggregate all of the items in a column that have the same key into an array, with duplicates.

For example:

hash_id | num_of_cats 
===================== 
ad3jkfk   4 
ad3jkfk   4 
ad3jkfk   2 
fkjh43f   1 
fkjh43f   8 
fkjh43f   8 
rjkhd93   7 
rjkhd93   4 
rjkhd93   7 

should return:

hash_agg | cats_aggregate 
=========================== 
ad3jkfk Array<int>(4,4,2) 
fkjh43f Array<int>(1,8,8) 
rjkhd93 Array<int>(7,4,7) 
+0

**If this is unclear**, please let me know. I'm still trying to figure this out :( – batman

Answers

23

In Hive 0.13 and later, try using COLLECT_LIST(col), which keeps duplicates:

SELECT 
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set 
FROM 
    tablename 
WHERE 
    blablabla 
GROUP BY 
    hash_id 
; 
+2

GROUP BY hash_id is missing – Tagar

22

There is nothing built in, but creating user-defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type-generic, but here is a collect example.

package com.example; 

import java.util.ArrayList; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; 
import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; 
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; 
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; 

public class CollectAll extends AbstractGenericUDAFResolver 
{ 
    @Override 
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis) 
      throws SemanticException 
    { 
     if (tis.length != 1) 
     { 
      throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected."); 
     } 
     if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE) 
     { 
      throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1."); 
     } 
     return new CollectAllEvaluator(); 
    } 

    public static class CollectAllEvaluator extends GenericUDAFEvaluator 
    { 
     private PrimitiveObjectInspector inputOI; 
     private StandardListObjectInspector loi; 
     private StandardListObjectInspector internalMergeOI; 

     @Override 
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) 
       throws HiveException 
     { 
      super.init(m, parameters); 
      if (m == Mode.PARTIAL1) 
      { 
       inputOI = (PrimitiveObjectInspector) parameters[0]; 
       return ObjectInspectorFactory 
         .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils 
         .getStandardObjectInspector(inputOI)); 
      } 
      else 
      { 
       if (!(parameters[0] instanceof StandardListObjectInspector)) 
       { 
        inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils 
          .getStandardObjectInspector(parameters[0]); 
        return (StandardListObjectInspector) ObjectInspectorFactory 
          .getStandardListObjectInspector(inputOI); 
       } 
       else 
       { 
        internalMergeOI = (StandardListObjectInspector) parameters[0]; 
        inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector(); 
        loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI); 
        return loi; 
       } 
      } 
     } 

     static class ArrayAggregationBuffer implements AggregationBuffer 
     { 
      ArrayList<Object> container; 
     } 

     @Override 
     public void reset(AggregationBuffer ab) 
       throws HiveException 
     { 
      ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>(); 
     } 

     @Override 
     public AggregationBuffer getNewAggregationBuffer() 
       throws HiveException 
     { 
      ArrayAggregationBuffer ret = new ArrayAggregationBuffer(); 
      reset(ret); 
      return ret; 
     } 

     @Override 
     public void iterate(AggregationBuffer ab, Object[] parameters) 
       throws HiveException 
     { 
      assert (parameters.length == 1); 
      Object p = parameters[0]; 
      if (p != null) 
      { 
       ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminatePartial(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 

     @Override 
     public void merge(AggregationBuffer ab, Object o) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o); 
      for(Object i : partial) 
      { 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminate(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 
    } 
} 

Then in Hive, just issue add jar Whatever.jar; CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll'; and you should be able to use it as expected.

hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id; 
OK 
ad3jkfk [4,4,2] 
fkjh43f [1,8,8] 
rjkhd93 [7,4,7] 

It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you might need to expand it a bit to sort the data as needed.
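If a deterministic order is enough, one option (a sketch, assuming the collect_all function registered above and the test table from the example; sort_array() is a built-in available since Hive 0.9) is to sort each collected array after aggregation:

```sql
-- Sketch: sort each collected array so the output order is deterministic.
-- Assumes collect_all is registered as shown above; sort_array sorts ascending.
SELECT hash_id, sort_array(collect_all(num_of_cats)) AS sorted_cats
FROM test
GROUP BY hash_id;
```

This only fixes the order of the final arrays; it does not control the order in which rows are fed into the aggregate.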

+0

Nice answer :) I ended up trying it and ran into a few problems. Looking closely at your code, I found what I was doing wrong (type-generic *is* hard), and I think this will work. – batman

12

I modified Jeff Mc's code to remove the restriction (presumably inherited from collect_set) that the input must be a primitive type. This version can collect structs, maps and arrays as well as primitives.

package com.example; 

import java.util.ArrayList; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; 
import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; 
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; 
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; 

public class CollectAll extends AbstractGenericUDAFResolver 
{ 
    @Override 
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis) 
      throws SemanticException 
    { 
     if (tis.length != 1) 
     { 
      throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected."); 
     } 
     return new CollectAllEvaluator(); 
    } 

    public static class CollectAllEvaluator extends GenericUDAFEvaluator 
    { 
     private ObjectInspector inputOI; 
     private StandardListObjectInspector loi; 
     private StandardListObjectInspector internalMergeOI; 

     @Override 
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) 
       throws HiveException 
     { 
      super.init(m, parameters); 
      if (m == Mode.PARTIAL1) 
      { 
       inputOI = parameters[0]; 
       return ObjectInspectorFactory 
         .getStandardListObjectInspector(ObjectInspectorUtils 
         .getStandardObjectInspector(inputOI)); 
      } 
      else 
      { 
       if (!(parameters[0] instanceof StandardListObjectInspector)) 
       { 
        inputOI = ObjectInspectorUtils 
          .getStandardObjectInspector(parameters[0]); 
        return (StandardListObjectInspector) ObjectInspectorFactory 
          .getStandardListObjectInspector(inputOI); 
       } 
       else 
       { 
        internalMergeOI = (StandardListObjectInspector) parameters[0]; 
        inputOI = internalMergeOI.getListElementObjectInspector(); 
        loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI); 
        return loi; 
       } 
      } 
     } 

     static class ArrayAggregationBuffer implements AggregationBuffer 
     { 
      ArrayList<Object> container; 
     } 

     @Override 
     public void reset(AggregationBuffer ab) 
       throws HiveException 
     { 
      ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>(); 
     } 

     @Override 
     public AggregationBuffer getNewAggregationBuffer() 
       throws HiveException 
     { 
      ArrayAggregationBuffer ret = new ArrayAggregationBuffer(); 
      reset(ret); 
      return ret; 
     } 

     @Override 
     public void iterate(AggregationBuffer ab, Object[] parameters) 
       throws HiveException 
     { 
      assert (parameters.length == 1); 
      Object p = parameters[0]; 
      if (p != null) 
      { 
       ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminatePartial(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 

     @Override 
     public void merge(AggregationBuffer ab, Object o) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o); 
      for(Object i : partial) 
      { 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminate(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 
    } 
} 
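Registered the same way as in the previous answer, this version can also collect non-primitive columns. A hypothetical sketch (the table and struct column names here are illustrative, not from the original post):

```sql
-- Sketch: collecting a struct column with the non-primitive-capable version.
-- "Whatever.jar", "tableWithStruct", "id" and "obj" are placeholder names.
add jar Whatever.jar;
CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';
SELECT id, collect_all(obj) AS objs
FROM tableWithStruct
GROUP BY id;
```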
+0

This could be a version issue, but I just tried installing this into our repo and compiling it, and when it is invoked in Hive I get the following error: 'Diagnostic messages for this task: Caused by java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAc ...' – jlemaitre

11

As of Hive 0.13, there is a built-in UDAF called collect_list() that achieves this. See here.

+1

Strangely, collect_list cannot collect non-primitive types (in Hive 0.13.1). Otherwise using a built-in like this would be great. –

+3

The klout team has a GREAT repo of UDFs you can browse. Among them is a collect function that handles non-primitives. https://github.com/klout/brickhouse/tree/master/src/main/java/brickhouse/udf/collect – jlemaitre

+0

@jlemaitre, thanks for the link! "Among them is a collect function that handles non-primitives" - which one is it? Thanks in advance. – Tagar

1

Here is the exact Hive query that does this job (works only in Hive > 0.13):

SELECT hash_id, collect_list(num_of_cats) FROM tablename GROUP BY hash_id; 

1

For what it's worth (though I know this is an older post), Hive 0.13.0 features a new collect_list() function that does not deduplicate.
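To see the difference on the question's sample data, the two built-ins can be compared side by side (a sketch; the table name test is assumed, and element order within each array is not guaranteed):

```sql
-- collect_set drops duplicates; collect_list (Hive >= 0.13) keeps them.
SELECT hash_id,
       collect_set(num_of_cats)  AS deduped,   -- e.g. [4,2] for ad3jkfk
       collect_list(num_of_cats) AS with_dups  -- e.g. [4,4,2] for ad3jkfk
FROM test
GROUP BY hash_id;
```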

+0

Can you expand on what this function does? Usually something of this length works better as a comment on an answer (unfortunately you can't do that, since you don't have enough reputation to comment). –

0

A workaround to collect structs:

Suppose you have a table

tableWithStruct(
id string, 
obj struct <a:string,b:string>) 

Now create another table as:

CREATE EXTERNAL TABLE tablename (
id string, 
temp array<string> 
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|' 

Insert query:

insert into table tablename select id, collect(concat_ws('|', cast(obj.a as string), cast(obj.b as string))) from tableWithStruct group by id; 

Now create another table at the same location as tablename:

CREATE EXTERNAL TABLE tablename_final (
id string, 
array_list array<struct<a:string,b:string>> 
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|' 

When you select from tablename_final, you will get the desired output.

-1

Just wondering - in the statement

SELECT 
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set 
FROM 
    tablename 
WHERE 
    blablabla 
GROUP BY 
    hash_id 
; 

suppose we want to sort and limit the elements of num_of_cats - how would we go about it? Because in big data we deal with petabytes of data, we may not need all of it in such cases, only the top 10 or some limit.
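One way to do this with built-ins only (a sketch; it assumes Hive >= 0.11 for windowing functions, and the limit of 10 is illustrative) is to rank rows per key first and collect only the top N:

```sql
-- Sketch: keep only the 10 largest num_of_cats per hash_id before collecting.
-- Table and column names follow the question above.
SELECT hash_id, collect_list(num_of_cats) AS aggr_set
FROM (
  SELECT hash_id, num_of_cats,
         row_number() OVER (PARTITION BY hash_id ORDER BY num_of_cats DESC) AS rn
  FROM tablename
) ranked
WHERE rn <= 10
GROUP BY hash_id;
```

Since the filter runs in the subquery, only the top rows per key ever reach the aggregation, which matters at petabyte scale.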

+0

If you have a new question, please ask it by clicking the [Ask Question](http://stackoverflow.com/questions/ask) button. Include a link to this question if it helps provide context. - [From review](/review/low-quality-posts/10597681) –

+1

OK sir - it's just that I don't have the points to add a comment - will try to be more systematic next time. –

+0

Thanks; once you have the reputation, you will be able to comment. –