2017-08-29 43 views
0

我想要做的事在multiprocessing,我想推遲獲取結果,這樣的事情:如何使扭曲的延遲獲得函數結果?

from multiprocessing import Pool 
from twisted.internet import defer 
import time 

def f(x): 
    time.sleep(0.5) 
    print(x) 
    return x*x 

pool = Pool(processes=4)    # start 4 worker processes 
def get_result(i): 
    res = pool.apply_async(f, (i,)) # do work in process pool 
    return defer.Deferred(res.get()) # now, I want to make process do something else, so it should not be blocked 

def main(): 
    from twisted.internet import reactor 

    @defer.inlineCallbacks 
    def _run(): 
     for i in range(4): 
      yield get_result(i) 

     reactor.stop() 
    reactor.callLater(1, _run) 
    reactor.run() 


if __name__ == '__main__': 
    main() 

回答

0

Pool.apply_async()callback ARG,你可以利用開始在Deferred回調鏈。 catch(這是絕對需要記住的)是池回調函數將在另一個線程中執行 因此,將結果應用於Deferred時必須致電reactor.callFromThread,以便回調鏈與reactor發生在同一線程中。 不這樣做將導致回調在反應器沒有上下文的另一個線程中執行。 這裏有一個稍微修改例如:

from functools import partial 
from multiprocessing import Pool 
import threading 
import time 

from twisted.internet import defer, reactor 


def f(x): 
    time.sleep(5) 
    return x*x 

def get_result(pool, i): 
    deferred = defer.Deferred() # create a Deferred that will eventually provide a result 
    _set_result = partial(set_result, deferred=deferred) # pass the Deferred to the apply_async callback 
    pool.apply_async(f, args=(i,), callback=_set_result) # execute in a separate process, supply callback fn 
    return deferred 

def set_result(result, deferred): 
    """ 
    Set the result in the deferred 
    """ 
    print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result)) 
    reactor.callFromThread(deferred.callback, result) # execute the Deferred callback chain from the reactor thread 

def display_result(result): 
    """ 
    Just display the result 
    """ 
    print('Thread ID: %d, Display %d' % (threading.get_ident(), result)) 

def kill_reactor(null): 
    print('Thread ID: %d, Stopping reactor' % threading.get_ident()) 
    reactor.stop() 

def main(): 
    print('Thread ID: %d, Main' % threading.get_ident()) 
    pool = Pool(processes=4) 
    d = get_result(pool, 3) 
    d.addCallback(display_result) 
    d.addCallback(kill_reactor) 

    reactor.run() 

main() 


#---------- OUTPUT ----------# 
# Thread ID: 803872, Main 
# Thread ID: 533632, Setting result 9 
# Thread ID: 803872, Display 9 
# Thread ID: 803872, Stopping reactor 

我打印線程ID,這樣你可以看到set_result()確實叫在另一個線程(進程?),而不是在主線程,即反應堆線程。 在這個例子中收集reactor.callFromThread(deferred.callback, result)將導致回調在reactor.stop()無法運行的線程中執行,並且Twisted會引發嘔吐(回溯)到處! 考慮使用reactor.spawnProcess,因爲這將限制您(或我)會以其他方式犯的錯誤。 一如既往,如果你可以做任何你想做的事情,在一個單一的線程,並省略multiprocessingthreading,我建議你這樣做。