2015-11-08

This is a follow-up to Spark with Flume (configuration/classpath?) and Spark Streaming (run-example vs spark-submit).

After trying a few things from the earlier questions, the problem I now have is:

$ spark-submit --jars /opt/scala/spark-streaming-flume_2.10-1.5.1.jar --master local[*] /home/user/spark/FlumeStreaming.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="Newapp")
strm = StreamingContext(sc, 1)  # 1-second batch interval

# Push-based receiver: Flume's Avro sink connects to this host/port
flume = FlumeUtils.createStream(strm, "localhost", 9999)

lines = flume.map(lambda x: x[1])  # each event is (headers, body); keep the body
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()

strm.start()
strm.awaitTermination()
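Setting the Flume source aside, the transformation chain above is the classic streaming word count. As a rough sketch (plain Python, not Spark; the sample events are made up for illustration), the same map/flatMap/reduceByKey logic looks like this:

```python
from collections import Counter

# Each Flume event arrives as a (headers, body) pair;
# flume.map(lambda x: x[1]) keeps only the body.
events = [({}, "spark streaming"), ({}, "spark flume")]

lines = [body for _, body in events]                     # map: keep body
words = [w for line in lines for w in line.split(" ")]   # flatMap: split into words
counts = Counter(words)                                  # map to (word, 1) + reduceByKey

print(dict(counts))  # {'spark': 2, 'streaming': 1, 'flume': 1}
```

The error below is therefore not in this logic; the pipeline never gets to run because the receiver cannot load a Flume class.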

15/11/07 23:55:09 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoClassDefFoundError: org/apache/flume/source/avro/AvroSourceProtocol 
     at org.apache.spark.streaming.flume.FlumeReceiver.responder$lzycompute(FlumeInputDStream.scala:147) 
     at org.apache.spark.streaming.flume.FlumeReceiver.responder(FlumeInputDStream.scala:146) 
     at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:163) 
     at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:170) 
     at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
     at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542) 
     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532) 
     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984) 
     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:88) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:744) 
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.avro.AvroSourceProtocol 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
     ... 16 more 

15/11/07 23:55:09 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
15/11/07 23:55:09 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver 

My question is this: the code is identical to flume_wordcount.py shipped with the Spark examples, and that example works fine, but my version does not. The difference is that one is launched with run-example and the other with spark-submit, which points to how the classpath and jar files are managed. Is there something I should be doing differently?

Answer

Sorted this out: you have to use the correct jar (the Flume assembly, which bundles the Flume classes the plain streaming jar lacks) and pass it to spark-submit:

$spark-submit --jars /path/to/spark-streaming-flume-assembly*.jar FlumeStreaming.py localhost 12345
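One way to confirm a jar actually bundles the class the stack trace complains about is to list its entries; a jar is just a zip archive. A minimal check in Python (a sketch: the demo.jar built here is a stand-in for the real assembly jar, whose path varies by installation):

```python
import zipfile

def jar_contains_class(jar_path, class_name):
    """Return True if the jar (a zip archive) contains the given class file."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# Build a tiny throwaway jar just to demonstrate the check; in practice you
# would point jar_contains_class at spark-streaming-flume-assembly*.jar.
with zipfile.ZipFile("demo.jar", "w") as jar:
    jar.writestr("org/apache/flume/source/avro/AvroSourceProtocol.class", b"")

print(jar_contains_class("demo.jar",
                         "org.apache.flume.source.avro.AvroSourceProtocol"))  # True
```

If the check returns False for the jar you pass with --jars, the receiver will fail with exactly the NoClassDefFoundError shown above.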