2015-08-24 53 views
0

我有兩個星火1.4.1 PipelineRDD(我不知道什麼樣的對象是:-s:星火 - 在特定領域加入JSON RDDS(鍵 - 值)

1)名單IDS(ids_alsaciens RDD)

2)的列表personne(personnes RDD)

在 'Personnes' RDD有4個字段,在JSON格式,關鍵是 「ID」。 我可能在這張表中有同一人的幾條線(id是相同的)

我想獲取'personnes'RDD上的'alsacien'表中包含的所有行。

我怎麼能這樣做在火花?

>type(ids_alsaciens) 
pyspark.rdd.PipelinedRDD 
>type(personnes) 
pyspark.rdd.PipelinedRDD 

>ids_alsaciens.take(10) 
    [u'1933992', 
    u'2705919', 
    u'2914684', 
    u'2915444', 
    u'11602833', 
    u'11801394', 
    u'10707371', 
    u'2018422', 
    u'2312432', 
    u'233375'] 
    >personnes.take(3) 
    [{'date': '2013-06-03 00:00', 
     'field': 'WAID_INDIVIDU_WC_NUMNNI', 
     'id': '10000149', 
     'value': '2770278'}, 
    {'date': '2013-05-15 00:00', 
     'field': 'WAID_INDIVIDU_WC_NUMNNI', 
     'id': '10009910', 
     'value': '2570631'}, 
    {'date': '2013-03-01 00:00', 
     'field': 'WAID_INDIVIDU_WC_NUMNNI', 
     'id': '10014405', 
     'value': '1840288'}] 

編輯

嘗試: personnes.filter(拉姆達X:X在ids_alsaciens)

了異常: 例外:看來您正在嘗試播放的RDD或引用RDD從行動或轉變。 RDD轉換和操作只能由驅動程序調用,而不能在其他轉換中調用;例如,rdd1.map(lambda x:rdd2.values.count()* x)無效,因爲值轉換和計數操作不能在rdd1.map轉換中執行。有關更多信息,請參閱SPARK-5063。

回答

0

出現SPARK-5063錯誤是因爲不允許在地圖內調用RDD函數,因爲運行地圖任務的火花工作人員無法自行完成工作。

使用星火RDD.join:

documentation

join(otherDataset, [numTasks])  

當呼籲(K, V)類型和(K, W)的數據集,返回(K, (V, W))對所有元素對每個關鍵

數據集

祕訣就是知道Spark對待所有2作爲元組(key,value)對,您可以使用RDD.map(),使自己對:

kv_ids_alsaciens = ids_alsaciens.map(lambda id: (id, 0)) 

使得(k,v)雙從ids_alsaciens其中k=idv=0。這有點浪費,但我還沒有測試過是否可以消除v

與personnes

然後:

kv_personnes = personnes.map(lambda p: (p['id'],p)) 

現在我們可以使用加入成爲這樣

joined_kv_ids_alsaciens_personnes = kv_ids_alsaciens.join(kv_personnes) 

,同時將與條目進行RDD像

(10000149, (0, {'date': '2013-06-03 00:00', 'field': 'WAID_INDIVIDU_WC_NUMNNI', 'id': '10000149', 'value': '2770278'}))

,其中第一項是一個匹配的ID,並且第二個是 項目是一對(match1,match2)其中 match1總是0因爲設置我們的第一個數據始終 有0在對中的值,並且match2是一個字典的personnes數據 。

這不完全是需要的。更好的格式可能是隻發佈字典。我們可以用另一張地圖做到這一點。

match_personnes = joined_kv_ids_alsaciens_personnes.map(lambda (k,(v1,v2)): v2) 

總之,在內存緩存()的最終結果:

match_personnes = (ids_alsaciens 
        .map(lambda id: (id, 0)) 
        .join(personnes.map(lambda p: (p['id'],p))) 
        .map(lambda (k,(v1,v2)): v2) 
        .cache() 
        ) 

測試:

match_personnes.take(10) 
+0

我沒有測試你的代碼,當我達到一個相當類似的方案謝謝:-) –

+0

不客氣。 – Paul