2016-01-11 70 views
0

我有這樣的數據,濾波RDD並且提取匹配的火花蟒數據

cl_id  cn_id  cn_value 
10004,  77173296  ,390.0 
10004,  77173299  ,376.0 
10004,  77173300  ,0.0 
20005,  77173296  ,0.0 
20005,  77173299  ,6.0 
2005,  77438800  ,2.0 

Cl_id的ID:10004,20005

過濾器的10004

10004,  77173296  ,390.0 
10004,  77173299  ,376.0 

過濾器的20005

20005, 77173296 ,0.0 
20005, 77173299  ,6.0 

Now我想返回RDD一樣,

10004,cn_id,x1(77173296.value,77173300.value) ==> 10004,77173296,390.0,376.0 
20005,cn_id,x1(77173296.value,77173300.value) ==> 20005,77173296,0.0,6.0 

而且我想在此return_RDD執行一些操作:

def cal_for(rdd_list): 
    #list.map(position1).filter(cn_id for this formula)-> calculate that formula -> store in a separate RDD -> Return that RDD 

    rdd_list = rdd_list.map(lambda line:line.split(',')) 
    new_list = rdd_list.map(lambda x: (x[0]+', '+x[1],float(x[2]))) 
    new_list = rdd_list.filter(lambda x: x[1] == '77173296' && x[1] == '77173299') 
    ## then get the RDD containing respective cn_values for cn_id 77173296 & cn_id 77173299 
    ## and apply the following formula whre a=77173296.value b=77173299.value for cl_id 1004 

    try: 
     # want to process RDD with this Formula 
     return ((float(a)/float(a+b))*100) 
    except ZeroDivisionError: 
     return 0 

#return or save cal_RDD 

回答

1

而是通過ID過濾RDD兩次,修改和重組所產生的RDDS,只是組,然後映射這些值以進行所需的任何更改。如果您想根據某些標準進一步限制結果,請在映射時執行過濾器。

我真的不能給你一個更精確的答案爲:

一)它看起來並不像你真的想還沒有實現這一點,並 B)我不能完全肯定什麼你要。

+0

由於我是新來的這將面臨我的問題。並從輸入Rdd我只是想過濾基於cl_id的結果,並將proccesed Rdd傳遞給cal_fo() –

+0

您可能想了解更多關於Scala和Spark的內容。例如,您不需要返回關鍵字,並且在您擁有RDD後操作數據相對簡單。我不完全確定你的問題是Spark還是希望有人爲你編寫邏輯。我想你應該描述你希望達到的目標(沒有提到實際值)。 –

+0

真的非常感謝Steven。我會更加努力並試圖簡化和實施它。非常感謝。 –