2014-12-25 145 views
1

我想讀取和處理來自外部服務的一些數據。我詢問服務是否有任何數據,如果返回了一些信息,我會處理它並再次詢問(這樣數據可以在可用時立即處理),否則我會等待數據可用的通知。這可以寫成一個無限循環:在循環中扭曲等待事件

def loop(self): 
    while True: 
     data = yield self.get_data_nonblocking() 
     if data is not None: 
      yield self.process_data(data) 
     else: 
      yield self.data_available 

def on_data_available(self): 
    self.data_available.fire() 

如何data_available在這裏實現?它可能是延遲,但延遲不能重置,只能重新創建。有更好的選擇嗎?

此循環是否可以集成到Twisted事件循環中?我可以在on_data_available中讀取和處理數據,並編寫一些代碼,而不是循環檢查get_data_nonblocking,但我覺得我需要一些鎖以確保數據按照到達的順序進行處理(上面的代碼強制執行它,因爲它是它處理的唯一地方)。這是一個好主意嗎?

回答

3

考慮TCP連接的情況。 TCP連接的接收緩衝區可以有或沒有數據。你可以得到這些數據,或者什麼也得不到,而不使用非阻塞套接字API攔截:

data = socket.recv(1024) 
if data: 
    self.process_data(data) 

可以等待數據可用使用select()(或任何基本相當的API):

socket.setblocking(False) 
while True: 
    data = socket.recv(1024) 
    if data: 
     self.process_data(data) 
    else: 
     select([socket], [], []) 

其中,只有select()特別扭曲不友好的(儘管扭曲的成語是肯定不會讓自己的socket.recv調用)。雖然(用ProtocoldataReceived方法代替select的調用方式可以觸發Deferred - 有點像你的on_data_available方法 - 折騰一些收益率並使整個產品成爲inlineCallbacks生成器)。

但是,儘管這是一種可以從TCP連接獲取數據的方法,但Twisted鼓勵您使用的API並不是這樣做的。相反,API是:

class SomeProtocol(Protocol): 
    def dataReceived(self, data): 
     # Your logic here 

我不明白你的情況有多大的不同。如果,不是你寫的循環,你做了這樣的事情:

class YourDataProcessor(object): 
    def process_data(self, data): 
     # Your logic here 

class SomeDataGetter(object): 
    def __init__(self, processor): 
     self.processor = processor 

    def on_available_data(self): 
     data = self.get_data_nonblocking() 
     if data is not None: 
      self.processor.process_data(data) 

現在有根本沒有Deferreds(也許除了任何實現on_available_dataget_data_nonblocking,但我看不到代碼)。

如果您保持原樣,您可以保證按順序執行,因爲Twisted是單線程的(在幾個標有非常清晰標記的地方除外),並且在單線程程序中,以前的調用到process_data必須在任何稍後致電process_data之前完成(除了當然,process_data重複地自行調用 - 但這是另一個故事)。

如果您切換回使用inlineCallbacks(或任何等效的「協同」調味飲料組合),那麼您可能會引入無序執行的可能性。

例如,如果get_data_nonblocking回報Deferred和你寫的是這樣的:

@inlineCallbacks 
    def on_available_data(self): 
     data = yield self.get_data_nonblocking() 
     if data is not None: 
      self.processor.process_data(data) 

那麼你已經改變on_available_data說,上下文切換是允許呼叫get_data_nonblocking時。在這種情況下,根據具體實施的get_data_nonblockingon_available_data,這是完全有可能的:

  1. on_available_data
  2. get_data_nonblocking被調用,並返回一個Deferred
  3. on_available_data告訴執行切換到另一個上下文(通過yield/inlineCallbacks
  4. on_available_data又被稱爲
  5. get_data_nonblocking被再次調用並返回一個Deferred(也許是同一個!也許是一個新的!依賴於它是如何實現)
  6. on_available_data第二次調用告訴執行切換到另一個上下文(同樣的原因)
  7. 反應器打轉了一會兒,最終的事件到來,導致由返回Deferred調用get_data_nonblocking來點燃。
  8. 執行切換回on_available_data框架
  9. process_data被稱爲與任何數據get_data_nonblocking調用返回
  10. 最終,同樣的事情發生在第一組對象和process_data與再次叫什麼數據第一get_data_nonblocking調用返回

現在也許你已經處理數據O沒有命令 - 這又取決於系統其他部分的更多細節。

如果是這樣,您可以隨時重新排列順序。有很多不同的可能的方法來做到這一點。 Twisted本身沒有附帶任何明確支持此操作的API,因此解決方案涉及編寫一些新代碼。下面是一個辦法一個想法(未經測試) - 它知道對象序列號中的類似隊列類:

class SequencedQueue(object): 
    """ 
    A queue-like type which guarantees objects come out of the queue in the order 
    defined by a sequence number associated with the objects when they are put into 
    the queue. 

    Application code manages sequence number assignment so that sequence numbers don't 
    have to have the same order as `put` calls on this type. 
    """ 
    def __init__(self): 
     # The sequence number of the object that should be given out 
     # by the next call to `get` 
     self._next_sequence = 0 

     # The sequence number of the next result that needs to be provided. 
     self._next_result = 0 

     # A holding area for objects past _next_sequence 
     self._queue = {} 

     # A holding area 
     self._waiting = 

    def put(self, sequence, object): 
     """ 
     Put an object into the queue at a particular point in the sequence. 
     """ 
     if sequence < self._next_sequence: 
      # Programming error. The sequence number 
      # of the object being put has already been used. 
      raise ... 

     self._queue[sequence] = object 
     self._check_waiters() 

    def get(self): 
     """ 
     Get an object from the queue which has the next sequence number 
     following whatever was previously gotten. 
     """ 
     result = self._waiters[self._next_sequence] = Deferred() 
     self._next_sequence += 1 
     self._check_waiters() 
     return result 

    def _check_waiters(self): 
     """ 
     Find any Deferreds previously given out by get calls which can now be given 
     their results and give them to them. 
     """ 
     while True: 
      seq = self._next_result 
      if seq in self._queue and seq in self._waiting: 
       self._next_result += 1 
       # XXX Probably a re-entrancy bug here. If a callback calls back in to 
       # put then this loop might run recursively 
       self._waiting.pop(seq).callback(self._queue.pop(seq)) 
      else: 
       break 

預期的行爲(模我不小心添加任何錯誤)是一樣的東西:

q = SequencedQueue() 
d1 = q.get() 
d2 = q.get() 
# Nothing in particular happens 
q.put(1, "second result") 
# d1 fires with "first result" and afterwards d2 fires with "second result" 
q.put(0, "first result") 

使用這種方法,只要確保按照您希望分配數據的順序分配順序號,而不是按實際順序顯示的順序。例如:

@inlineCallbacks 
    def on_available_data(self): 
     sequence = self._process_order 
     data = yield self.get_data_nonblocking() 
     if data is not None: 
      self._process_order += 1 
      self.sequenced_queue.put(sequence, data) 

在其他地方,一些代碼可以消耗的,如隊列排序:

@inlineCallbacks 
def queue_consumer(self): 
    while True: 
     yield self.process_data(yield self.sequenced_queue.get()) 
+0

不幸的是'get_data_nonblocking()'是一個第三方的庫函數返回一個延遲(即火災時,服務器響應)。 – wRAR

+0

這並非天生不幸。這隻意味着你需要添加一些邏輯來強加排序。 –