Pool.apply_async()
有callback
ARG,你可以利用開始在Deferred
回調鏈。 catch(這是絕對需要記住的)是池回調函數將在另一個線程中執行! 因此,將結果應用於Deferred
時必須致電reactor.callFromThread
,以便回調鏈與reactor
發生在同一線程中。 不這樣做將導致回調在反應器沒有上下文的另一個線程中執行。 這裏有一個稍微修改例如:
from functools import partial
from multiprocessing import Pool
import threading
import time
from twisted.internet import defer, reactor
def f(x):
time.sleep(5)
return x*x
def get_result(pool, i):
deferred = defer.Deferred() # create a Deferred that will eventually provide a result
_set_result = partial(set_result, deferred=deferred) # pass the Deferred to the apply_async callback
pool.apply_async(f, args=(i,), callback=_set_result) # execute in a separate process, supply callback fn
return deferred
def set_result(result, deferred):
"""
Set the result in the deferred
"""
print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result))
reactor.callFromThread(deferred.callback, result) # execute the Deferred callback chain from the reactor thread
def display_result(result):
"""
Just display the result
"""
print('Thread ID: %d, Display %d' % (threading.get_ident(), result))
def kill_reactor(null):
print('Thread ID: %d, Stopping reactor' % threading.get_ident())
reactor.stop()
def main():
print('Thread ID: %d, Main' % threading.get_ident())
pool = Pool(processes=4)
d = get_result(pool, 3)
d.addCallback(display_result)
d.addCallback(kill_reactor)
reactor.run()
main()
#---------- OUTPUT ----------#
# Thread ID: 803872, Main
# Thread ID: 533632, Setting result 9
# Thread ID: 803872, Display 9
# Thread ID: 803872, Stopping reactor
我打印線程ID,這樣你可以看到set_result()
確實叫在另一個線程(進程?),而不是在主線程,即反應堆線程。 在這個例子中收集reactor.callFromThread(deferred.callback, result)
將導致回調在reactor.stop()
無法運行的線程中執行,並且Twisted會引發嘔吐(回溯)到處! 考慮使用reactor.spawnProcess
,因爲這將限制您(或我)會以其他方式犯的錯誤。 一如既往,如果你可以做任何你想做的事情,在一個單一的線程,並省略multiprocessing
或threading
,我建議你這樣做。