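I am using the latest Spark (2.1.0) and Python (3.5.3), and I have Kafka (2.10.0) installed locally. I get an error when running a Python program that does Spark Streaming with Kafka: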
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pykafka import KafkaClient
import json
import sys
import pprint
spsc = SparkContext(appName="SampleApp")
stsc = StreamingContext(spsc, 1)
print('contexts =================== {} {}'.format(spsc, stsc))
kvs = KafkaUtils.createStream(stsc, "localhost:2181", "spark-consumer", {"7T-test3": 1})
spsc.stop()
The 'print' line here executes fine. But on the next line, while creating the stream, I get the following error:
Traceback (most recent call last):
File "/Users/MacAdmin/Downloads/spark-streaming/spark/spark_streaming_osample.py", line 24, in <module>
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"7T-test3": 1})
File "/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 70, in createStream
File "/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.createStream.
: java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:91)
at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:168)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createStream(KafkaUtils.scala:632)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 25 more
I run my program from the command line as:
/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar spark_streaming_sample.py
Do I need to set any environment variables, or am I not using the correct library versions?
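On the library-version possibility: the jar passed to --jars is named with Scala 2.10 and version 1.6.3, while the Spark install is 2.1.0, so a mismatch is one thing to rule out. As far as I know, org.apache.spark.Logging was a public class in the Spark 1.x line and is not exposed under that name in 2.x, which would be consistent with the ClassNotFoundException above. Below is a minimal sketch of that comparison. The helper names jar_versions and same_major_minor are hypothetical, and the parsing assumes the usual '<artifact>_<scala>-<version>.jar' naming convention.

```python
import re


def jar_versions(jar_name):
    """Return (scala_version, artifact_version) parsed from a jar file name.

    Assumption: the name follows '<artifact>_<scala>-<version>.jar'.
    """
    match = re.search(r"_(\d+\.\d+)-(\d+(?:\.\d+)*)\.jar$", jar_name)
    if match is None:
        raise ValueError("unrecognized jar name: " + jar_name)
    return match.group(1), match.group(2)


def same_major_minor(version_a, version_b):
    """True when both versions share the same major.minor prefix."""
    return version_a.split(".")[:2] == version_b.split(".")[:2]


# Values taken from the question: the install path and the --jars argument.
spark_version = "2.1.0"
jar = "spark-streaming-kafka-assembly_2.10-1.6.3.jar"

scala_version, jar_version = jar_versions(jar)
print(scala_version, jar_version, same_major_minor(jar_version, spark_version))
# prints: 2.10 1.6.3 False
```

A False result here only says that the jar was built for a different Spark line than the one running it. It does not by itself prove that this is the cause of the error.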