
I have input like the sample below, and I am trying to write a Spark result of type Array[Array[Any]] to a file:

3070811,1963,1096,,"US","CA",,1, 
3022811,1963,1096,,"US","CA",,1,56 
3033811,1963,1096,,"US","CA",,1,23 

After replacing the empty fields with 0, I tried to write the result to a text file, but I get:

scala> result.saveAsTextFile("data/result") 
<console>:34: error: value saveAsTextFile is not a member of Array[Array[Any]] 
       result.saveAsTextFile("data/result") 

Below is the code I ran:

scala> val file2 = sc.textFile("data/file.txt") 
scala> val mapper = file2.map(x => x.split(",",-1)) 
scala> val result = mapper.map(x => x.map(x => if(x.isEmpty) 0 else x)).collect() 
result: Array[Array[Any]] = Array(Array(3070811, 1963, 1096, 0, "US", "CA", 0, 1, 0), Array(3022811, 1963, 1096, 0, "US", "CA", 0, 1, 56), Array(3033811, 1963, 1096, 0, "US", "CA", 0, 1, 23)) 
scala> result.saveAsTextFile("data/result") 
<console>:34: error: value saveAsTextFile is not a member of Array[Array[Any]] 
       result.saveAsTextFile("data/result") 

I also tried the following, and it failed as well:

scala> val output = result.map(x => (x(0),x(1),x(2),x(3), x(4), x(5), x(7), x(8))) 
output: Array[(Any, Any, Any, Any, Any, Any, Any, Any)] = Array((3070811,1963,1096,0,"US","CA",1,0), (3022811,1963,1096,0,"US","CA",1,56), (3033811,1963,1096,0,"US","CA",1,23)) 

scala> output.saveAsTextFile("data/output") 
<console>:36: error: value saveAsTextFile is not a member of Array[(Any, Any, Any, Any, Any, Any, Any, Any)] 
       output.saveAsTextFile("data/output") 

Then I added the following, and that failed too:

scala> output.mapValues(_.toList).saveAsTextFile("data/output") 
<console>:36: error: value mapValues is not a member of Array[(Any, Any, Any, Any, Any, Any, Any, Any)] 
       output.mapValues(_.toList).saveAsTextFile("data/output") 
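
(For context: mapValues is not defined on Array at all, and on RDDs it exists only for pair RDDs, RDD[(K, V)], via PairRDDFunctions. A minimal sketch of where it does apply, assuming a live SparkContext sc:)

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))   // RDD[(String, Int)], a pair RDD
val bumped = pairs.mapValues(_ + 1)                   // mapValues compiles here, and only here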

How can I view the contents of the result or output variables, either in the console or in a result file? I am missing something basic here.

Update 1

As per the answerer's suggestion, I removed .collect() and then ran the save:

scala> val result = mapper.map(x => x.map(x => if(x.isEmpty) 0 else x)) 

and it produced this output:

[Ljava.lang.Object;@7a1167b6 
[Ljava.lang.Object;@60d86d2f 
[Ljava.lang.Object;@20e85a55 
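
That output appears because saveAsTextFile writes each record's toString, and a Scala Array inherits Java's default Object.toString (the JVM type tag plus a hash code) rather than printing its elements. A minimal illustration:

val row: Array[Any] = Array(3070811, 1963, 0)
row.toString       // prints like "[Ljava.lang.Object;@7a1167b6"
row.mkString(",")  // "3070811,1963,0"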

Update 1.A

Picking up the updated answer, this gives the correct data:

scala> val result = mapper.map(x => x.map(x => if(x.isEmpty) 0 else x).mkString(",")) 
result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[29] at map at <console>:31 

scala> result.saveAsTextFile("data/mkstring") 

Result:

3070811,1963,1096,0,"US","CA",0,1,0 
3022811,1963,1096,0,"US","CA",0,1,56 
3033811,1963,1096,0,"US","CA",0,1,23 

Update 2

scala> val output = result.map(x => (x(0),x(1),x(2),x(3), x(4), x(5), x(7), x(8))) 
output: org.apache.spark.rdd.RDD[(Any, Any, Any, Any, Any, Any, Any, Any)] = MapPartitionsRDD[27] at map at <console>:33 

scala> output.saveAsTextFile("data/newOutPut") 

and I get this result:

(3070811,1963,1096,0,"US","CA",1,0) 
(3022811,1963,1096,0,"US","CA",1,56) 
(3033811,1963,1096,0,"US","CA",1,23) 
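
The surrounding parentheses come from TupleN's toString. If bare comma-separated lines are preferred, one option (a sketch, not part of the original answer; the output path is made up) is to flatten each tuple through productIterator before saving:

// productIterator walks the tuple's elements in order,
// so mkString yields the line without parentheses.
output.map(_.productIterator.mkString(",")).saveAsTextFile("data/newOutPutPlain")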

Answer


The following code returns an Array[Array[Any]]:

val result = mapper.map(x => x.map(x => if(x.isEmpty) 0 else x)).collect() 

There is no saveAsTextFile method on Array; it is available on RDD, so you do not need to collect the output:

val result = mapper.map(x => x.map(x => if(x.isEmpty) 0 else x)) 

Use mkString(",") to convert each array to a String, and then write to the file:

val result = mapper.map(x => x.map(x => if(x.isEmpty) 0 else x).mkString(",")) 

You should also stop using collect(); it brings all the data to the driver and can cause memory problems when the data is large.
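
When you only need to eyeball a few records in the console, take(n) is a safer alternative, since it pulls just n elements to the driver. A minimal sketch:

// Fetch only the first 3 records instead of the whole RDD.
result.take(3).foreach(println)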

Hope this helps!


Please see Update 1 and Update 2 in the question.


If you have CSV files, you can use spark-csv (https://github.com/databricks/spark-csv) to read and write them; it is simpler and more efficient.
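
For reference, a minimal sketch of that spark-csv approach on Spark 1.x; the option names follow the spark-csv README, and the paths are placeholders:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// treatEmptyValuesAsNulls turns empty fields into nulls so they can be filled in one step.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("treatEmptyValuesAsNulls", "true")
  .load("data/file.txt")

// Replace nulls with "0" in the string columns, then write back out as CSV.
df.na.fill("0").write
  .format("com.databricks.spark.csv")
  .save("data/csvOutput")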


Thanks, I added Update 1.A to the question. mkString works, and so does Update 2.