2014-03-19 54 views
1

我想使用spark shell從HDFS加入兩個文件。 這兩個文件是製表符分隔,我想加入的第二列在Spark中加入兩個HDFS文件

試過代碼 但不給任何輸出

val ny_daily= sc.parallelize(List("hdfs://localhost:8020/user/user/NYstock /NYSE_daily")) 

val ny_daily_split = ny_daily.map(line =>line.split('\t')) 

val enKeyValuePair = ny_daily_split.map(line => (line(0).substring(0, 5), line(3).toInt)) 


val ny_dividend= sc.parallelize(List("hdfs://localhost:8020/user/user/NYstock/NYSE_dividends")) 

val ny_dividend_split = ny_dividend.map(line =>line.split('\t')) 

val enKeyValuePair1 = ny_dividend_split.map(line => (line(0).substring(0, 4),  line(3).toInt)) 

enKeyValuePair1.join(enKeyValuePair) 

但我沒有得到有關如何加入對特定列 文件的任何信息請建議

回答

4

我沒有得到有關如何加入對特定列文件的任何信息

RDDS都加入了他們的鑰匙,讓你決定列加入對當你說:

val enKeyValuePair = ny_daily_split.map(line => (line(0).substring(0, 5), line(3).toInt)) 
... 
val enKeyValuePair1 = ny_daily_split.map(line => (line(0).substring(0, 4), line(3).toInt)) 

你RDDS會從line(0).substring(0, 5)line(0).substring(0, 4)裏的值進行連接。

您可以找到join函數(以及許多其他有用的函數)hereSpark Programming Guide是瞭解Spark如何工作的很好的參考。

試過代碼,但不給任何輸出

爲了看到輸出,你要問火花打印:

enKeyValuePair1.join(enKeyValuePair).foreach(println) 

注:從加載數據您應該使用的文件sc.textFile()sc.parallelize()僅用於使RDD遠離Scala集合。

下面的代碼應該做的工作:

val ny_daily_split = sc.textFile("hdfs://localhost:8020/user/user/NYstock/NYSE_daily").map(line =>line.split('\t')) 
val ny_dividend_split = sc.textFile("hdfs://localhost:8020/user/user/NYstock/NYSE_dividends").map(line =>line.split('\t')) 

val enKeyValuePair = ny_daily_split.map(line => line(0).substring(0, 5) -> line(3).toInt) 
val enKeyValuePair1 = ny_dividend_split.map(line => line(0).substring(0, 4) -> line(3).toInt) 

enKeyValuePair1.join(enKeyValuePair).foreach(println) 

順便問一下,你提到你想加入第二列,但你實際使用的是line(0),這是故意的嗎?

希望這會有所幫助!

+0

什麼我應該把JOIN的關鍵和價值,因爲我想加入列和作爲輸出我應該能夠看到整個加入數據集 –

+0

然後改變你的'地圖'功能''ny_daily_split.map(line =>線(1) - > line.mkString(「\ t」))''和'ny_dividend_split.map(line => line(1) - > line.mkString(「\ t」))''。 – fedragon

相關問題