2017-05-01 76 views
0

我想從卡夫卡發送一個csv文件到火花流應用程序,我不知道該怎麼做。我在這裏閱讀了很多文章,但沒有人幫助我。從卡夫卡發送CSV到Spark Streaming

我希望我的卡夫卡製作者發送csv並稍後在應用程序(消費者)中進行拆分,但這並不重要。我試圖創建一個RDD併發送它。 這適用於正常的字符串消息,但它不適用於csv

這是我的製片人:

message =sc.textFile("/home/guest/host/Seeds.csv")  
producer.send('test', message) 

我的火花消費者:

ssc = StreamingContext(sc, 5) 

kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1}) data = kvs.map(lambda x: x[1]) counts = data.flatMap(lambda line: line.split(";")) \

.map(lambda word: (word, 1)) \ 
.reduceByKey(lambda a, b: a+b) 

的問題是,通過發送CSV,火花streamming唐沒有收到任何事件。 有人可以幫助我的格式或概念?

我在一個docker容器下用python在筆記本上運行生產者和消費者。

謝謝。

回答

相關問題