以下是示例輸入文件，以及相應的中間結果和最終結果：
cat ids_test.json
{"A":"a1","B":"a2"}
cat part-test
{"content":"both_A_a1_B_a2","meta":{"A":"a1","B":"a2"}}
{"content":"only_B_a2","meta":{"A":"","B":"a2"}}
{"content":"only_A_a1","meta":{"A":"a1","B":""}}
{"content":"both_A_b1_B_b2","meta":{"A":"b1","B":"b2"}}
{"content":"only_A_c1","meta":{"A":"c1","B":""}}
cat /tmp/j1/part-m-00000
{"user_data::json":{"meta":"{B=a2, A=a1}","content":"both_A_a1_B_a2"},"ids::json":{"B":"a2","A":"a1"}}
{"user_data::json":{"meta":"{B=a2, A=}","content":"only_B_a2"},"ids::json":null}
{"user_data::json":{"meta":"{B=, A=a1}","content":"only_A_a1"},"ids::json":{"B":"a2","A":"a1"}}
{"user_data::json":{"meta":"{B=b2, A=b1}","content":"both_A_b1_B_b2"},"ids::json":null}
{"user_data::json":{"meta":"{B=, A=c1}","content":"only_A_c1"},"ids::json":null}
cat /tmp/j1_filter/part-m-00000
{"user_data::json":{"meta":"{B=a2, A=}","content":"only_B_a2"},"ids::json":null}
{"user_data::json":{"meta":"{B=b2, A=b1}","content":"both_A_b1_B_b2"},"ids::json":null}
{"user_data::json":{"meta":"{B=, A=c1}","content":"only_A_c1"},"ids::json":null}
cat /tmp/j2/part-m-00000
{"J1_FILTER::user_data::json":{"meta":"{B=a2, A=}","content":"only_B_a2"},"J1_FILTER::ids::json":null,"ids::json":{"B":"a2","A":"a1"}}
{"J1_FILTER::user_data::json":{"meta":"{B=b2, A=b1}","content":"both_A_b1_B_b2"},"J1_FILTER::ids::json":null,"ids::json":null}
{"J1_FILTER::user_data::json":{"meta":"{B=, A=c1}","content":"only_A_c1"},"J1_FILTER::ids::json":null,"ids::json":null}
cat /tmp/results/part-m-00000
{"J1_FILTER::user_data::json":{"meta":"{B=b2, A=b1}","content":"both_A_b1_B_b2"}}
{"J1_FILTER::user_data::json":{"meta":"{B=, A=c1}","content":"only_A_c1"}}
以下是對應的 Pig 腳本：
-- Anti-join of user_data against ids: keep only the records whose meta.A does
-- not match ids.A and whose meta.B does not match ids.B. Every intermediate
-- relation is also written under /tmp so each step can be inspected.

-- Each input record is loaded as a single map-typed field named json.
user_data = LOAD 'part-test'
    USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad')
    AS (json:map[]);
ids = LOAD 'ids_test.json'
    USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad')
    AS (json:map[]);

-- Pass 1: left outer join on key A. Rows without a partner in ids come out
-- with a null ids::json.
-- NOTE(review): 'replicated' selects Pig's fragment-replicate join, which
-- expects the right-hand relation (ids) to be small -- confirm for real data.
J1 = JOIN user_data BY json#'meta'#'A' LEFT OUTER,
          ids       BY json#'A'
     USING 'replicated';
-- rmf removes any previous output directory so the STORE does not fail.
rmf /tmp/j1
STORE J1 INTO '/tmp/j1' USING JsonStorage;

-- Keep only the rows that found no match on A.
J1_FILTER = FILTER J1 BY ids::json IS NULL;
rmf /tmp/j1_filter
STORE J1_FILTER INTO '/tmp/j1_filter' USING JsonStorage;

-- Pass 2: same technique on key B, applied to the survivors of pass 1.
-- The join output carries both J1_FILTER::ids::json (left over from pass 1)
-- and a fresh ids::json produced by this join.
J2 = JOIN J1_FILTER BY user_data::json#'meta'#'B' LEFT OUTER,
          ids       BY json#'B'
     USING 'replicated';
rmf /tmp/j2
STORE J2 INTO '/tmp/j2' USING JsonStorage;

-- Keep only the rows that found no match on B either.
-- NOTE(review): ids::json is expected to resolve to the column added by
-- pass 2 rather than J1_FILTER::ids::json -- verify on your Pig version.
J2_FILTER = FILTER J2 BY ids::json IS NULL;

-- Project away the (now always null) join columns, leaving the original record.
RESULTS = FOREACH J2_FILTER GENERATE J1_FILTER::user_data::json;

-- Disabled debugging snippet; user_data_MINUS_ids is not defined in the
-- visible part of this script.
--filtered_ids = FOREACH user_data_MINUS_ids GENERATE user_data AS data;
--DUMP filtered_ids;

rmf /tmp/results
STORE RESULTS INTO '/tmp/results' USING JsonStorage;
另請參閱 DataFu 關於集合運算（set operations）的說明文檔： http://datafu.incubator.apache.org/docs/datafu/guide/set-operations.html