0

我以前成功地在Kafka(0.10.1.0)之前一直使用pyspark進行Spark Streaming(Spark 2.0.2),但我的目的更適合結構化流式傳輸。我已經嘗試使用範例在線:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.htmlPyspark結構化流式傳輸卡夫卡配置錯誤

與以下類似代碼:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value 

我也嘗試添加:

ds1 = spark 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1") 
    .load() 
query = ds1 
    .writeStream 
    .outputMode('append') 
    .format('console') 
    .start() 
query.awaitTermination() 

但是,我始終與下面的錯誤結束在此創建ds1時的選項集合:

.option("partition.assignment.strategy", "range") 

但是ev明確地給它賦值並不能阻止錯誤,也沒有任何其他值(比如「roundrobin」),我可以在線或在Kafka文檔中找到它。

我也試過這個「分配」選項,並取得了相同的錯誤(我們的卡夫卡主機設置爲分配 - 每個消費者只分配一個分區,我們沒有任何重新平衡)。

任何想法這裏發生了什麼?文檔沒有幫助(可能因爲它仍處於試驗階段)。另外,有沒有使用KafkaUtils進行結構化流式傳輸?或者這是唯一的網關?

回答

1
  1. 有卡夫卡0.10.1一個已知的問題。*的客戶,你不應該用星火使用它,因爲它可能會產生由於https://issues.apache.org/jira/browse/KAFKA-4547錯誤的答案。您可以使用0.10.0.1客戶端,並且它應該可以與0.10.1。* Kafka集羣一起使用。

  2. 要將卡夫卡配置發送到結構化數據流中的卡夫卡客戶端,您需要添加kafka.前綴,如.option("kafka.partition.assignment.strategy", "range")。但是,您不需要設置kafka.partition.assignment.strategy,因爲它具有默認值。我的直覺是你可能把兩個Kafka 0.8。*和0.10。* jar放在classpath中,並加載錯誤的類。

  3. 您想使用KafkaUtils中的哪個API,但在結構化數據流中缺失? Spark 2.2.0剛剛出爐,您可以在結構化流式處理中使用批處理或流式查詢與Kafka。例如,請閱讀http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

0

kafka-clients-*.jar添加到您的spark jar文件夾,然後重新啓動spark master和slave。那麼你不需要添加.option("partition.assignment.strategy", "range")