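Use COGROUP to gather records that share the same key while avoiding the unwanted cross product of a JOIN. Then FILTER on whether b's bag of records is empty, split the result back into two relations, and UNION them: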
a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
c = COGROUP a BY iid, b BY iid;
c_filt = FILTER c BY NOT IsEmpty(b);
a_new = FOREACH c_filt GENERATE group AS iid, FLATTEN(a);
b_new = FOREACH c_filt GENERATE group AS iid, FLATTEN(b);
out = UNION ONSCHEMA a_new, b_new;
singled = DISTINCT out;
STORE (ORDER singled BY iid) INTO '$output';
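To make the data flow of that script concrete, here is a minimal plain-Java model of it that uses in-memory lists instead of Pig relations. The class `CogroupFilterUnion`, the `Row` record and the sample values are hypothetical and exist only for illustration. The sketch assumes Java 16+ for records. It emits only the five loaded fields, not the extra leading `iid` column that `group AS iid` adds in the Pig script.

```java
import java.util.*;

// Plain-Java model (not Pig) of:
//   COGROUP a BY iid, b BY iid -> FILTER NOT IsEmpty(b)
//   -> FLATTEN both bags -> UNION -> DISTINCT -> ORDER BY iid
public class CogroupFilterUnion {
    // Hypothetical record mirroring the (iid, field, v1, v2, v3) schema.
    record Row(int iid, String field, String v1, String v2, String v3) {}

    static List<Row> run(List<Row> a, List<Row> b) {
        // COGROUP: for each key, one bag of rows per input.
        Map<Integer, List<Row>> aByKey = new HashMap<>();
        Map<Integer, List<Row>> bByKey = new HashMap<>();
        for (Row r : a) aByKey.computeIfAbsent(r.iid(), k -> new ArrayList<>()).add(r);
        for (Row r : b) bByKey.computeIfAbsent(r.iid(), k -> new ArrayList<>()).add(r);

        // FILTER NOT IsEmpty(b): only keys present in b survive, so we iterate b's keys.
        // FLATTEN + UNION + DISTINCT: the set keeps a single copy of each row.
        Set<Row> out = new LinkedHashSet<>();
        for (Map.Entry<Integer, List<Row>> e : bByKey.entrySet()) {
            out.addAll(aByKey.getOrDefault(e.getKey(), List.of()));
            out.addAll(e.getValue());
        }

        // ORDER BY iid (List.sort is stable, so ties keep insertion order).
        List<Row> sorted = new ArrayList<>(out);
        sorted.sort(Comparator.comparingInt(Row::iid));
        return sorted;
    }

    public static void main(String[] args) {
        List<Row> a = List.of(new Row(1, "x", "a", "b", "c"), new Row(2, "y", "d", "e", "f"));
        List<Row> b = List.of(new Row(1, "x", "a", "b", "c"), new Row(1, "z", "g", "h", "i"));
        run(a, b).forEach(System.out::println);
    }
}
```

In this model, rows from a whose iid never appears in b (iid 2 above) are dropped. A row that occurs in both inputs is kept only once.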
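I'm not a fan of this solution, though: it takes too many lines and intermediate relations for such a simple operation. What you really need is a way to merge two bags into one. Pig apparently doesn't provide this (but if it does, please answer this SO question). You can write a simple UDF to do it, though: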
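import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Concatenates every bag passed in as an argument into one bag: MERGE(a, b, ..., z).
public class MERGE extends EvalFunc<DataBag> {
    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null) return null;
        DataBag merged = BAG_FACTORY.newDefaultBag();
        for (int i = 0; i < input.size(); i++) {
            Object field = input.get(i);
            // Skip null (or non-bag) arguments instead of failing the whole record.
            if (field instanceof DataBag) merged.addAll((DataBag) field);
        }
        return merged;
    }
}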
With this UDF in hand (packaged in a jar and REGISTERed in your script), the solution becomes:
a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
c = FOREACH (FILTER (COGROUP a BY iid, b BY iid) BY NOT IsEmpty(b)) GENERATE group AS iid, MERGE(a,b) AS bag;
out = FOREACH c {
uniq = DISTINCT bag;
GENERATE iid, FLATTEN(uniq);
};
STORE (ORDER out BY iid) INTO '$output';
Another benefit of this approach is that if you have several inputs, you don't need several FOREACH statements after the COGROUP. Just pass more arguments to MERGE:
c = FOREACH (COGROUP a BY iid, b BY iid, ..., z BY iid)
GENERATE group AS iid, MERGE(a,b,...,z) AS bag;
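Here is a rough plain-Java model of what the N-input version computes for each key. It covers the COGROUP over any number of inputs, the MERGE of all of a group's bags, and the nested DISTINCT. The class `CogroupMerge`, the `Row` record and its `fields` list are hypothetical. The sketch also folds the grouping and merging into a single pass, which gives the same per-key result but is not how Pig executes it. Java 16+ is assumed for records.

```java
import java.util.*;

// Plain-Java model (not Pig) of
//   FOREACH (COGROUP a BY iid, b BY iid, ..., z BY iid)
//   GENERATE group AS iid, MERGE(a, b, ..., z) AS bag
// followed by a nested DISTINCT on each group's bag.
public class CogroupMerge {
    // Hypothetical row: the key plus the remaining fields as a list of strings.
    record Row(int iid, List<String> fields) {}

    // Each argument stands in for one relation (a, b, ..., z).
    @SafeVarargs
    static SortedMap<Integer, Set<Row>> cogroupMerge(List<Row>... inputs) {
        // TreeMap keeps groups ordered by iid, similar to ORDER ... BY iid.
        SortedMap<Integer, Set<Row>> groups = new TreeMap<>();
        for (List<Row> input : inputs) {
            for (Row r : input) {
                // Appending every input's rows to one per-key collection plays the role of MERGE;
                // using a LinkedHashSet plays the role of the nested DISTINCT.
                groups.computeIfAbsent(r.iid(), k -> new LinkedHashSet<>()).add(r);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> a = List.of(new Row(1, List.of("x")), new Row(2, List.of("y")));
        List<Row> b = List.of(new Row(1, List.of("x")));
        List<Row> c = List.of(new Row(2, List.of("z")));
        cogroupMerge(a, b, c).forEach((iid, bag) -> System.out.println(iid + " -> " + bag));
    }
}
```

Adding another input only changes the argument list, not the per-key logic. That mirrors why the MERGE version scales better than one FOREACH per relation.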
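Do you need the tuples from A and B to end up in separate tuples, or do you want to make sure they end up in the same tuple when their iid values match? – Frederic
@Fred I just added sample data for A and B, plus a sample of the output, to my question to clarify :) – trillions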