I use the following Python producer to publish messages to my Kafka topic (I can also receive the published messages perfectly with a Python consumer in Jupyter; a minimal sketch of such a consumer follows the producer code below). My Spark Streaming application, however, receives nothing from Kafka.
from kafka import KafkaProducer
import json, time

userdata = {
    "ipaddress": "172.16.0.57",
    "logtype": "",
    "mid": "",
    "name": "TJ"
}

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print("adding", i)
    producer.send('test', userdata)
    time.sleep(3)
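For reference, the Jupyter consumer that does receive these messages was along these lines. This is a minimal kafka-python sketch, not the original consumer code; the auto_offset_reset setting is an assumption so that previously published messages are replayed as well:

from kafka import KafkaConsumer
import json

# Subscribe to the same 'test' topic the producer writes to.
# auto_offset_reset='earliest' (an assumption) replays existing messages.
consumer = KafkaConsumer('test',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')))

for message in consumer:
    print(message.value)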
But when I try to run the Spark Kafka streaming example, I get nothing (I should point out that Spark itself works on my workstation, since I can run the network streaming example without any problem; a minimal version of that example is sketched after the output sample below):
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
import sys
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json

# Pull in the Kafka 0.8 connector jar; this must be set before the
# SparkContext (and its JVM gateway) is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.10:2.0.2 pyspark-shell'

sc = SparkContext("local[2]", "KafkaSTREAMWordCount")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# Receiver-based stream via ZooKeeper (localhost:2181), topic 'test', 1 thread
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181",
                                       "raw-event-streaming-consumer", {"test": 1})

# Each record is a (key, value) pair; parse the JSON value.
# (Tuple-unpacking lambdas are Python 2 only, which matches my setup.)
parsed = kafka_stream.map(lambda (k, v): json.loads(v))
parsed.pprint()

ssc.start()
ssc.awaitTermination()
Here is a sample of the output:
-------------------------------------------
Time: 2017-08-28 14:08:32
-------------------------------------------
-------------------------------------------
Time: 2017-08-28 14:08:33
-------------------------------------------
-------------------------------------------
Time: 2017-08-28 14:08:34
-------------------------------------------
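For comparison, the network streaming example that does work on this setup is essentially the standard Spark socket word count. A minimal sketch, assuming input is fed from netcat with `nc -lk 9999`:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 2)

# Read lines from a local socket instead of Kafka
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()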
Note: my system specs are as follows:
- Ubuntu 16.04
- Spark: spark-2.2.0-bin-hadoop2.7
- Jupyter Notebook (Python 2.7)
- Kafka: kafka_2.11-0.11.0.0
I have the following lines in my .bashrc:
export PATH="/home/myubuntu/anaconda3/bin:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/jars:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark:$PATH"
export PATH="/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/python/pyspark/streaming:$PATH"
function snotebook()
{
#Spark path (based on your computer)
SPARK_PATH=~/spark-2.0.0-bin-hadoop2.7
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
# For python 3 users, you have to add the line below or you will get an error
#export PYSPARK_PYTHON=python3
#$SPARK_PATH/bin/pyspark --master local[2]
/home/myubuntu/Desktop/spark-2.2.0-bin-hadoop2.7/bin/pyspark --master local[2]
}
Where are you running this where it doesn't work? – bendl

On my laptop (Ubuntu 16.04). – user2867237