2016-07-01 63 views
0

我想寫一些測試,其中我產生一個消息到一個隊列,看看消息是否得到正確使用和應用程序處理。如何正確使用內存中的傳輸進行單元測試

爲此,我正在玩kombu庫,特別是內存中的Transport實現。

仍然我無法得到它的工作,生產的消息被消耗。因此

我的問題是,如果有人能提供生產和內存消耗的消息簡單的單元測試

回答

2

你會想這個適應你想測試代碼,但基本你正在尋找的是內存控制器的amqp URI,它是'memory://'。作爲一個非常簡單的例子:

conn = kombu.Connection("memory://") 
queue = conn.SimpleQueue('myqueue') 
queue.put('test') 
msg = queue.get(timeout=1) 
msg.ack() 
print(msg.payload) 
相關問題