2017-03-15 24 views
1

我發起的併發線程做一些東西:如何實現動態數量的併發線程?

concurrent = 10 
q = Queue(concurrent * 2) 
for j in range(concurrent): 
    t = threading.Thread(target=doWork) 
    t.daemon = True 
    t.start() 
try: 
    # process each line and assign it to an available thread 
    for line in call_file: 
     q.put(line) 
    q.join() 
except KeyboardInterrupt: 
    sys.exit(1) 

同時我有一個不同的線程計數時間:

def printit(): 
    threading.Timer(1.0, printit).start() 
    print current_status 

printit() 

我想增加(或減少)的併發量主流程的線程讓我們說每分鐘。我可以在時間線程中創建一個時間計數器,並讓它每分鐘執行一些操作,但是如何更改主進程中併發線程的數量?

是否有可能(如果是的話)如何做到這一點?

回答

1

這是我的工作人員:

def UpdateProcesses(start,processnumber,CachesThatRequireCalculating,CachesThatAreBeingCalculated,CacheDict,CacheLock,IdleLock,FileDictionary,MetaDataDict,CacheIndexDict): 
NewPool() 
while start[processnumber]: 
    IdleLock.wait() 
    while len(CachesThatRequireCalculating)>0 and start[processnumber] == True: 
     CacheLock.acquire() 
     try: 
      cacheCode = CachesThatRequireCalculating[0] # The list can be empty if an other process takes the last item during the CacheLock 
      CachesThatRequireCalculating.remove(cacheCode) 
      print cacheCode,"starts processing by",processnumber,"process" 
     except: 
      CacheLock.release() 
     else: 
      CacheLock.release() 
      CachesThatAreBeingCalculated.append(cacheCode[:3]) 
      Array,b,f = TIPP.LoadArray(FileDictionary[cacheCode[:2]])#opens the dask array 
      Array = ((Array[:,:,CacheIndexDict[cacheCode[:2]][cacheCode[2]]:CacheIndexDict[cacheCode[:2]][cacheCode[2]+1]].compute()/2.**(MetaDataDict[cacheCode[:2]]["Bit Depth"])*255.).astype(np.uint16)).transpose([1,0,2]) #slices and calculates the array 
      f.close() #close the file 
      if CachesThatAreBeingCalculated.count(cacheCode[:3]) != 0: #if not, this cache is not needed annymore (the cacheCode is removed bij wavelengthchange) 
       CachesThatAreBeingCalculated.remove(cacheCode[:3]) 
       try: #If the first time the object if not aivalable try a second time 
        CacheDict[cacheCode[:3]] = Array 
       except: 
        CacheDict[cacheCode[:3]] = Array 
       print cacheCode,"done processing by",processnumber,"process" 
    if start[processnumber]: 
     IdleLock.clear() 

這是我如何開始他們:

self.ProcessLst = [] #list with all the processes who calculate the caches 
    for processnumber in range(min(NumberOfMaxProcess,self.processes)): 
     self.ProcessTerminateLst.append(True) 
    for processnumber in range(min(NumberOfMaxProcess,self.processes)): 
     self.ProcessLst.append(process.Process(target=Proc.UpdateProcesses,args=(self.ProcessTerminateLst,processnumber,self.CachesThatRequireCalculating,self.CachesThatAreBeingCalculated,self.CacheDict,self.CacheLock,self.IdleLock,self.FileDictionary,self.MetaDataDict,self.CacheIndexDict,))) 
     self.ProcessLst[-1].daemon = True 
     self.ProcessLst[-1].start() 

我關閉它們是這樣的:

for i in range(len(self.ProcessLst)): #For both while loops in the processes self.ProcessTerminateLst[i] must be True. So or the process is now ready to be terminad or is still in idle mode. 
     self.ProcessTerminateLst[i] = False 

    self.IdleLock.set() #Makes sure no process is in Idle and all are ready to be terminated 
1

我會使用一個池。一個池有最大數量的線程它在同一時間使用,但你可以申請inf數量的作業。他們留在等待列表中,直到線程可用。我不認爲你可以改變池中當前進程的數量。

+0

我可以在我的作業使用睡眠並減少或增加該值以便擁有更多或更少的作業,但是我想要的是更改併發線程的數量(因爲每個線程都是ac oncurrent連接,我正在寫一個負載模擬器的服務器,我需要模擬熱身的緩存),所以這些技巧將無法在我的情況下工作 –

+0

我有一個類似的情況下,我用管理器來管理之間進程和我的線程中有while循環。所以我只是啓動了x個線程,如果我想關閉它們,我在while循環中將該變量設置爲True,並且線程退出。該變量位於管理器列表中,因此您可以在主線程中對其進行修改。我可以告訴你我的代碼給你一個想法,我是如何解決它的,但我是這個論壇的新手,不知道如何。 –

+0

嗯...看起來像只能減少,不會增加... 和我使用全局變量,因此我可以在線程之間共享狀態^^ –