我寫一個星火結構流應用Pyspark不允許從卡夫卡讀取數據。星火流:卡夫卡組ID星火結構化流
但是,Spark的當前版本是2.1.0,它不允許我將group id設置爲參數,並會爲每個查詢生成唯一的id。但卡夫卡連接是基於組的授權,需要預設的組標識。
因此,是否有任何解決方法來建立連接而不需要更新Spark到2.2,因爲我的團隊不需要它。
我的代碼:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
我不認爲你可以在Spark 2.2中設置'group.id' - http://spark.apache.org/docs/latest/structured-streaming-kafka -integration.html#kafka-specific-configurations – himanshuIIITian
根據此[Databricks doc](https://docs.databricks.com/spark/latest/structured-streaming/kafka.html)_Since Spark 2.2,您可以選擇設置組ID。但是,請謹慎使用,因爲這可能會導致意外的行爲._ – ELI
奇怪!因爲根據Spark 2.2文檔,我們不能。可能兩個文件之間存在不匹配。 – himanshuIIITian