2017-08-29 190 views
2

我想運行一個並行循環(免責聲明:我是新來的並行處理,這可能是一個非常愚蠢的問題)。 本質上,我希望能夠在並行函數內啓動和終止循環,並且能夠不時檢查其狀態。python並行循環:與外部通信

我試着編寫一個非常基本的例子(見下文),使用多進程Process和Queue,其中子函數啓動並開始循環。在每次迭代時,它會向隊列中添加一個值,以便將其傳遞到外部。但是,我從隊列中獲得的值是由於我詢問queue.get()的次數而產生的。

運行代碼,問題可能會更有意義。 另類和簡單的做法非常讚賞。

import time 
from multiprocessing import Process, Queue 

def f(q): 
    z = 0 
    while True: 
     z = z+1 
     print 'z', z 
     q.put(['f is at z value {}'.format(z)]) 
     time.sleep(float(0.1))   

if __name__ == '__main__': 
    queue = Queue() 
    process = Process(target=f, args=(queue,)) 
    process.start() 
    print 'start with z value:' 
    print queue.get() 
    time.sleep(1) 

    print 'now f is at z value:' 
    print queue.get() 

    time.sleep(1)   
    print 'terminating with z value:' 
    print queue.get()  
    process.terminate() 

回答

1

您可以使用ArrayValuemultiprocessingshare the state之間的過程:

import time 
from multiprocessing import Process, Value 

def f(v): 
    z = 0 
    while True: 
     z = z+1 
     print 'z', z 
     v.value = z 
     time.sleep(float(0.1))   

if __name__ == '__main__': 
    value = Value('i') 
    value.value = -1 
    process = Process(target=f, args=(value,)) 
    process.start() 
    print 'start with z value:' 
    print value.value 
    time.sleep(1) 

    print 'now f is at z value:' 
    print value.value 

    time.sleep(1)   
    print 'terminating with z value:' 
    print value.value 
    process.terminate() 

輸出:

start with z value: 
-1 
z 1 
z 2 
z 3 
z 4 
z 5 
z 6 
z 7 
z 8 
z 9 
z 10 
now f is at z value: 
10 
z 11 
z 12 
z 13 
z 14 
z 15 
z 16 
z 17 
z 18 
z 19 
z 20 
terminating with z value: 
20 
0

類將是一個更好的結構來處理狀態請求,而不是一個函數。試試這個:

class MyProcess(): 
    def __init__(self): 
    self.state = None 

    def start_process(self): 
    #manage your state handling here or in an other method. 
    self.state = "set state" 

p = MyProcess() 
queue = Queue() 
process = Process(target=p.start_process, args=(queue,)) 
process.start() 
print(p.state) #get the state 
1

這聽起來像你想shared Value

import time 
from multiprocessing import Process, Value 

def f(z): 
    while True: 
     z.value += 1 
     print 'z', z.value 
     time.sleep(float(0.1))   

if __name__ == '__main__': 
    z = Value('i', 0) 

    process = Process(target=f, args=(z,)) 
    process.start() 
    print 'start with z value:', z.value 
    time.sleep(1) 

    print 'now f is at z value:', z.value 

    time.sleep(1)   
    print 'terminating with z value:', z.value 

    process.terminate() 

PS,因爲你每次添加一些隊列一輪循環您最初的例子不工作...

1

queue guide解釋的,Queue.get()

卸下並從隊列中返回一個項目。

這意味着,它順序地訪問的對象:它消除了第一個,那麼第二等...此行爲是正常的,因爲一個隊列工作FIFO(先入先出)。

因此,第一次檢查隊列內容Queue.get()將刪除隊列中的第一個元素。此時隊列中只有一個元素。

第二次檢查時,Queue.get()刪除隊列中的第二個元素。此時隊列中有9個元素(第一個已被刪除);但你只訪問第二個。

你第三次檢查隊列,刪除第三個元素,但在隊列中有18個元素(你已經刪除的第一個和第二個元素)

在行動中看到這種行爲,添加以下檢查隊列前行:

print 'Queue size: {}'.format(queue.qsize()) 
print queue.get() 

的問題是,如果你想檢查進程的當前狀態,隊列不這樣做的方式,因爲它隊列消息,它做不是報告當前狀態爲