0

我寫一個星火結構流應用Pyspark不允許從卡夫卡讀取數據。星火流:卡夫卡組ID星火結構化流

但是,Spark的當前版本是2.1.0,它不允許我將group id設置爲參數,並會爲每個查詢生成唯一的id。但卡夫卡連接是基於組的授權,需要預設的組標識。

因此,是否有任何解決方法來建立連接而不需要更新Spark到2.2,因爲我的團隊不需要它。

我的代碼:

if __name__ == "__main__": 
    spark = SparkSession.builder.appName("DNS").getOrCreate() 
    sc = spark.sparkContext 
    sc.setLogLevel("WARN") 

    # Subscribe to 1 topic 
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load() 
    print(lines.isStreaming) #print TRUE 
    lines.selectExpr("CAST(value AS STRING)") 
    # Split the lines into words 
    words = lines.select(
    explode(
     split(lines.value, " ") 
     ).alias("word") 
    ) 
    # Generate running word count 
    wordCounts = words.groupBy("word").count() 

    # Start running the query that prints the running counts to the console 
    query = wordCounts \ 
     .writeStream \ 
     .outputMode("complete") \ 
     .format("console") \ 
     .start() 

    query.awaitTermination() 
+0

我不認爲你可以在Spark 2.2中設置'group.id' - http://spark.apache.org/docs/latest/structured-streaming-kafka -integration.html#kafka-specific-configurations – himanshuIIITian

+0

根據此[Databricks doc](https://docs.databricks.com/spark/latest/structured-streaming/kafka.html)_Since Spark 2.2,您可以選擇設置組ID。但是,請謹慎使用,因爲這可能會導致意外的行爲._ – ELI

+0

奇怪!因爲根據Spark 2.2文檔,我們不能。可能兩個文件之間存在不匹配。 – himanshuIIITian

回答

0

KafkaUtils類將覆蓋參數值"group.id"。它將從原始組ID中接收"spark-executor-"

下面是KafkaUtils其中這樣的代碼:

// driver and executor should be in different consumer groups 
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) 
    if (null == originalGroupId) { 
     logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it") 
    } 
    val groupId = "spark-executor-" + originalGroupId 
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") 
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) 

我們面臨着同樣的問題。 Kafka基於帶預設組ID的ACL,因此唯一的辦法是在kafka配置中更改組ID。我們的原始團隊ID的insead我們把"spark-executor-" + originalGroupId

+0

我正在使用Spark結構化流(上面的代碼),它直接從kafka讀取流數據而不創建流上下文 – ELI

相關問題