2017-12-27 490 views
0

我有一個應用程序需要監聽多個不同的主題;每個主題都有獨立的消息處理邏輯。我曾經想過爲每個KafkaStreams實例使用相同的kafka屬性,但是我得到如下所示的錯誤。Kafka Streams:使用相同的`application.id`來消費多個主題

錯誤

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic 

代碼(科特林)

class KafkaSetup() { 
    companion object { 
     private val LOG = LoggerFactory.getLogger(this::class.java) 
    } 

    fun getProperties(): Properties { 
     val properties = Properties() 
     properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app") 
     return properties 
    } 

    private fun listenOnMyTopic() { 
     val kStreamBuilder = KStreamBuilder() 
     val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic") 

     kStream.foreach { key, value -> LOG.info("do stuff") } 

     val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) 
     kafkaStreams.start() 
    } 

    private fun listenOnMyOtherTopic() { 
     val kStreamBuilder = KStreamBuilder() 
     val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic") 

     kStream.foreach { key, value -> LOG.info("do other stuff") } 

     val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) 
     kafkaStreams.start() 
    } 
} 

,我發現這個reference那建議你不能使用application.id多個主題,但我發現很難找到參考文件來支持。 documentation對於application.id指出:

流處理應用的標識符。在Kafka集羣中必須是唯一的。它用作1)默認客戶端ID前綴,2)用於成員資格管理的組ID,3)變更日誌主題前綴。

問題

  1. 這個錯誤是什麼意思,什麼原因造成的。
  2. 鑑於您可以使用多個主題分區使用同一個ID運行的多個應用程序實例,「在Kafka集羣中必須是唯一的」是什麼意思?
  3. 你可以使用相同的卡夫卡流application.id開始兩個KafkaStreams列在不同的主題?如果是這樣,怎麼樣?

詳情:卡夫卡0.11.0.2

回答

2

卡夫卡流通過分區,而不是主題縮放。因此,如果您使用相同的application.id啓動多個應用程序,則它們必須與它們所訂閱的輸入主題及其處理邏輯相同。該應用程序使用application.id作爲group.id形成消費者組,因此輸入主題的不同分區被分配給不同的實例。

如果您有與相同邏輯不同的主題,你可以訂閱所有話題一次(在每個實例啓動)。雖然縮放仍然基於分區。 (它基本上是輸入主題的「合併」。)

如果要通過主題縮放和/或具有不同的處理邏輯,則必須針對不同的Kafka Streams應用程序使用不同的application.id

+0

謝謝。是否有任何文件聲明_「具有相同application.id的多個應用程序,它們必須與輸入主題相同」_? –

+0

我不確定。請注意,AK支持Streams for v1。0重新atm重新atm - 評論/建議歡迎通過郵件列表。 –

相關問題