2016-12-06 75 views
0

您好我正在使用scala來識別行的第一個字並創建一個唯一值並將其附加到RDD中。但我不知道該怎麼做。我是斯卡拉新手,所以請原諒,如果這個問題聽起來跛腳。 我正在嘗試的示例如下。如何在RDD中添加唯一值火花

樣品:

OBR|1|METABOLIC PANEL 
OBX|1|Glucose 
OBX|2|BUN 
OBX|3|CREATININE 
OBR|2|RFLX TO VERIFICATION 
OBX|1|EGFR 
OBX|2|SODIUM 
OBR|3|AMBIGUOUS DEFAULT 
OBX|1|POTASSIUM 

我要檢查,如果第一個字是OBR與否,如果它是OBR比我創建了一個獨特的價值,並希望將其追加在OBR和下方OBX直到我發現了一個OBR,我想這樣做。但是我怎麼能做到這一點?我正在將我的數據從HDFS

預期結果:

OBR|1|METABOLIC PANEL|OBR_filename_1 
OBX|1|Glucose|OBR_filename_1 
OBX|2|BUN|OBR_filename_1 
OBX|3|CREATININE|OBR_filename_1 
OBR|2|RFLX TO VERIFICATION|OBR_filename_2 
OBX|1|EGFR|OBR_filename_2 
OBX|2|SODIUM|OBR_filename_2 
OBR|3|AMBIGUOUS DEFAULT|OBR_filename_3 
OBX|1|POTASSIUM|OBR_filename_3 
+0

在分佈式系統(如spark和hdfs)中,沒有像按順序讀取文件那樣的東西。如果所有的OBR都是先讀取的,然後再讀取所有的OBR,你會如何處理代碼?你想讓所有其他記錄得到最後一個文件名嗎?如果不是,如果你在單個文件上使用單個核心運行你的應用程序,那麼你可能會按照你期望的順序讀入文件,但是爲什麼使用spark呢? –

+0

@ASpotySpot我想依次讀取它,並檢查它是否獲取** OBR **作爲第一個值創建「OBR_filename_id」,並在所有obx中放入相同的「OBR_filename_id」,直到它到達下一個** OBR ** – animal

+0

因此,您的文件由於它在hdfs上被分成許多部分。例如,順序閱讀意味着什麼?如果它沒有被分割成許多部分,那麼不管怎麼做,它都會平行地處理文件的部分內容,除非你使用單個內核,否則讓它依次操作依然是棘手的(據我所知)。我可以把一些東西放在一起,但在這一點上,我相信它使用火花毫無意義。我的HDFS中的 –

回答

1

好了,所以在我的評論中提到,這將只在單一內核上,而不應使用火花這樣做,除非有人能上的東西我一些啓發失蹤。 我假設該文件只是您的示例中所述的hdfs上的文本文件。

val text: RDD[(String, Long)] = sc.textFile(<path>).zipWithIndex 
val tupled: RDD[((String, Int, String), Int)] = text.map{case (r, i) => (r.split('|'), i)).map{case (s, i) => ((s(0), s(1).toInt, s(2)), i)} 
val obrToFirstIndex: Array[(Int, Long)] = tupled.filter(_._1._1 == "OBR").map{case (t, i) => (t._2, i)}.reduceByKey(Math.min).collect() 
val bcIndexes = sc.broadcast(obrToFirstIndex.sortBy(_._2)) 
val withObr = tupled.mapValues(i => bcIndexes.value.find(_._2 >= i).getOrElse(bcIndexes.value.last)._1) 
val result: RDD[String] = withObr.map{case ((t1, t2, t2), obrind) => Array(t1, t2, t3, s"OBR_filaneme_$obrind").mkString("|") 

在我的當前ennvironement我無法測試上面的,因此可能會受到差一錯誤或錯別字輕微但這個想法是存在的。但讓我重申,這不是一個火花的工作。

編輯:剛剛發生在我身上,因爲只有一部分可以使用mapPartitions,只是寫代碼將如何在該分區內的Java/Scala。

您遇到的問題是查找不正確,它需要不同的條件才能工作。這裏是我之前用mapPartitions暗示的更簡單的方法

val text: RDD[String] = sc.textFile(<path>) 
val result: RDD[String] = text.mapPartitions{part => 
    var obrInd = 0 
    part.map{r => 
     val code= r.split('|')(0) 
     if(code == "OBR") obrInd += 1 
     r + "|OBR_filename_" + obrInd 
    } 
} 
+0

你能告訴我爲什麼'reduceByKey(Math.min)'被使用? – animal

+0

在不使用整個分區的火花操作中,沒有任何行具有任何其他行的概念。例如)地圖不能根據其他行的內容來改變其輸出。我們需要以某種方式組合行來解決您的問題。我所做的就是根據OBR ID將所有行組合在一起。然後我採取最小的索引(這裏索引是行號)以獲得第一次發生。例如)在你的樣本中你會得到:1 - > 0,2 - > 4,3 - > 7.然後我們用它來決定哪個索引應該到哪個OBR ID。現在發生對我來說,可能並不需要,如果每個OBR行都有一個唯一的ID –

+0

我想你的方式,但我得到這個結果 'OBR | 1 |代謝小組| OBR_filaneme_1 OBX | 1 |葡萄糖| OBR_filaneme_2 OBX | 2 | BUN | OBR_filaneme_2 OBX | 3 |肌酐| OBR_filaneme_2 OBR | 2 | RFLX覈查| OBR_filaneme_2 OBX | 1 | EGFR | OBR_filaneme_3 OBX | 2 | SODIUM | OBR_filaneme_3 OBR | 3 |歧義DEFAULT | OBR_filaneme_3 OBX | 1 | POTASSIUM | OBR_filaneme_3' – animal