我有一個應用程序需要監聽多個不同的主題;每個主題都有獨立的消息處理邏輯。我曾經想過爲每個KafkaStreams實例使用相同的kafka屬性,但是我得到如下所示的錯誤。Kafka Streams:使用相同的`application.id`來消費多個主題
錯誤
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
代碼(科特林)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
,我發現這個reference那建議你不能使用application.id
多個主題,但我發現很難找到參考文件來支持。 documentation對於application.id
指出:
流處理應用的標識符。在Kafka集羣中必須是唯一的。它用作1)默認客戶端ID前綴,2)用於成員資格管理的組ID,3)變更日誌主題前綴。
問題
- 這個錯誤是什麼意思,什麼原因造成的。
- 鑑於您可以使用多個主題分區使用同一個ID運行的多個應用程序實例,「在Kafka集羣中必須是唯一的」是什麼意思?
- 你可以使用相同的卡夫卡流
application.id
開始兩個KafkaStreams
列在不同的主題?如果是這樣,怎麼樣?
詳情:卡夫卡0.11.0.2
謝謝。是否有任何文件聲明_「具有相同application.id的多個應用程序,它們必須與輸入主題相同」_? –
我不確定。請注意,AK支持Streams for v1。0重新atm重新atm - 評論/建議歡迎通過郵件列表。 –