2017-07-28 13 views
3

我正在使用使用websockets的Django和Django頻道構建web應用程序。從另一個消費者在一個Django頻道消費者中斷開循環

當用戶點擊瀏覽器上的按鈕,可將WebSocket將數據發送到我的服務器和客戶服務器上啓動每秒消息發送到客戶端一次(環路)。

我想創建另一個按鈕,將停止該數據發送過程。當用戶點擊這個新按鈕時,websocket會向服務器發送另一個數據,並且服務器上的使用者必須以某種方式停止之前使用者的循環。此外,我會要求這停止客戶端斷開連接時的循環。

我覺得想要使用全局變量。但是Django Channels文檔聲明,他們強烈建議不要使用全局變量,因爲他們希望保持應用程序網絡的透明性(不太瞭解這一點)。

我試過使用通道會話。我讓第二個消費者更新頻道會話中的值,但頻道會話值沒有在第一個消費者中更新。

這是簡化的代碼。 瀏覽器:在服務器

var socket = new WebSocket("ws://" + window.location.host + "/socket/"); 
$('#button1').on('click', function() { 
    socket.send(JSON.stringify({action: 'start_getting_values'})) 
}); 
$('#button2').on('click', function() { 
    socket.send(JSON.stringify({action: 'stop_getting_values'})) 
}); 

消費者:

@channel_session 
def ws_message(message): 
    text = json.loads(message.content['text']) 

    if text['action'] == 'start_getting_values': 
     while True: 
      # Getting some data here 
      # ... 
      message.reply_channel.send({"text": some_data}, immediately=True) 
      time.sleep(1) 

    if text['action'] == 'stop_getting_values': 
     do_something_to_stop_the_loop_above() 
+0

你找到一個解決這個問題? – Vingtoft

+0

@Vingtoft是的,請檢查我的答案。 –

回答

3

好吧,我設法解決這個任務後,我自己我接觸Django的渠道開發。

使內消費者循環的做法是不好的,因爲它會阻止該網站一旦消費者將要運行的次數等於相當於運行此消費的所有工人的線程數量。

所以我的方法如下:一旦消費者獲得'start_getting_values'信號,它將當前的回覆通道添加到組中,並在連接到的Redis服務器上增加值(我使用Redis作爲通道層後端,但它將工作在任何其他後端)。

它增加了什麼值?在Redis上,我有一個關鍵字,表示散列對象類型的「組」。該鍵的每個鍵表示通道中的一個組,而值表示該組中回覆通道的數量。

然後我創建,我連接到相同的Redis服務器一個新的Python文件。在這個文件中,我運行無限循環,從Redis的關鍵'組'中加載字典。然後我在這個詞典中循環每個鍵(每個鍵代表通道組的名稱)並將數據廣播到具有非零值的每個組。當我運行這個文件時,它作爲單獨的進程運行,因此不會阻止消費者的任何事情。

要停止向用戶播放,當我從他那裏得到適當的信號時,我只是將他從組中刪除,並減少相應的Redis值。

消費者代碼:

import redis 

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0) 

@channel_session_user 
def ws_message(message): 

    text = json.loads(message.content['text']) 

    if text['stream'] == 'start_getting_values': 
     value_id = text['value_id'] 
     redis_client.hincrby('redis_some_key', value_id, 1) 
     Group(value_id).add(message.reply_channel) 
     channel_session['value_id'] = value_id 
     return 0 

    if text['stream'] == 'stop_getting_values': 
     if message.channel_session['value_id'] != '': 
      value_id = message.channel_session['value_id'] 
      Group(value_id).discard(message.reply_channel) 

      l = redis_client.lock(name='del_lock') 
      val = redis_client.hincrby('redis_some_key', value_id, -1) 
      if (val <= 0): 
       redis_client.hdel('redis_some_key', value_id) 
      l.release() 
     return 0 

單獨的Python文件:

import time 
import redis 
from threading import Thread 
import asgi_redis 


redis_client = redis.StrictRedis(host='localhost', port=6379, db=0) 
cl = asgi_redis.RedisChannelLayer() 

def some_action(value_id): 

    # getting some data based on value_id 
    # .... 

    cl.send_group(value_id, { 
     "text": some_data, 
    }) 


while True: 
    value_ids = redis_client.hgetall('redis_some_key') 

    ths = [] 
    for b_value_id in value_ids.keys(): 
     value_id = b_value_id.decode("utf-8") 
     ths.append(Thread(target=some_action, args=(value_id,))) 

    for th in ths: 
     th.start() 
    for th in ths: 
     th.join() 


    time.sleep(1) 
+0

感謝您的解釋!你有任何鏈接到你使用的資源或代碼示例?謝謝! – Vingtoft

+1

@Vingtoft不客氣。添加了代碼片段。 –

相關問題