2017-03-07 31 views
1

我已經2個RDDS它需要連接加入和映射2個RDDS有條件

val rdd1 = RDD[(v_id, inputObject1)] 

其中v_id是唯一的ID

和inputObject1具有以下字段

g_id, p_id, timestamp=t1 

現在我有另一RDD

val rdd2 = RDD[(g_id, inputObject2)] 

個這裏inputObject2有以下領域

p_id, timestamp=t2, e_id 

現在我想加入這2個RDDS下面條件

  • 如果G_ID和p_id的是相同的,| T1-T2 | < 30分鐘
  • 否則,如果g_id相同並且| t1-t2 | < 30分鐘

所以第二個條件是後備,如果第一個條件不滿足。我最後的輸出應該是這個

val resuldRDD = RDD[(v_id, inputObject11)] 

其中inputObject11 = inputObject1 +增加從第二RDD E_ID如果條件得到滿足。

所以字段將

g_id, p_id, e_id, timestamp=t1 
+0

也不蘇你可以加入條件。你可以通過ID加入,然後根據任何條件進行過濾。 – Hlib

回答