2

我有大量的文件要處理。我寫了一個腳本來獲取,分類和繪製我想要的數據。到現在爲止還挺好。我已經測試過它,並提供了預期的結果。matplotlib和python多線程/多處理文件處理

然後我想用多線程來做到這一點。我查看了互聯網上的文檔和示例,並在我的程序中使用一個線程正常工作。但是當我使用更多的時候,在某些時候,我得到了隨機的matplotlib錯誤,並且我懷疑存在一些衝突,儘管我使用了一個帶有劇情名稱的函數,並且iI無法看到問題出在哪裏。

這是整個腳本,如果您需要更多評論,我會添加它們。謝謝。

#!/usr/bin/python 
import matplotlib 
matplotlib.use('GTKAgg') 
import numpy as np 
from scipy.interpolate import griddata 

import matplotlib.pyplot as plt 
import matplotlib.colors as mcl 
from matplotlib import rC#for latex 

import time as tm 
import sys 
import threading 
import Queue #queue in 3.2 and Queue in 2.7 ! 

import pdb #the debugger 

rc('text', usetex=True)#for latex 

map=0 #initialize the map index. It will be use to index the array like  this: array[map,[x,y]] 
time=np.zeros(1) #an array to store the time 
middle_h=np.zeros((0,3)) #x phi c 

#for the middle of the box 
current_file=open("single_void_cyl_periodic_phi_c_middle_h_out",'r') 
for line in current_file: 
    if line.startswith('# === time'): 
     map+=1 
     np.append(time,[float(line.strip('# === time '))]) 
    elif line.startswith('#'): 
     pass 
    else: 
     v=np.fromstring(line,dtype=float,sep=' ') 
     middle_h=np.vstack((middle_h,v[[1,3,4]])) 
current_file.close() 
middle_h=middle_h.reshape((map,-1,3)) #3d array: map, x, phi,c 

##### 
def load_and_plot(): #will load a map file, and plot it along with the  corresponding profile loaded before 
    while not exit_flag: 
     print("fecthing work ...") 
     #try: 
     if not tasks_queue.empty(): 
      map_index=tasks_queue.get() 
      print("----> working on map: %s" %map_index) 
      x,y,zp=np.loadtxt("single_void_cyl_growth_periodic_post_map_"+str(map_index),unpack=True, usecols=[1, 2,3]) 
      for i,el in enumerate(zp): 
       if el<0.: 
        zp[i]=0. 
      xv=np.unique(x) 
      yv=np.unique(y) 
      X,Y= np.meshgrid(xv,yv) 
      Z = griddata((x, y), zp, (X, Y),method='nearest') 

      figure=plt.figure(num=map_index,figsize=(14, 8)) 
      ax1=plt.subplot2grid((2,2),(0,0)) 
      ax1.plot(middle_h[map_index,:,0],middle_h[map_index,:,1],'*b') 
      ax1.grid(True) 
      ax1.axis([-15, 15, 0, 1]) 
      ax1.set_title('Profiles') 
      ax1.set_ylabel(r'$\phi$') 
      ax1.set_xlabel('x') 

      ax2=plt.subplot2grid((2,2),(1,0)) 
      ax2.plot(middle_h[map_index,:,0],middle_h[map_index,:,2],'*r') 
      ax2.grid(True) 
      ax2.axis([-15, 15, 0, 1]) 
      ax2.set_ylabel('c') 
      ax2.set_xlabel('x') 

      ax3=plt.subplot2grid((2,2),(0,1),rowspan=2,aspect='equal') 
      sub_contour=ax3.contourf(X,Y,Z,np.linspace(0,1,11),vmin=0.) 
      figure.colorbar(sub_contour,ax=ax3) 
      figure.savefig('single_void_cyl_'+str(map_index)+'.png') 
      plt.close(map_index) 
      tasks_queue.task_done() 
     else: 
      print("nothing left to do, other threads finishing,sleeping 2 seconds...") 
      tm.sleep(2) 
    # except: 
    #  print("failed this time: %s" %map_index+". Sleeping 2 seconds") 
    #  tm.sleep(2) 
##### 
exit_flag=0 
nb_threads=2 
tasks_queue=Queue.Queue() 
threads_list=[] 

jobs=list(range(map)) #each job is composed of a map 
print("inserting jobs in the queue...") 
for job in jobs: 
    tasks_queue.put(job) 
print("done") 

#launch the threads 
for i in range(nb_threads): 
    working_bee=threading.Thread(target=load_and_plot) 
    working_bee.daemon=True 
    print("starting thread "+str(i)+' ...') 
    threads_list.append(working_bee) 
working_bee.start() 


#wait for all tasks to be treated 
tasks_queue.join() 

#flip the flag, so the threads know it's time to stop 
exit_flag=1 

for t in threads_list: 
    print("waiting for threads %s to stop..."%t) 
    t.join() 

print("all threads stopped") 
+1

我會推薦使用'multiprocessing'而不是線程。我成功地使用它來實現平行的數字繪圖。 –

+0

謝謝,實現起來似乎比較複雜,但我會試一試。 – Napseis

+0

你只開始最後一個線程;在循環 – jfs

回答

3

繼大衛的建議下,我做了它在多。 8個處理器的速度提升了5倍。我相信剩下的事情在我的腳本開始時對單一流程的工作是做的。 編輯:但是有時腳本「掛起」在最後的地圖,即使它產生正確的地圖,並顯示以下錯誤:

文件「single_void_cyl_plot_mprocess.py」,行90,在tasks_queue.join()

文件 「/usr/local/epd-7.0-2-rh5-x86_64/lib/python2.7/multiprocessing/queues.py」,線路316,在加入self._cond.wait()

文件「 /usr/local/epd-7.0-2-rh5-x86_64/lib/python2.7/multiprocessing/synchronize.py「,第220行,等待self._wait_semaphore.acquire(True,timeout)

import numpy as np 
from scipy.interpolate import griddata 

import matplotlib.pyplot as plt 
from matplotlib import rC#for latex 

from multiprocessing import Process, JoinableQueue 

import pdb #the debugger 

rc('text', usetex=True)#for latex 

map=0 #initialize the map index. It will be use to index the array  like this: array[map,x,y,...] 
time=np.zeros(1) #an array to store the time 
middle_h=np.zeros((0,3)) #x phi c 

#for the middle of the box 
current_file=open("single_void_cyl_periodic_phi_c_middle_h_out",'r') 
for line in current_file.readlines(): 
    if line.startswith('# === time'): 
     map+=1 
     np.append(time,[float(line.strip('# === time '))]) 
    elif line.startswith('#'): 
     pass 
    else: 
     v=np.fromstring(line,dtype=float,sep=' ') 
     middle_h=np.vstack((middle_h,v[[1,3,4]])) 
current_file.close() 
middle_h=middle_h.reshape((map,-1,3)) #3d array: map, x, phi,c 

####### 
def load_and_plot(): #will load a map file, and plot it along with  the corresponding profile loaded before 
    while tasks_queue.empty()==False: 
     print("fecthing work ...") 
     try: 
      map_index=tasks_queue.get() #get some work to do from  the queue 
      print("----> working on map: %s" %map_index) 
       x,y,zp=np.loadtxt("single_void_cyl_growth_periodic_post_map_"+str(map_index),\ 
       unpack=True, usecols=[1, 2,3]) 
      for i,el in enumerate(zp): 
       if el<0.: 
        zp[i]=0. 
      xv=np.unique(x) 
      yv=np.unique(y) 
      X,Y= np.meshgrid(xv,yv) 
      Z = griddata((x, y), zp, (X, Y),method='nearest') 

      figure=plt.figure(num=map_index,figsize=(14, 8)) 
      ax1=plt.subplot2grid((2,2),(0,0)) 
       ax1.plot(middle_h[map_index,:,0],middle_h[map_index,:,1],'*b') 
      ax1.grid(True) 
      ax1.axis([-15, 15, 0, 1]) 
      ax1.set_title('Profiles') 
      ax1.set_ylabel(r'$\phi$') 
      ax1.set_xlabel('x') 

      ax2=plt.subplot2grid((2,2),(1,0)) 
       ax2.plot(middle_h[map_index,:,0],middle_h[map_index,:,2],'*r') 
      ax2.grid(True) 
      ax2.axis([-15, 15, 0, 1]) 
      ax2.set_ylabel('c') 
      ax2.set_xlabel('x') 

      ax3=plt.subplot2grid((2,2), (0,1),rowspan=2,aspect='equal') 
       sub_contour=ax3.contourf(X,Y,Z,np.linspace(0,1,11),vmin=0.) 
      figure.colorbar(sub_contour,ax=ax3) 
       figure.savefig('single_void_cyl_'+str(map_index)+'.png') 
      plt.close(map_index) 
      tasks_queue.task_done() #work for this item finished 
     except: 
      print("failed this time: %s" %map_index) 
####### 

nb_proc=8 #number of processes 
tasks_queue=JoinableQueue() #a queue to pile up the work to do 

jobs=list(range(map)) #each job is composed of a map 
print("inserting jobs in the queue...") 
for job in jobs: 
    tasks_queue.put(job) 
print("done") 

#launch the processes 
for i in range(nb_proc): 
    current_process=Process(target=load_and_plot) 
    current_process.start() 

#wait for all tasks to be treated 
tasks_queue.join() 
+0

你也可以嘗試'pool = multiprocessing.Pool()''for result in pool.imap_unordered(process_job,jobs):pass' – jfs

+0

I可能會嘗試,似乎在加入某個地方有一個錯誤...它不會每次都發生,但它使我困惑。 – Napseis