2011-09-06 19 views
5

我有一個GUI應用程序需要從GUI主循環旁邊的 網絡中獲取和分析不同的資源。 我已經使用python多處理模塊搜索選項,因爲這些 讀取操作不僅包含阻塞IO,而且包含繁重的解析,所以 多處理可能比python線程更好。 使用Twisted會很容易,但是這次Twisted不是一個選項。使用多處理的GUI應用程序內的異步調用

我發現了一個簡單的解決方案在這裏:

Python subprocess: callback when cmd exits

的問題是,回調神奇的 MainThread內不叫。

於是我想出了以下解決方案:

delegate.py

import os 
import multiprocessing as mp 
import signal 
from collections import namedtuple 
import uuid 
import logging 


_CALLBACKS = {} 
_QUEUE = mp.Queue() 

info = logging.getLogger(__name__).info 


class Call(namedtuple('Call', 'id finished result error')): 

    def attach(self, func): 
     if not self.finished: 
      _CALLBACKS.setdefault(self.id, []).append(func) 
     else: 
      func(self.result or self.error) 

     return self 

    def callback(self): 
     assert self.finished, 'Call not finished yet' 
     r = self.result or self.error 
     for func in _CALLBACKS.pop(self.id, []): 
      func(r) 

    def done(self, result=None, error=None): 
     assert not self.finished, 'Call already finished' 
     return self._replace(finished=(-1 if error else 1), 
      result=result, error=error) 

    @classmethod 
    def create(clss): 
     call = clss(uuid.uuid4().hex, 0, None, None) # uuid ??? 
     return call 

def run(q, cb, func, args=None, kwargs=None): 
    info('run: try running %s' % func) 
    try: 
     cb = cb.done(result=func(*(args or()), **(kwargs or {}))) 
    except Exception, err: 
     cb = cb.done(error=err) 
    q.put(cb) 
    os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ??? 
    info('run: leaving') 

def on_callback(sig, frame): 
    info('on_callback: checking queue ...') 
    c = _QUEUE.get(True, 2) 
    info('on_callback: got call - %s' % repr(c)) 
    c.callback() 

signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ??? 

def delegate(func, *args, **kwargs): 
    info('delegate: %s %s' % (func, args,)) 
    cb = Call.create() 
    mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start() 
    return cb 


__all__ = ['delegate'] 

使用

from delegate import delegate 

def sleeper(secs): 
    assert secs >= 1, 'I need my Augenpflege' 
    info('sleeper: will go to sleep for %s secs' % secs) 
    sleep(secs) 
    info('sleeper: woke up - returning result') 
    return ['sleeper', 'result'] 

def on_sleeper_result(r): 
    if isinstance(r, Exception): 
     info('on_sleeper_result: got error: %s' % r) 
    else: 
     info('on_sleeper_result: got result: %s' % r) 

from delegate import delegate 
delegate(sleeper, 3).attach(on_sleeper_result) 
delegate(sleeper, -3).attach(on_sleeper_result) 
while 1: 
    info('main: loop') 
    sleep(1) 

輸出

0122 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (3,) 
MainThread INFO delegate: <function sleeper at 0x163e320> (-3,) 
0124 08437 MainThread INFO run: try running <function sleeper at 0x163e320> 
0124 08437 MainThread INFO sleeper: will go to sleep for 3 secs 
0124 08432 MainThread INFO main: loop 
0125 08438 MainThread INFO run: try running <function sleeper at 0x163e320> 
0126 08438 MainThread INFO run: leaving 
0126 08432 MainThread INFO on_callback: checking queue ... 
0126 08432 MainThread INFO on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',)) 
0127 08432 MainThread INFO on_sleeper_result: got error: I need my Augenpflege 
0127 08432 MainThread INFO main: loop 
1128 08432 MainThread INFO main: loop 
2129 08432 MainThread INFO main: loop 
3127 08437 MainThread INFO sleeper: woke up - returning result 
3128 08437 MainThread INFO run: leaving 
3128 08432 MainThread INFO on_callback: checking queue ... 
3129 08432 MainThread INFO on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None) 
3129 08432 MainThread INFO on_sleeper_result: got result: ['sleeper', 'result'] 
3129 08432 MainThread INFO main: loop 
4130 08432 MainThread INFO main: loop 
5132 08432 MainThread INFO main: loop 
... 

到目前爲止,這工作得很好,但我的經驗與多處理 模塊是溫和的,我有點不確定這是否會運行沒有影響。我的 的問題是 - 使用 多處理時,我應該特別關心哪些事情......或者使用Python標準庫的異步回調機制是否存在「更正確」模式?

+0

禮Bendersky寫的東西有關:[鏈接](http://eli.thegreenplace.net/2011/05/26/code-sample-socket-client-based-on-twisted-with-pyqt /) – JBernardo

+0

我之前閱讀過這篇博客文章,但正如我所提到的,Twisted不是這個項目的一個選項 - 無論如何感謝 – hooblei

回答

2

對於python多處理和主循環中的繁忙等待,您沒有理由使用信號(低級別api)。

你必須在QThread運行(修改)事件循環,可以直接調用Qt代碼,或使用QApplication.postEvent(或pyqtSignal)在主線程來執行它

# this should be in the delegate module 
while 1: 
    c = _QUEUE.get(True) # no timeout 
    c.callback() # or post event to main thread 

你可以另請參閱this page討論qt中線程之間的通信問題

+0

Aaah - 就是這樣 - 我將所有deleget代碼塞入QThread(或更好的QRunnable + QThreadPool)像一個魅力 - 謝謝 – hooblei

1

您的代碼有效,但並不像它那麼簡單。我們來看看代碼。

這主要過程將創建一個Call實例:

def delegate(func, *args, **kwargs): 
    cb = Call.create() 

但是當你通過cb的工作進程,

mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start() 

Call實例在os.fork ING過程中複製,從而創造第二個獨立的實例。然後,它調用 和cb.done這就要求cb._replace返回第三Call實例:以上

def done(self, result=None, error=None): 
    assert not self.finished, 'Call already finished' 
    return self._replace(finished=(-1 if error else 1), 
     result=result, error=error) 

調用私有namedtuple方法_replace。這本來是一個簡單的 Python語句像

self.finished = -1 if error else 1 

如果Call人的object子類來代替的namedtuple一個子類。子類namedtuple保存了一下打字的__init__但它變得相當笨拙以後,因爲我們需要修改namedtuple的屬性...

同時,delegate(...)的主要工序返回原來的Call實例調用attach

delegate(...).attach(on_sleeper_result) 

這將修改全球_CALLBACKS字典。工人進程無法知道_CALLBACKS中的這一更改;在工作進程_CALLBACKS仍然是空的字典。因此,必須通過mp.Queue將工作人員Call實例傳回主進程,該進程使用cb.id來引用_CALLBACKS中的正確功能。

所以它所有的作品,但它每次調用delegate創建三個Call情況下,代碼可能會誤導外行以爲三個Call情況下都是相同的對象....這是可行的,但它是有點複雜。

您是否考慮過使用mp.Pool.apply_asynccallback參數?

import multiprocessing as mp 
import logging 
import time 
import collections 

_CALLBACKS=collections.defaultdict(list) 

logger=mp.log_to_stderr(logging.DEBUG) 

def attach(name,func): 
    _CALLBACKS[name].append(func) 

def delegate(func, *args, **kwargs): 
    id=kwargs.pop('id') 
    try: 
     result=func(*args,**kwargs) 
    except Exception, err: 
     result=err 
    return (id,result) 

def sleeper(secs): 
    assert secs >= 1, 'I need my Augenpflege' 
    logger.info('sleeper: will go to sleep for %s secs' % secs) 
    time.sleep(secs) 
    logger.info('sleeper: woke up - returning result') 
    return ['sleeper', 'result'] 

def callback(r): 
    id,result=r 
    for func in _CALLBACKS[id]: 
     func(result) 

def on_sleeper_result(r): 
    if isinstance(r, Exception): 
     logger.error('on_sleeper_result: got error: %s' % r) 
    else: 
     logger.info('on_sleeper_result: got result: %s' % r) 

if __name__=='__main__': 
    pool=mp.Pool() 
    pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1}, 
        callback=callback) 
    attach(1,on_sleeper_result) 
    pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2}, 
        callback=callback) 
    attach(2,on_sleeper_result)  
    while 1: 
     logger.info('main: loop') 
     time.sleep(1) 
+0

嗨,我完全知道多個呼叫實例,而在 這兩個進程和_replace調用之間傳遞,但爲了可讀性考慮,因爲 這些委託操作在應用程序內相對較少。 是apply_async是我第一次嘗試,但正如我所提到的 - 回調處理程序是 未在當前主線程中調用。你也可以在這裏看到同樣的行爲: http://stackoverflow.com/questions/2581817/python-subprocess-callback-when-cmd-exits/5209746#5209746 但是,謝謝你的詳細澄清。 – hooblei

相關問題