2016-05-02
I want to import a CSV file into an RDD. When I call .first() on the RDD to get its first row, it throws the error shown below. first() does not work on the existing RDD.

It seems that the .map function turns the RDD into a PipelinedRDD, on which commands such as .first() and .count() do not work. Is there any other way to solve this?

import csv 
import StringIO 

def loadRecord(line): 
    input = StringIO.StringIO(line) 
    reader = csv.DictReader(input, fieldnames=["PassengerId","Survived","Pclass","Name","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked"]) 
    return reader.next() 
input = sc.textFile("C:\Users\rohit.guglani\Documents/train.csv",4).map(loadRecord) 

type(input) 

pyspark.rdd.PipelinedRDD 



input.first() 

gives this error:

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-9-d93d15081c08> in <module>() 
----> 1 input.first() 

C:\spark-1.6.1\python\pyspark\rdd.pyc in first(self) 
    1313   ValueError: RDD is empty 
    1314   """ 
-> 1315   rs = self.take(1) 
    1316   if rs: 
    1317    return rs[0] 

C:\spark-1.6.1\python\pyspark\rdd.pyc in take(self, num) 
    1265   """ 
    1266   items = [] 
-> 1267   totalParts = self.getNumPartitions() 
    1268   partsScanned = 0 
    1269 

C:\spark-1.6.1\python\pyspark\rdd.pyc in getNumPartitions(self) 
    2361 
    2362  def getNumPartitions(self): 
-> 2363   return self._prev_jrdd.partitions().size() 
    2364 
    2365  @property 

C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

C:\spark-1.6.1\python\pyspark\sql\utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling o50.partitions. 
ohit.guglani/Documents/train.csv 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Unknown Source) 

The code works for me, but on a Linux platform. Check the path string you pass to textFile: make sure it contains no backslash characters, or prefix the string with 'r' to keep Python from interpreting the backslashes as escape characters. –


Thanks Philippe. The backslashes were indeed the problem. I also thought you could not run .first() on a pipelined RDD, but .top(n) does work on a pipelined RDD. –

Answer


When working on Windows, you should use / rather than \ in the path:

input = sc.textFile("C:/Users/rohit.guglani/Documents/train.csv", 4).map(loadRecord)
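A minimal sketch of why the original string breaks (using a shortened, hypothetical path, not the asker's real file): in a normal Python string literal, \r and \t are escape characters, so parts of the path never reach Spark intact. A raw string or forward slashes avoid this.

```python
# Hypothetical short path for illustration only.
# In a normal string literal, "\r" is a carriage return and "\t" is a tab,
# so those backslashes silently vanish from the path:
broken = "C:\rohit\train.csv"

# A raw-string literal keeps every backslash as a literal character:
fixed = r"C:\rohit\train.csv"

# Forward slashes also work on Windows and need no escaping at all:
portable = "C:/rohit/train.csv"

assert "\\" not in broken   # the backslashes are gone
assert "\\" in fixed        # the raw string preserved them
```

This is also consistent with the truncated "ohit.guglani/Documents/train.csv" path in the traceback: the characters up to and including the "r" were consumed by the \r escape, so Hadoop was asked to open a mangled path that does not exist.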