2017-08-28 63 views
0

我使用以下python生產者發佈一些msg到我的kafka主題(我也可以在jupyter中使用python使用者完美地接收我發佈的數據)。Spark Streaming應用程序無法從Kafka接收msg

from kafka import KafkaProducer 
import json,time 
userdata={ 
     "ipaddress": "172.16.0.57", 
     "logtype": "", 
     "mid": "", 
     "name":"TJ" 
} 
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8')) 

for i in range(10): 
    print("adding",i) 
    producer.send('test', userdata) 
    time.sleep(3) 

但是當我嘗試運行火花kafkastreaming例子,我沒有得到任何東西(我要指出,火花在我的工作站工作,因爲我可以運行網絡流例如沒有任何問題):

from __future__ import print_function 
from pyspark.streaming.kafka import KafkaUtils 
import sys 
import os 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
import json 


os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.10:2.0.2 pyspark-shell' 

sc = SparkContext("local[2]", "KafkaSTREAMWordCount") 
ssc = StreamingContext(sc, 2) 
kafka_stream = KafkaUtils.createStream(ssc,"localhost:2181","raw-event-streaming-consumer",{"test":1}) 

parsed = kafka_stream.map(lambda (k, v): json.loads(v)) 
parsed.pprint() 
ssc.start() 
ssc.awaitTermination() 

下面是輸出的示例:

------------------------------------------- 
Time: 2017-08-28 14:08:32 
------------------------------------------- 

------------------------------------------- 
Time: 2017-08-28 14:08:33 
------------------------------------------- 

------------------------------------------- 
Time: 2017-08-28 14:08:34 
------------------------------------------- 

注:我的系統的規格爲如下:

的Ubuntu 16.04 火花:火花2.2.0彬hadoop2.7 Jupyter筆記本(Python 2.7版) 卡夫卡:kafka_2.11-0.11.0.0

我在我的.bashrc下面幾行:

export PATH="/home/myubuntu/anaconda3/bin:$PATH" 

export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin:$PATH" 

export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/jars:$PATH" 

export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python:$PATH" 

export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark:$PATH" 

export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark/streaming:$PATH" 


function snotebook() 
{ 
#Spark path (based on your computer) 
SPARK_PATH=~/spark-2.0.0-bin-hadoop2.7 

export PYSPARK_DRIVER_PYTHON="jupyter" 
export PYSPARK_DRIVER_PYTHON_OPTS="notebook" 

# For python 3 users, you have to add the line below or you will get an error 
#export PYSPARK_PYTHON=python3 

#$SPARK_PATH/bin/pyspark --master local[2] 
/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin/pyspark --master local[2] 
} 
+0

你在哪裏運行此,它不工作? – bendl

+0

在我的筆記本電腦(Ubuntu 16.04) – user2867237

回答

0

我發現了錯誤。隨着火花火花2.2.0彬hadoop2.7,我們需要使用下面的jar:

--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 
相關問題