2017-06-22 31 views
0

我正在嘗試對使用Channels和Websockets的Django應用進行端到端測試。 在我的情況下,請求觸發複雜的背景計算,我想觀察它的影響。 據我所知,在Django測試套件中,沒有辦法運行worker,這將消耗Django Channels的後臺任務。端到端測試Django頻道後臺任務

是否有任何方式觸發,但具有正確的參數後臺任務?我是否必須修改我的代碼才能使其可測試?

以下是簡化骨架我的設置:

views.py

def calculation(request): 
    # doing some simple calculation depending on request 
    simple_result, parameter = simple_calculation(request) 
    # trigger complex asynchronous calculation depending on parameter 
    Channel('background-calculations').send({ 
     "parameter": parameter, 
    }) 
    # return results of simple calculation 
    return render(request, 'simple_reponse.html',{'simple_result': simple_result}) 

routing.py

channel_routing = [ 
    route("background-calculations", run_background_calculations), 
    route("websocket.connect", ws_connect), 
    route("websocket.receive", ws_message), 
] 

consumers.py

def run_background_calculations(message): 
    # perform complex calculation depending on parameter from simple calculation 
    result = complex_calculation(message) 
    # update frontend via websocket 
    Group("frontend-updates").send({ 
     "text": json.dumps({ 
      "result": result, 
     }) 
    }) 

@channel_session 
def ws_connect(message): 
    message.reply_channel.send({"accept": True}) 
    Group("frontend-updates").add(message.reply_channel) 


@channel_session_user 
def ws_message(message): 
    message.reply_channel.send({ 
     "text": message.content['text'], 
    }) 

當我嘗試通過clie訪問視圖nt(例如REST APIClient)我收到響應,但後臺任務從未執行。有沒有辦法測試整個鏈?

回答

1

所以我最終弄明白了。

如果有人發現了這個問題,在這裏是如何執行的端至端測試:

from channels.test import Client 
from rest_framework.test import APIClient 

# setup 
rest = APIClient() 
channelclient = Client() 
Group("frontend-updates").add(u"test-channel") 
rest.login(username="username",password="password") 

class CopyTestCase(ChannelTestCase): 
    def test_end_to_end(self): 
     # trigger HTTP POST request 
     response = rest.post("/calculate/") 
     #...check response... 

     # consume the background-calculation task (has to be done manually) 
     channelclient.consume(u"background-calculations") 

     # check data sent to the front-end as a result of this calculation 
     message = testcase.get_next_message(u"test-channel") 
     #...check message...