
EMR PySpark: LZO codec not found

Running PySpark through an IPython notebook on EMR (Hadoop 2.4.0) with Spark 1.4.0 in YARN mode, launched with:

IPYTHON_OPTS="notebook --no-browser" nohup /usr/lib/spark/bin/pyspark --master yarn-client --num-executors 2 --executor-memory 512m --executor-cores 1 > /mnt/var/log/python_notebook.log 2> /mnt/var/log/python_notebook_err.log & 

I put a simple CSV file on HDFS and tried to read it with:

sc.textFile('/tmp/text.csv').first() 

Reading it, however, gives me: Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found

Full traceback:

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-54-e39168c6841b> in <module>() 
----> 1 sc.textFile('/tmp/text.csv').first() 

/usr/lib/spark/python/pyspark/rdd.py in first(self) 
    1293   ValueError: RDD is empty 
    1294   """ 
-> 1295   rs = self.take(1) 
    1296   if rs: 
    1297    return rs[0] 

/usr/lib/spark/python/pyspark/rdd.py in take(self, num) 
    1245   """ 
    1246   items = [] 
-> 1247   totalParts = self.getNumPartitions() 
    1248   partsScanned = 0 
    1249 

/usr/lib/spark/python/pyspark/rdd.py in getNumPartitions(self) 
    353   2 
    354   """ 
--> 355   return self._jrdd.partitions().size() 
    356 
    357  def filter(self, f): 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o159.partitions. 
: java.lang.RuntimeException: Error in configuring object 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) 
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) 
    at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:65) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:47) 
    at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.reflect.InvocationTargetException 
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106) 
    ... 25 more 
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found. 
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135) 
    at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175) 
    at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) 
    ... 29 more 
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found 
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980) 
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128) 
    ... 31 more 

I tried to follow the instructions here and did:

os.environ['SPARK_LIBRARY_PATH'] = "/usr/lib/hadoop-lzo/lib/native/" 
os.environ['SPARK_CLASSPATH'] = "/usr/lib/hadoop-lzo/lib/" 

However, this did not seem to help.
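
My guess is that these environment variables are only read when the JVM is launched, so setting them from inside an already-running notebook has no effect. If so, the jar would have to be on the classpath at launch time; a sketch of what that might look like, assuming the EMR image keeps the hadoop-lzo jar under /usr/lib/hadoop-lzo/lib/:

IPYTHON_OPTS="notebook --no-browser" nohup /usr/lib/spark/bin/pyspark \
    --master yarn-client \
    --driver-class-path "/usr/lib/hadoop-lzo/lib/*" \
    --conf "spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*" \
    --num-executors 2 --executor-memory 512m --executor-cores 1 \
    > /mnt/var/log/python_notebook.log 2> /mnt/var/log/python_notebook_err.log &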


Have you tried [this link](https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/reading-lzo-files.md)? It looks like reading LZO with the 'textFile' method is the problem. – santon


@santon Well, the file I'm trying to read isn't an LZO file, and it lives on the cluster-local HDFS, so I'm not sure why it is being treated as one. I had seen that link, but dropping down to the Hadoop API just to read a text file seems like overkill. – tchakravarty


Got it. Sorry, I misread the question. Maybe check your 'core-site.xml' to see whether LZO is specified as the default compression codec? – santon
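
(For context: the trace above shows why an uncompressed CSV still hits this. TextInputFormat builds a CompressionCodecFactory, which loads every codec class named in io.compression.codecs up front, whether or not the input file is compressed. A core-site.xml entry of the kind that triggers the failure might look like the following; the exact codec list on a given EMR AMI is an assumption:)

<property>
    <name>io.compression.codecs</name>
    <!-- If com.hadoop.compression.lzo.LzoCodec is listed here but the
         hadoop-lzo jar is not on the classpath, every read fails with
         ClassNotFoundException, even for plain-text files. -->
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>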

Answer


I know this question is old, but I dealt with this issue over the past week, so I thought I would post our solution in case anyone else runs into it. Our setup is an EC2 instance running as a driver outside of EMR, which can then create EMR clusters and communicate with the master. The cluster runs Spark 2.2.0 on EMR release 5.9.0.

The fix was to clone the Twitter hadoop-lzo GitHub repo onto the Spark driver and then add the path to hadoop-lzo.jar to the spark-submit arguments: SUBMIT_ARGS='--jars /opt/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar'. Just substitute the path to wherever you cloned the repo and built the .jar.
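
A sketch of how that can be wired up from Python on the driver, using the PYSPARK_SUBMIT_ARGS environment variable that PySpark reads when it creates the JVM (the jar path and app name are placeholders; it assumes pyspark is importable on the driver machine):

import os

# Must be set before the SparkContext (and hence the JVM) is created;
# PySpark requires the trailing 'pyspark-shell' token.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /opt/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar '
    'pyspark-shell'
)

from pyspark import SparkContext

sc = SparkContext(master='yarn', appName='lzo-test')
print(sc.textFile('/tmp/text.csv').first())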
