2017-03-01 150 views
0

我已經使用了很多這個功能,並且找不到任何答案 - 因此我在問。併發線程正在等待任務

現在已經過去了一天,但我無法瞭解一些線程概念,這可能是爲什麼我的代碼很混亂。

我生成3個主題。精細。

當線程2產生時,線程1「停止」,我假設它意味着它死亡。線程2和3相同。

我將這些線程放入活動池中。

我在努力的是保持所有3個線程同時運行並等待。我想有一個隨機的時間間隔爲任務分配線程的方法。

從我收集的,我的線程正在死亡的原因是因爲我的工人類正在返回。然而,玩過它並把它放在一個循環中(while 1),我仍然無法獲得任何工作。

任何想法?

import logging 
import random 
import threading 
import time 

logging.basicConfig(level = logging.DEBUG, format = '(%(threadName)-2s) %(message)s') 

class ActivePool(object): 
    def __init__(self): 
     super(ActivePool, self).__init__() 

     self.active = [] 
     self.lock = threading.Lock() 

    def activate(self, name): 
     with self.lock: 
      self.active.append(name) 
      logging.debug('Running wheel: %s', self.active) 
      self.move(name) 

    def move(self, name): 
     while name.is_alive(): 
      logging.debug('yes') 

    def inactive(self, name): 
     with self.lock: 
      self.active.remove(name) 
      logging.debug('Running wheel: %s', self.active) 

    def rtime(self): 
     self.rt = random.randint(5, 10) 
     t = threading.Timer(rt, self.revent) 

    def join(self): 
     for t in self.active: 
      t.join() 

    def check(self): 
     for t in self.active: 
      if t.is_alive(): 
       print t 

def worker(s, pool): 
    logging.debug('Wheel inactive') 

    with s: 
     #name = threading.currentThread().getName() 
     thread = threading.currentThread() 
     logging.debug('ACTIVATING') 
     pool.activate(thread) 
     #time.sleep(2) 
     #pool.inactive(thread) 

if __name__ == "__main__": 
    pool = ActivePool() 
    s = threading.Semaphore() 

    for i in range(0, 6): 
     t = threading.Thread(target = worker, name = str(i + 1), args = (s, pool)) 
     pool.activate(t) 
     t.start() 

    logging.debug('here') 
+0

即使在這裏添加一些僞代碼也會有所幫助。否則會非常難以診斷。 –

+0

@phyllisdiller新增 – popopret

+0

當我清理一些東西時,我會讓線程1旋轉起來而不會死亡。 但我不確定這個意圖。你希望基本上有一個等待分配任務的線程池,對嗎? 但是,您的主函數以及您的工作函數(線程要運行的東西)會激活線程。您不應該從線程內激活線程。 –

回答

0

好吧。我已經改變了一些東西。基本上你想要的是這個命令的順序:

  • 構建一個ActivePool。
  • 將線程添加到您的ActivePool。
  • 調用ActivePool.start()開始線程。
  • 工作線程運行輔助函數,共享數據由信號量保護。
  • 主線程等待所有線程完成。

您不需要加入線程。

如果您確實添加了一個隨機任務,您可以將其添加到某個列表(您必須鎖定信號量),該列表將由工作人員函數從中抽取並執行。如果工作人員在列表中看到某些東西,它會將其從列表中拉出並執行相關操作。如果沒有什麼可做的,讓線程睡覺。

您可能希望在您啓動線程池之前將所有線程添加到線程池(即在ActivePool中創建一個列表,然後執行pool.activate()並依次激活每個線程)。

import logging 
import random 
import threading 
import time 

logger = logging.getLogger("thread_logger") 
logger.setLevel(logging.DEBUG) 

class ActivePool(object): 
    def __init__(self): 
     super(ActivePool, self).__init__() 

     self.active = [] 
     self.lock = threading.Lock() 

    def activate(self, name): 
     with self.lock: 
      self.active.append(name) 
      logger.debug('Running wheel: %s', self.active) 
      t.start() 

    def inactive(self, name): 
     with self.lock: 
      self.active.remove(name) 
      logger.debug('Running wheel: %s', self.active) 

    def rtime(self): 
     self.rt = random.randint(5, 10) 
     t = threading.Timer(rt, self.revent) 

    def join(self): 
     for t in self.active: 
      t.join() 

    def check(self): 
     for t in self.active: 
      if t.is_alive(): 
       return True 

def worker(s, pool): 

    logger.debug('Worker spinning up') 

    for x in range(0, 3): 
     with s: 
      logger.debug('Thread ID: ' + str(threading.currentThread().ident) + ' DO WORK: ' + str(x)) 
     time.sleep(2) 

if __name__ == "__main__": 

    pool = ActivePool() 
    s = threading.Semaphore() 

    for i in range(0, 2): 
     t = threading.Thread(target = worker, name = str(i + 1), args = (s, pool)) 
     pool.activate(t) 

    while(pool.check()): 
     print("Worker thread still workin yo.") 
     time.sleep(2) 
    logger.debug('Finito.')