2017-08-21 66 views
0

我想要使用由卡夫卡發佈的火花數據,但我無法這樣做。我正在使用Spark 2.2。使用卡夫卡與火花使用pyspark問題

  1. 我想使用由卡夫卡使用Spark發送的數據,處理它並存儲在本地文件或HDFS中。
  2. 我想打印出運行spark工作後在控制檯中由kafka發送的數據(由spark消耗)。

對於卡夫卡,我下面這個教程:https://kafka.apache.org/quickstart

[[email protected] kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    >message 1 
    >message 2 
    >message 3 
    >message 4 

運行星火python腳本file.py:

./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py 

Pyspark代碼:

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("stream").getOrCreate() 

df = spark\ 
.readStream\ 
.format("kafka")\ 
.option("kafka.bootstrap.servers","localhost:9092")\ 
.option("subscribe","test")\ 
.load() 

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic") 


#Trying to save result in a file 
df.writeStream\ 
.format("text")\ 
.option("checkpointLocation", "file:///home/cloudera/file.txt")\ 
.option("path","file:///home/cloudera/file.txt")\ 
.start() 
# Does not write to a file 

#Trying to print result in console 
df.writeStream()\ 
.outputMode("append")\ 
.format("console")\ 
.start() 
# Does not print to console and gives error: TypeError: 'DataStreamWriter' object is not callable 

任何幫助?

+0

只是爲了確保,你開始引發** ** THEN產生的數據權? – LuckyGuess

+0

@Falan是的,我首先開始了kafka。我想知道如何從火花流中將數據存儲到HDFS中。 – Rio

回答

0

的問題很可能是這一行:

df.writeStream()\ 

從行中刪除()像這樣:

df.writeStream\ 
+1

你好@antoine,歡迎來到StackOverflow!請花一分鐘時間[參觀](https://stackoverflow.com/tour)。當他們解釋OP所做的錯誤以及爲什麼時,答案更有用。請編輯你的答案來描述你的代碼爲什麼會起作用,而不是OP的作用。 –