Problem consuming Kafka data with Spark using pyspark: I want to consume data published by Kafka with Spark, but I am unable to do so. I am using Spark 2.2.
- I want to consume the data sent by Kafka with Spark, process it, and store it in a local file or in HDFS.
- I want to print the data sent by Kafka (and consumed by Spark) to the console after running the Spark job.
For Kafka, I am following this tutorial: https://kafka.apache.org/quickstart
[[email protected] kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>message 1
>message 2
>message 3
>message 4
Running the Spark python script file.py:
./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py
The pyspark code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stream").getOrCreate()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","localhost:9092")\
.option("subscribe","test")\
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
#Trying to save result in a file
df.writeStream\
.format("text")\
.option("checkpointLocation", "file:///home/cloudera/file.txt")\
.option("path","file:///home/cloudera/file.txt")\
.start()
# Does not write to a file
#Trying to print result in console
df.writeStream()\
.outputMode("append")\
.format("console")\
.start()
# Does not print to console and gives error: TypeError: 'DataStreamWriter' object is not callable
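For reference, the code above has a few fixable issues: `writeStream` is a property, not a method (hence the `TypeError`), the DataFrame returned by `selectExpr` is discarded because it is never assigned, the `text` sink accepts exactly one string column and writes to a directory (not a single file), and the driver exits immediately without `awaitTermination()`. A corrected sketch, assuming a Kafka broker on `localhost:9092` and illustrative output/checkpoint directories:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stream").getOrCreate()

# Source: the Kafka topic "test" on the local broker.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

# selectExpr returns a new DataFrame; keep the result.
# The text sink requires a single string column, so keep only the value.
lines = df.selectExpr("CAST(value AS STRING) AS value")

# File sink: "path" is an output directory, and checkpointLocation
# must be a separate directory, not the output file itself.
file_query = lines.writeStream \
    .format("text") \
    .option("checkpointLocation", "file:///home/cloudera/checkpoint") \
    .option("path", "file:///home/cloudera/output") \
    .start()

# Console sink: writeStream is a property, so no parentheses.
console_query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Keep the driver alive so both streaming queries can run.
spark.streams.awaitAnyTermination()
```

This sketch requires a running Kafka broker and must be submitted with the same `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0` flag shown above.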
Any help would be appreciated.
Just to make sure, you start **spark** first, THEN produce the data, right? – LuckyGuess
@Falan Yes, I started Kafka first. I would like to know how to store the data from Spark Streaming into HDFS. – Rio
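On the HDFS question raised in the comment: the same file-based sink can point at an `hdfs://` URI instead of a `file://` one. A minimal sketch, assuming the `df` Kafka source DataFrame from the question; the namenode host/port and directory names are assumptions, not taken from the post:

```python
# Assuming `df` is the Kafka source DataFrame from the question.
# The namenode address (localhost:8020) and paths are illustrative.
query = df.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "hdfs://localhost:8020/user/cloudera/checkpoint") \
    .option("path", "hdfs://localhost:8020/user/cloudera/kafka_output") \
    .start()

# Block until the query is stopped or fails.
query.awaitTermination()
```

The `text` format works the same way if plain text files are preferred over parquet; the checkpoint directory must again be distinct from the output path.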