I want to import a CSV file into RDD format. When I use the .first() command to get the first row of the RDD, it throws the error described below, so .first() does not work on the resulting RDD. It seems that the .map function turns the RDD into a PipelinedRDD, on which commands such as .first() and .count() fail. Is there another way around this?
import csv
import StringIO   # Python 2 StringIO module

def loadRecord(line):
    # parse a single CSV line into a dict of column name -> value
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["PassengerId","Survived","Pclass","Name","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked"])
    return reader.next()

input = sc.textFile("C:\Users\rohit.guglani\Documents/train.csv", 4).map(loadRecord)
type(input)
# pyspark.rdd.PipelinedRDD
input.first()
This gives the following error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-9-d93d15081c08> in <module>()
----> 1 input.first()
C:\spark-1.6.1\python\pyspark\rdd.pyc in first(self)
1313 ValueError: RDD is empty
1314 """
-> 1315 rs = self.take(1)
1316 if rs:
1317 return rs[0]
C:\spark-1.6.1\python\pyspark\rdd.pyc in take(self, num)
1265 """
1266 items = []
-> 1267 totalParts = self.getNumPartitions()
1268 partsScanned = 0
1269
C:\spark-1.6.1\python\pyspark\rdd.pyc in getNumPartitions(self)
2361
2362 def getNumPartitions(self):
-> 2363 return self._prev_jrdd.partitions().size()
2364
2365 @property
C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
C:\spark-1.6.1\python\pyspark\sql\utils.pyc in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o50.partitions.
ohit.guglani/Documents/train.csv
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Unknown Source)
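The trace shows the failure originating in getNumPartitions, which is why .first() looks like the culprit: sc.textFile and .map are lazy transformations, and a bad input path is only reported when an action forces Spark to list the input files. A minimal illustration of that behavior, using a hypothetical path:

rdd = sc.textFile("no/such/file.csv")            # no error yet: the RDD is only defined
mapped = rdd.map(lambda line: line.split(","))   # transformations are lazy, still no error
mapped.first()                                   # the missing path is only reported here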
Your code works fine for me, but on a Linux platform. Check the path string you pass to textFile so that it contains no backslash characters, or prefix the string with 'r' to keep Python from interpreting the backslashes as escape characters. – Philippe
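To see why the stack trace shows the path cut down to ohit.guglani/Documents/train.csv: in a regular Python string, the \r in \rohit is parsed as a carriage-return escape. A quick check (illustrative only):

broken = "C:\Users\rohit.guglani\Documents/train.csv"
fixed = r"C:\Users\rohit.guglani\Documents/train.csv"   # raw string keeps backslashes literal
print "\r" in broken   # True: the path contains a carriage return
print "\r" in fixed    # False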
Thanks Philippe. The backslashes were indeed the problem. I had also thought you couldn't run .first() on a PipelinedRDD, but .top(n) does work on one. –
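Putting the fix together (a minimal sketch, assuming the same Spark 1.6 / Python 2 setup as in the question): pass the path as a raw string, or use forward slashes, and actions such as .first() and .count() then work on the PipelinedRDD as usual.

import csv
import StringIO

def loadRecord(line):
    # parse one CSV line into a dict keyed by the Titanic column names
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["PassengerId","Survived","Pclass","Name","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked"])
    return reader.next()

rdd = sc.textFile(r"C:\Users\rohit.guglani\Documents/train.csv", 4).map(loadRecord)
rdd.first()   # returns the first parsed record (a header line, if present, comes back as data)
rdd.count()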