考慮TCP連接的情況。 TCP連接的接收緩衝區可以有或沒有數據。你可以得到這些數據,或者什麼也得不到,而不使用非阻塞套接字API攔截:
data = socket.recv(1024)
if data:
self.process_data(data)
可以等待數據可用使用select()
(或任何基本相當的API):
socket.setblocking(False)
while True:
data = socket.recv(1024)
if data:
self.process_data(data)
else:
select([socket], [], [])
其中,只有select()
是特別扭曲不友好的(儘管扭曲的成語是肯定不會讓自己的socket.recv
調用)。雖然(用Protocol
用dataReceived
方法代替select
的調用方式可以觸發Deferred
- 有點像你的on_data_available
方法 - 折騰一些收益率並使整個產品成爲inlineCallbacks
生成器)。
但是,儘管這是一種可以從TCP連接獲取數據的方法,但Twisted鼓勵您使用的API並不是這樣做的。相反,API是:
class SomeProtocol(Protocol):
def dataReceived(self, data):
# Your logic here
我不明白你的情況有多大的不同。如果,不是你寫的循環,你做了這樣的事情:
class YourDataProcessor(object):
def process_data(self, data):
# Your logic here
class SomeDataGetter(object):
def __init__(self, processor):
self.processor = processor
def on_available_data(self):
data = self.get_data_nonblocking()
if data is not None:
self.processor.process_data(data)
現在有根本沒有Deferreds(也許除了任何實現on_available_data
或get_data_nonblocking
,但我看不到代碼)。
如果您保持原樣,您可以保證按順序執行,因爲Twisted是單線程的(在幾個標有非常清晰標記的地方除外),並且在單線程程序中,以前的調用到process_data
必須在任何稍後致電process_data
之前完成(除了當然,process_data
重複地自行調用 - 但這是另一個故事)。
如果您切換回使用inlineCallbacks
(或任何等效的「協同」調味飲料組合),那麼您可能會引入無序執行的可能性。
例如,如果get_data_nonblocking
回報Deferred
和你寫的是這樣的:
@inlineCallbacks
def on_available_data(self):
data = yield self.get_data_nonblocking()
if data is not None:
self.processor.process_data(data)
那麼你已經改變on_available_data
說,上下文切換是允許呼叫get_data_nonblocking
時。在這種情況下,根據具體實施的get_data_nonblocking
和on_available_data
,這是完全有可能的:
on_available_data
叫
get_data_nonblocking
被調用,並返回一個Deferred
on_available_data
告訴執行切換到另一個上下文(通過yield
/inlineCallbacks
)
on_available_data
又被稱爲
get_data_nonblocking
被再次調用並返回一個Deferred
(也許是同一個!也許是一個新的!依賴於它是如何實現)
- 的
on_available_data
第二次調用告訴執行切換到另一個上下文(同樣的原因)
- 反應器打轉了一會兒,最終的事件到來,導致由秒返回
Deferred
調用get_data_nonblocking
來點燃。
- 執行切換回秒
on_available_data
框架
process_data
被稱爲與任何數據秒get_data_nonblocking
調用返回
- 最終,同樣的事情發生在第一組對象和
process_data
與再次叫什麼數據第一get_data_nonblocking
調用返回
現在也許你已經處理數據O沒有命令 - 這又取決於系統其他部分的更多細節。
如果是這樣,您可以隨時重新排列順序。有很多不同的可能的方法來做到這一點。 Twisted本身沒有附帶任何明確支持此操作的API,因此解決方案涉及編寫一些新代碼。下面是一個辦法一個想法(未經測試) - 它知道對象序列號中的類似隊列類:
class SequencedQueue(object):
"""
A queue-like type which guarantees objects come out of the queue in the order
defined by a sequence number associated with the objects when they are put into
the queue.
Application code manages sequence number assignment so that sequence numbers don't
have to have the same order as `put` calls on this type.
"""
def __init__(self):
# The sequence number of the object that should be given out
# by the next call to `get`
self._next_sequence = 0
# The sequence number of the next result that needs to be provided.
self._next_result = 0
# A holding area for objects past _next_sequence
self._queue = {}
# A holding area
self._waiting =
def put(self, sequence, object):
"""
Put an object into the queue at a particular point in the sequence.
"""
if sequence < self._next_sequence:
# Programming error. The sequence number
# of the object being put has already been used.
raise ...
self._queue[sequence] = object
self._check_waiters()
def get(self):
"""
Get an object from the queue which has the next sequence number
following whatever was previously gotten.
"""
result = self._waiters[self._next_sequence] = Deferred()
self._next_sequence += 1
self._check_waiters()
return result
def _check_waiters(self):
"""
Find any Deferreds previously given out by get calls which can now be given
their results and give them to them.
"""
while True:
seq = self._next_result
if seq in self._queue and seq in self._waiting:
self._next_result += 1
# XXX Probably a re-entrancy bug here. If a callback calls back in to
# put then this loop might run recursively
self._waiting.pop(seq).callback(self._queue.pop(seq))
else:
break
預期的行爲(模我不小心添加任何錯誤)是一樣的東西:
q = SequencedQueue()
d1 = q.get()
d2 = q.get()
# Nothing in particular happens
q.put(1, "second result")
# d1 fires with "first result" and afterwards d2 fires with "second result"
q.put(0, "first result")
使用這種方法,只要確保按照您希望分配數據的順序分配順序號,而不是按實際順序顯示的順序。例如:
@inlineCallbacks
def on_available_data(self):
sequence = self._process_order
data = yield self.get_data_nonblocking()
if data is not None:
self._process_order += 1
self.sequenced_queue.put(sequence, data)
在其他地方,一些代碼可以消耗的,如隊列排序:
@inlineCallbacks
def queue_consumer(self):
while True:
yield self.process_data(yield self.sequenced_queue.get())
不幸的是'get_data_nonblocking()'是一個第三方的庫函數返回一個延遲(即火災時,服務器響應)。 – wRAR
這並非天生不幸。這隻意味着你需要添加一些邏輯來強加排序。 –