2012-02-11 22 views
0

我遇到線程未執行的問題。我遵循了關於如何設置線程的文檔,它工作的很好,除了在完成第一批任務後掛起。基本上它會在本地安裝遠程服務器日誌目錄,然後解析日誌中的兩個特定字符串。當前10個線程完成時,它會掛起並永遠不會移動到服務器上。我究竟做錯了什麼?Python 3線程沒有運行大於線程數的任何任務

''' 
Created on Feb 10, 2012 
This script exists solely to check the configs of prod servers for oom exceptions 
and restarts 
''' 
import shlex 
import subprocess 
import time 
import re 
import os 
import logging 
import logging.handlers 
import queue 
import threading 
from threading import Lock 
import getpass 


#search string: 
ss = "outofmemory" 
password = getpass.getpass("Please type in your sea1 password to mount the drives locally: ") 
user = getpass.getuser() 

max_threads = 9 
log_home = os.path.expanduser("~") 
log_path = os.path.join(log_home, "Desktop") 
log_file = 'Server Parser.log' 
log_out = os.path.join(log_path, log_file) 
logger = logging.getLogger("Server Parser") 
tg_logger = logging.getLogger('thread') 
tg_logger.setLevel(logging.DEBUG) 
tp_logger = logging.getLogger('tpwipe') 
tp_logger.setLevel(logging.DEBUG) 

outFile = "Server Parser (with {} threads).log".format(max_threads) 
output_path = os.path.join(log_path, outFile) 
logger.setLevel(logging.INFO) 
# create file handler which logs even debug messages 
fh = logging.handlers.RotatingFileHandler(log_out, mode='a', 
              maxBytes=2000000, 
              backupCount=6) 
fh.setLevel(logging.DEBUG) 
# create console handler with a higher log level 
ch = logging.StreamHandler() 
ch.setLevel(logging.INFO) 
# create formatter and add it to the handlers 
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s') 
fh.setFormatter(formatter) 
ch.setFormatter(formatter) 
# add the handlers to the logger 
logger.addHandler(fh) 
logger.addHandler(ch) 
tp_logger.addHandler(fh) 
tp_logger.addHandler(ch) 
tg_logger.addHandler(ch) 
tg_logger.addHandler(fh) 
q = queue.Queue() 
lock = Lock() 
class ThreadShredder(threading.Thread): 
    """threaded information aggregator""" 
    def __init__(self, myqueue, search_string, l, thr, user, password): 
     threading.Thread.__init__(self) 
     self.q = myqueue 
     self.ss = search_string 
     self.logger = logging.getLogger("thread".format(thr)) 
     self.thread_num = thr 
     self.lock = l 
     self.p = password 
     self.u = user 
    def run(self): 
     ''' 
     will mount all the drives and start parsing, like a boss 
     ''' 
     def wait_check(dl): 
      if os.path.exists(dl+":\\"): 
       self.logger.info("Mount exists, waiting til it frees to continue") 
       time.sleep(20) 
       wait_check(dl) 
     self.lock.acquire() 
     qi = self.q.get() 
     drive_letter = qi[1] 
     wait_check(drive_letter) 
     direc = qi[0] 
     host = qi[2] 
     u = self.u 
     p = self.p 
     self.logger.debug("Thread: {} folder: {}".format(self.thread_num, direc)) 
     mount_cmd = r"net use {}: {} {} /USER:sea1\\{}".format(drive_letter, direc, p, u) 
     args = shlex.split(mount_cmd) 
     self.logger.debug("Thread: {} mount args: {}".format(self.thread_num, args)) 
     p = subprocess.Popen(args) 
     self.lock.release() 
     time.sleep(1) 
     logFile = os.path.join("{}:\\".format(drive_letter), "Server-app.log") 
     ex_time = [] 
     last_restart = None 
     with open(logFile, encoding="utf-8", errors="ignore", mode="r") as data: 
      for line in data: 
       if line.rstrip("\n") == "System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.": 
        continue 
       line_tokens = line.split() 
       if len(line_tokens) >= 7: 
        if line_tokens[6] == "Service" and line_tokens[7] == "starting...": 
         last_restart = line_tokens[1] 
       if self.ss in line.lower().rstrip("\n"): 
        if len(line_tokens) >= 6: 
         job = line_tokens[1] 
         if job == "Exception" or "," not in job: 
          continue 
         ex_time.append(job) 
      if len(ex_time) > 0: 
       self.logger.info("OOM Exception detected on {} at {}".format(host, ex_time[-1])) 
       if last_restart != None: 
        self.logger.info("last restart on {} at {}".format(host, last_restart)) 
     time.sleep(10) 
     unmount = "net use {}: /DELETE".format(drive_letter) 
     unmntCom = shlex.split(unmount) 
     np = subprocess.Popen(unmntCom) 
     np 
     self.q.task_done() 

srv_log_dir = ["l$", "logs"] 
srv_log = "Service-app.log" 
srv_log_path = os.path.join(rmp_log_dir[0], rmp_log_dir[1], rmp_log) 
odd_list = ["mycoolserver", "mycoolserver1", "mycoolserver3", "mycoolserver4"] 
win_drive_letters = ["e", "f", "g", "h", "i", "j", "k", "l", "m", 
        "n", "o", "p", "q", "r", "s", "t", "u", "v", 
        "w", "x", "y", "z"] 
win_drive_list = [] 
full_log_path = [] 
job_list = [] 
srv_list = [] 
folder_string = r'"\\\\server\l$\logs"' 
len_rzt = len(odd_list) 

for i in range(max_threads): 
    j = i + 1 
    t = ThreadShredder(q, ss.lower(), lock, j, user, password) 
    t.setDaemon(True) 
    t.start() 

def find_drives(): 
    ''' 
    Creates contiguous block of ten drive letters that aren't being used 
    ''' 
    drive_block = set() 
    for i in range(10): 
     if not os.path.exists(win_drive_letters[i]+":\\") and win_drive_letters[i] not in drive_block: 
      drive_block.add(win_drive_letters[i]) 
      logger.debug("{} {}".format(i, drive_block)) 
     elif not os.path.exists(win_drive_letters[i+1]+":\\") and win_drive_letters[i+1] not in drive_block: 
      drive_block.add(win_drive_letters[i+1]) 
      logger.debug("{} {}".format(i, drive_block)) 
     elif not os.path.exists(win_drive_letters[i+2]+":\\") and win_drive_letters[i+2] not in drive_block: 
      drive_block.add(win_drive_letters[i+2]) 
      logger.debug("{} {}".format(i, drive_block)) 
    return sorted(drive_block) 
db = find_drives() 
for elem in db: 
    win_drive_list.append(elem) 
    logger.debug("available drive letters: {}".format(elem)) 
#12 is the number of copy move servers we have 
for i in range(12 + len_rzt): 
    z = i - 12 
    j = i + 1 
    x = i 
    if i > 9 and i < 20: 
     x = i - 10 
    elif i > 19 and i < 30: 
     x = i - 20 
    elif i > 29 and i < 40: 
     x = i - 30 
    if i < 9: 
     srv_list.append("server0{}".format(j)) 
    else: 
     srv_list.append("server{}".format(j)) 
    if i == 9: 
     logger.debug("{} ".format(srv_list[i], win_drive_list[x])) 
    if i < 12: 
     folder = re.sub('server', srv_list[i], folder_string) 
     job_list.append(([folder, win_drive_list[x], rmp_list[i]])) 
    else: 
     folder = re.sub('server', odd_list[z], folder_string) 
     job_list.append(([folder, win_drive_list[x], odd_list[z]])) 
    logger.info(job_list[i]) 

for i, job in enumerate(job_list): 
    q.put(job) 
q.join() 

回答

2

.run()方法在處理隊列中的單個項目後結束。將其更改爲:

while True: 
    job = q.get() 
    # ... 

代碼中還存在許多其他問題。

+0

我添加了該行,現在它掛起沒有任何線程啓動。 – user352472 2012-02-11 21:45:56

+0

@ user352472:'#...'表示'.run()'方法中的所有語句。 – jfs 2012-02-11 21:49:47

+0

該死的,該工作。非常感謝哦!現在我還有其他問題,但我會盡力解決它們。 – user352472 2012-02-11 21:55:44