2013-04-21 114 views
7

在我的應用程序中,我使用多處理模塊中的管道在python進程之間進行通信。 最近我觀察到一個奇怪的行爲,這取決於我通過它們發送的數據的大小。 根據python文檔,這些管道是基於連接的,並且應該以異步方式運行,但有時他們會在發送時卡住。如果我在每個連接中啓用全雙工,即使我沒有使用連接進行發送和收聽,一切正常。 任何人都可以解釋這種行爲?Python管道的同步/異步行爲

  1. 100輛彩車,全雙工禁用
    代碼工作,利用asynchronousness。
  2. 100浮動,全雙工啓用
    該示例正常工作正常。
  3. 10000浮點數,全雙工禁用
    儘管數據量較小,但執行永遠被阻止。
  4. 10000浮點數,全雙工啓用
    再次罰款。

代碼(這不是我的生產代碼,它只是說明我的意思):

from collections import deque 
from multiprocessing import Process, Pipe 
from numpy.random import randn 
from os import getpid 

PROC_NR = 4 
DATA_POINTS = 100 
# DATA_POINTS = 10000 


def arg_passer(pipe_in, pipe_out, list_): 
    my_pid = getpid() 
    print "{}: Before send".format(my_pid) 
    pipe_out.send(list_) 
    print "{}: After send, before recv".format(my_pid) 
    buf = pipe_in.recv() 
    print "{}: After recv".format(my_pid) 


if __name__ == "__main__": 
    pipes = [Pipe(False) for _ in range(PROC_NR)] 
    # pipes = [Pipe(True) for _ in range(PROC_NR)] 
    pipes_in = deque(p[0] for p in pipes) 
    pipes_out = deque(p[1] for p in pipes) 
    pipes_in.rotate(1) 
    pipes_out.rotate(-1) 

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)] 
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo])) 
       for foo in xrange(PROC_NR)] 

    for proc in processes: 
     proc.start() 

    for proc in processes: 
     proc.join() 

回答

8

首先,值得注意的multiprocessing.Pipe類的實現......

def Pipe(duplex=True): 
    ''' 
    Returns pair of connection objects at either end of a pipe 
    ''' 
    if duplex: 
     s1, s2 = socket.socketpair() 
     s1.setblocking(True) 
     s2.setblocking(True) 
     c1 = _multiprocessing.Connection(os.dup(s1.fileno())) 
     c2 = _multiprocessing.Connection(os.dup(s2.fileno())) 
     s1.close() 
     s2.close() 
    else: 
     fd1, fd2 = os.pipe() 
     c1 = _multiprocessing.Connection(fd1, writable=False) 
     c2 = _multiprocessing.Connection(fd2, readable=False) 

    return c1, c2 

區別在於半雙工'管道'使用anonymous pipe,但全雙工'管道'實際上使用Unix domain socket,因爲匿名管道本質上是單向的。

我不確定在這種情況下術語「異步」是什麼意思。如果你的意思是「非阻塞I/O」,那麼值得注意的是這兩個實現默認都使用阻塞I/O。


其次,值得注意你想發送的數據的大小醃製...

>>> from numpy.random import randn 
>>> from cPickle import dumps 
>>> len(dumps(randn(100))) 
2479 
>>> len(dumps(randn(10000))) 
237154 

第三,從pipe(7)手冊頁...

管道容量

管道容量有限。如果管道已滿,則寫入(2)將阻止 或失敗,具體取決於是否設置了O_NONBLOCK標誌(請參見下文)。對於管道容量,不同的實施方式有不同的限制。應用程序應該不依賴於特定的容量:應該設計應用程序,以便 讀取過程在數據可用時立即使用數據,以便寫入過程 不會被阻塞。

在2.6.11之前的Linux版本中,管道的容量與系統頁面大小 (例如i386上的4096字節)相同。自Linux 2.6.11以來,管道容量爲 65536字節。


所以,實際上,你已經創建,所有的子進程都擋在了pipe_out.send()呼叫的僵局,沒有人可以從其他進程接收到任何數據,因爲你發送的所有一次命中237,154個字節的數據,填充了65,536字節的緩衝區。

您可能會試圖使用Unix域套接字版本,但目前它的唯一原因是它的緩衝區大小超過管道,並且您會發現如果您增加該解決方案也會失敗DATA_POINTS的數量爲100,000。

「quick n'dirty hack」解決方案是將數據分成更小的塊進行發送,但依賴於特定大小的緩衝區並不是一個好習慣。

更好的解決方案是在調用pipe_out.send()時使用非阻塞I/O,但我對multiprocessing模塊不熟悉,無法確定使用該模塊實現它的最佳方式。

中的僞是沿行...

while 1: 
    if we have sent all data and received all data: 
     break 
    send as much data as we can without blocking 
    receive as much data as we can without blocking 
    if we didn't send or receive anything in this iteration: 
     sleep for a bit so we don't waste CPU time 
     continue 

...或者你可以使用Python select模塊,以避免睡眠時間超過是必要的,但同樣,有multiprocessing.Pipe它整合可能會很棘手。

有可能multiprocessing.Queue類爲你做這一切,但我以前從未使用它,所以你必須做一些實驗。

+0

謝謝您的詳細解答。現在我明白了這個問題。 – Michal 2013-04-29 06:25:54