2017-10-17 139 views
2

我有一個在Docker容器中運行的python「Device」。它連接到Crossbar路由器,在訂閱的頻道上接收高速公路/ WAMP事件消息。處理來自高速通道的消息異步,非阻塞訂閱

當某個事件發佈時,我的設備正在調用幾秒鐘內完成的方法。 現在,我希望它跳過或處理收到的同一事件的任何消息,而該方法仍在運行。我試圖通過使用Twisted的@inlinecallback修飾器並在設備上設置「self.busy」標誌來實現此目的。

但是它並沒有立即返回,而是像正常的阻塞方法那樣工作,以便傳入的消息被一個接一個地處理。

這裏是我的代碼:

from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 

class Pixel(ApplicationSession): 

@inlineCallbacks 
def onJoin(self, details): 
    yield self.subscribe(self.handler_no_access, 'com.event.no_access') 

@inlineCallbacks 
def handler_no_access(self, direction): 
    entries = len(self.handlers['no_access'][direction]) 

    if entries == 0: 
     self.handlers['no_access'][direction].append(direction) 
     result = yield self._handler_no_access() 
     return result 

    else: 
     yield print('handler_no_access: entries not 0: ', self.handlers['no_access']) 

@inlineCallbacks 
def _handler_no_access(self): 
    for direction in self.handlers['no_access']: 

     for message in self.handlers['no_access'][direction]: 
      yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 
      self.handlers['no_access'][direction] = [] 

我已經邁出了哈克路徑與self.handler字典,順便說一句。

EDIT

阻塞的方法是:

yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 

它控制在樹莓派的一個的GPIO Neopixel,讓它閃爍和關閉1秒。任何進一步的方法調用

def handler_no_access(self, direction) 

而_timed_switch尚未完成,應該被跳過,所以他們不堆疊。

SOLUTION

@inlineCallbacks 
def handler_no_access(self, direction): 
    direction = str(direction) 

    if self.busy[direction] is False: 

     self.busy[direction] = True 

     # non-blocking now 
     yield deferToThread(self._handler_no_access, direction) 

    else: 
     yield print('handler_no_access: direction {} busy '.format(direction)) 

def _handler_no_access(self, direction): 

    # this takes 1s to execute 
    self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 

    self.busy[direction] = False 

回答

0

inlineCallbacks不會使阻塞的代碼到非阻塞代碼。這只是一個使用Deferreds的替代API。延期只是一種管理回調的方法。

你需要重寫你的阻塞代碼,以非阻塞的方式。你實際上沒有說過你的代碼的哪一部分被阻塞,也沒有說明它阻塞了什麼,因此很難建議你如何做到這一點。將阻塞代碼變爲非阻塞的唯一兩​​種通用工具是線程和進程。所以,你可以在一個單獨的線程或進程中運行該函數。該函數可能會或可能不會在這樣的執行環境中工作(再次,沒有辦法知道它確實知道它是什麼)。

+0

恩,謝謝你的提醒,顯然我錯誤地理解了Twisted中的Deferred-mechanics。 – stk

+0

[鏈接](http://twistedmatrix.com/documents/current/core/howto/gendefer.html#what-deferreds-don-t-do-make-your-code-asynchronous) 我試圖現在使用deferToThread(f)並將報告回來,這將解決我的問題。 – stk