0
我想在語法上重新創建卡夫卡主題。 我使用相同的kafka.admin.AdminUtils
。重複卡夫卡主題
這裏是我的rought代碼:
AdminUtils.deleteTopic(zkUtils, topicName);
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties());
上面的代碼是工作的大部分時間,但多次失敗與以下情況除外:
Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
我的理解是,主題不正確刪除。我在這裏做錯了什麼。
我已經將delete.topic.enable設置爲true。爲了達到第2點,是否有任何來自kafka的回調,當刪除操作成功完成時通知它? –
似乎沒有開箱即用的回調功能可以使用。一種可能的方法是監視zk節點'/ admin/delete_topics/'的存在。刪除這個zk節點是刪除主題過程中的最後一步,在清除不容易被監控的控制器緩存之前發生該主題。 –
amethystic