我想從卡夫卡發送一個csv文件到火花流應用程序,我不知道該怎麼做。我在這裏閱讀了很多文章,但沒有人幫助我。從卡夫卡發送CSV到Spark Streaming
我希望我的卡夫卡製作者發送csv並稍後在應用程序(消費者)中進行拆分,但這並不重要。我試圖創建一個RDD併發送它。 這適用於正常的字符串消息,但它不適用於csv。
這是我的製片人:
message =sc.textFile("/home/guest/host/Seeds.csv")
producer.send('test', message)
我的火花消費者:
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1}) data = kvs.map(lambda x: x[1])
counts = data.flatMap(lambda line: line.split(";")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
的問題是,通過發送CSV,火花streamming唐沒有收到任何事件。 有人可以幫助我的格式或概念?
我在一個docker容器下用python在筆記本上運行生產者和消費者。
謝謝。