2017-08-08 138 views
2

`該代碼是使用隊列任務多項工作進程的嘗試。多處理和隊列

我想確定不同數量的過程和處理數據的不同方法之間的速度差異。

但是輸出沒有做我認爲會的。

from multiprocessing import Process, Queue 
import time 
result = [] 

base = 2 

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21] 

# create queue for new tasks 
new_tasks = Queue(maxsize=0) 

# put tasks in queue 
print('Putting tasks in Queue') 
for i in data: 
    new_tasks.put(i) 

# worker function definition 
def f(q, p_num): 
    print('Starting process: {}'.format(p_num)) 
    while not q.empty(): 
     # mimic some process being done 
     time.sleep(0.05) 
     print(q.get(), p_num) 
    print('Finished', p_num) 

print('initiating processes') 
processes = [] 
for i in range(0, 2): 
    if __name__ == '__main__': 
     print('Creating process {}'.format(i)) 
     p = Process(target=f, args=(new_tasks, i)) 
     processes.append(p) 
#record start time 
start = time.time() 

# start process 
for p in processes: 
    p.start() 

# wait for processes to finish processes 
for p in processes: 
    p.join() 

#record end time 
end = time.time() 

# print time result 
print('Time taken: {}'.format(end-start)) 

預計這樣的:

Putting tasks in Queue 
initiating processes 
Creating process 0 
Creating process 1 
Starting process: 1 
Starting process: 0 
1 1 
2 0 
3 1 
4 0 
5 1 
6 0 
7 1 
8 0 
9 1 
10 0 
11 1 
23 0 
45 1 
76 0 
4567 1 
65423 0 
45 1 
4 0 
3 1 
21 0 
Finished 1 
Finished 0 
Time taken: <some-time> 

而是我其實得到這樣的:

Putting tasks in Queue 
initiating processes 
Creating process 0 
Creating process 1 
Time taken: 0.01000523567199707 
Putting tasks in Queue 
Putting tasks in Queue 
initiating processes 
Time taken: 0.0 
Starting process: 1 
initiating processes 
Time taken: 0.0 
Starting process: 0 
1 1 
2 0 
3 1 
4 0 
5 1 
6 0 
7 1 
8 0 
9 1 
10 0 
11 1 
23 0 
45 1 
76 0 
4567 1 
65423 0 
45 1 
4 0 
3 1 
21 0 
Finished 0 

似乎有是兩個主要的問題,我不知道如何相關它們是:

  1. 打印語句,如: Putting tasks in Queue initiating processes Time taken: 0.0 被系統重複進行,雖然代碼 - 我說的系統becasue他們究竟每一次重複。

  2. 第二過程永遠無法完成,它永遠不會識別隊列是空的,並因此不能退出

+1

我聽起來像喲你有代碼格式化問題:你應該只有一個'採取的時間:...'打印輸出。 – quamrana

+1

另外,你絕不應該輪詢'q。empty()',因爲貪婪的線程可能會竊取最後一個項目,並讓所有其他線程等待永遠不會出現的項目。你應該使用的是隊列標記的結尾。每個線程一個。 – quamrana

+0

否則這是一個很好的問題。你已經在編寫代碼和收集輸出*和*時展示了你期望發生的事情。 – quamrana

回答

2

1)我不能重現此。

2)看下面的代碼:

while not q.empty(): 
    time.sleep(0.05) 
    print(q.get(), p_num) 

每一行可以由任何PROCES任何順序運行。現在考慮q具有單個項目和兩個過程AB。現在考慮執行的順序如下:

# A runs 
while not q.empty(): 
    time.sleep(0.05) 

# B runs 
while not q.empty(): 
    time.sleep(0.05) 

# A runs 
print(q.get(), p_num) # Removes and prints the last element of q 

# B runs 
print(q.get(), p_num) # q is now empty so q.get() blocks forever 

交換的time.sleepq.get的順序刪除我的所有運行的阻塞,但它仍然可能有不止一個進程進入與離開單個項目的環。

解決這個問題是使用非阻塞get通話和捕捉queue.Empty例外方式:

import queue 

while True: 
    time.sleep(0.05) 
    try: 
     print(q.get(False), p_num) 
    except queue.Empty: 
     break 
+0

很好的答案,很好的解釋,對我的另一半問題有什麼想法? –

+0

不,看着你的代碼,我沒有看到任何方式可以打印多行。也許嘗試應對這個問題的代碼,看看你選擇使用if,then,break,而不是try,除了''以外的代碼是否與你正在運行的代碼 – ikkuh

1

你的工作線程應該是這樣的:

def f(q, p_num): 
    print('Starting process: {}'.format(p_num)) 
    while True: 
     value = q.get() 
     if value is None: 
      break 
     # mimic some process being done 
     time.sleep(0.05) 
     print(value, p_num) 
    print('Finished', p_num) 

和隊列應填充真實數據後的標記:

for i in data: 
    new_tasks.put(i) 
for _ in range(num_of_threads): 
    new_tasks.put(None) 
+0

有什麼不同。這僅僅是一個速度問題? –

+0

另外,我的隊列應該填充什麼'標記'?在關於隊列或多處理或多線程的文檔中,我找不到它們的提及。 (一個鏈接很可愛,我不希望你在評論中寫一篇文章) –

+0

@quarana當然,這些項目需要在隊列中才能正確過去,而不會共享或「鎖定」? –