2016-11-18 155 views
0

我想在語法上重新創建卡夫卡主題。 我使用相同的kafka.admin.AdminUtils重複卡夫卡主題

這裏是我的rought代碼:

AdminUtils.deleteTopic(zkUtils, topicName); 
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties()); 

上面的代碼是工作的大部分時間,但多次失敗與以下情況除外:

Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists. 
     at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253) 
     at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237) 
     at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) 

我的理解是,主題不正確刪除。我在這裏做錯了什麼。

回答

2

在調用deletedTopic之後,您不能調用createTopic。有兩件事情必須照顧:

  1. 設置「delete.topic.enable」真

  2. 由於刪除主題異步操作,你最好確保所有元數據已被成功刪除在創建新話題之前從Zookeeper中刪除

+0

我已經將delete.topic.enable設置爲true。爲了達到第2點,是否有任何來自kafka的回調,當刪除操作成功完成時通知它? –

+0

似乎沒有開箱即用的回調功能可以使用。一種可能的方法是監視zk節點'/ admin/delete_topics/'的存在。刪除這個zk節點是刪除主題過程中的最後一步,在清除不容易被監控的控制器緩存之前發生該主題。 – amethystic