2017-01-09 34 views
0

閱讀序列文件我有它的價值看起來像在PySpark 2.0

(string_value, json_value) 

我不關心的字符串值序列文件。

在Scala中我可以讀取

val reader = sc.sequenceFile[String, String]("/path...") 
val data = reader.map{case (x, y) => (y.toString)} 
val jsondata = spark.read.json(data) 

我有一個很難轉換這PySpark文件。我曾嘗試使用

reader= sc.sequenceFile("/path","org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text") 
data = reader.map(lambda x,y: str(y)) 
jsondata = spark.read.json(data) 

錯誤是神祕的,但我可以提供他們,如果有幫助。我的問題是,在pySpark2中讀取這些序列文件的正確語法是什麼?

我想我沒有正確地將數組元素轉換爲字符串。我得到類似的錯誤,如果我不喜歡

m = sc.parallelize([(1, 2), (3, 4)]) 
m.map(lambda x,y: y.toString).collect() 

m = sc.parallelize([(1, 2), (3, 4)]) 
m.map(lambda x,y: str(y)).collect() 

感謝簡單的東西!

回答

2

您的代碼的基本問題是您使用的功能。傳遞給map的函數應該只有一個參數。二者必選其一:

reader.map(lambda x: x[1]) 

或者只是:

reader.values() 

只要keyClassvalueClass匹配數據,這應該是所有你需要在這裏和那裏應該是不需要額外的類型轉換(這是處理內部由sequenceFile)。寫斯卡拉:

Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0 
     /_/ 

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111) 
Type in expressions to have them evaluated. 
Type :help for more information. 
scala> :paste 
// Entering paste mode (ctrl-D to finish) 

sc 
    .parallelize(Seq(
    ("foo", """{"foo": 1}"""), ("bar", """{"bar": 2}"""))) 
    .saveAsSequenceFile("example") 

// Exiting paste mode, now interpreting. 

閱讀在Python:

Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /__/.__/\_,_/_/ /_/\_\ version 2.1.0 
     /_/ 

Using Python version 3.5.1 (default, Dec 7 2015 11:16:01) 
SparkSession available as 'spark'. 
In [1]: Text = "org.apache.hadoop.io.Text" 

In [2]: (sc 
    ...:  .sequenceFile("example", Text, Text) 
    ...:  .values() 
    ...:  .first()) 
Out[2]: '{"bar": 2}' 

注意

舊版本的Python支持的元組參數拆包:

reader.map(lambda (_, v): v) 

不要使用它爲應該向前兼容的代碼。