我以前成功地在Kafka(0.10.1.0)之前一直使用pyspark進行Spark Streaming(Spark 2.0.2),但我的目的更適合結構化流式傳輸。我已經嘗試使用範例在線:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.htmlPyspark結構化流式傳輸卡夫卡配置錯誤
與以下類似代碼:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
我也嘗試添加:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
但是,我始終與下面的錯誤結束在此創建ds1時的選項集合:
.option("partition.assignment.strategy", "range")
但是ev明確地給它賦值並不能阻止錯誤,也沒有任何其他值(比如「roundrobin」),我可以在線或在Kafka文檔中找到它。
我也試過這個「分配」選項,並取得了相同的錯誤(我們的卡夫卡主機設置爲分配 - 每個消費者只分配一個分區,我們沒有任何重新平衡)。
任何想法這裏發生了什麼?文檔沒有幫助(可能因爲它仍處於試驗階段)。另外,有沒有使用KafkaUtils進行結構化流式傳輸?或者這是唯一的網關?