2017-09-13 40 views
0

的HomeController如下,如何使用卡夫卡網C#創建新的話題

#region Properties 

const string topic = "AnotherTestTopic"; 
const string host = "http://localhost:9092"; 

#endregion 


[HttpPost] 
public ActionResult Save(FormCollection form) 
{ 
    var kafkaOptions = new KafkaOptions(new Uri(host)); 

    var brokerRouter = new BrokerRouter(kafkaOptions); 

    var producer = new Producer(brokerRouter); 

    producer.SendMessageAsync(topic, new[] { new Message("Test message") }).Wait(); 

    return RedirectToAction("Index", "Home"); 
} 

我使用卡夫卡網的dll和我SendMessageAsync方法如下

public async Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumerable<Message> messages, Int16 acks = 1, 
      TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone) 
     { 
      if (_stopToken.IsCancellationRequested) 
       throw new ObjectDisposedException("Cannot send new documents as producer is disposing."); 
      if (timeout == null) timeout = TimeSpan.FromMilliseconds(DefaultAckTimeoutMS); 

      var batch = messages.Select(message => new TopicMessage 
      { 
       Acks = acks, 
       Codec = codec, 
       Timeout = timeout.Value, 
       Topic = topic, 
       Message = message 
      }).ToList(); 

      _asyncCollection.AddRange(batch); 

      await Task.WhenAll(batch.Select(x => x.Tcs.Task)); 

      return batch.Select(topicMessage => topicMessage.Tcs.Task.Result) 
           .Distinct() 
           .ToList(); 
     } 

問題:

我剛剛起步我真的不知道如何從C#代碼創建主題。如何從C#添加主題?

任何幫助將不勝感激。

謝謝。

回答

1

您可以在您的代理配置中設置auto.create.topics.enable=true,讓卡夫卡在以前未創建時爲您創建主題。

您可能還希望在代理配置中將num.partitionsdefault.replication.factor設置爲適當的值。

+0

配置文件在哪裏? –

+0

它在您的經紀人的conf目錄中;通常稱爲「server.properties」 – alirabiee