2017-03-27 18 views
1

我正在嘗試編寫一個程序,將路由器上的輸出的一些數據與我們在數據庫中的數據進行比較。爲了加速它(有相當數量的設備),我希望它有多線程。如果我只開始2個線程,我會認爲我應該有3個線程。主線程和兩名工作人員。但由於某種原因,我最終有5個線程,然後下降到3個線程,但從不低於3(我假設兩個線程的損失是來自兩個工作線程完成)。我從來沒有回到1線程,所以我無法停止睡覺。我在代碼中丟失了一些明顯的東西嗎?當我只開始2時,我如何以5個線程結束?

主程序:

from orionsdk import SwisClient 
from getpass import getpass 
import xlsxwriter 
import signal 
import threading 
import queue 
import sys 
import time 
from colorama import Fore, Style, init 
from device_solarwinds_comparison import compare 

init() # init colorama 
results = [] 

class SignalHandler: 
    """ 
    The object that will handle signals and stop the worker threads. 
    """ 

    #: The stop event that's shared by this handler and threads. 
    stopper = None 

    #: The pool of worker threads 
    workers = None 

    def __init__(self, stopper, workers): 
     self.stopper = stopper 
     self.workers = workers 

    def __call__(self, signum, frame): 
     # This will be called by the python signal module 

     print("Telling threads to exit.") 
     self.stopper.set() 

     print("Waiting for threads to finish up.") 
     for worker in self.workers: 
      worker.join() 

     sys.exit(0) 

class AsyncComparison(threading.Thread): 
    # The queue of work 
    work_queue = None 

    # The event that tells the thread to stop 
    stopper = None 

    def __init__(self, work_queue, stopper): 
     super().__init__() 
     self.device_queue = work_queue 
     self.id = threading.Thread.getName(self) 
     self.lock = threading.Lock() 
     self.stopper = stopper 

    def run(self): 
     while not self.stopper.is_set(): 
      try: 
       swis_result = self.device_queue.get_nowait() 
      except queue.Empty: 
       break 
      else: 
       print(Style.BRIGHT + Fore.BLUE + "[{0}]".format(self.id) + Style.RESET_ALL + " " + Fore.GREEN + swis_result['Caption'] + Style.RESET_ALL) 
       result = compare(swis_result) 
       with self.lock: 
        results.append(result) 
       self.device_queue.task_done() 

def main(): 
    # Set number of worker threads 
    num_workers = 2 
    # Create stopper event 
    stopper = threading.Event() 
    # Create a work queue 
    work_queue = queue.Queue() 
    sw_username = 'user' 
    sw_password = 'pass' 

    swis = SwisClient(hostname='host', username=sw_username, password=sw_password) 
    swis_results = swis.query("""SELECT TOP 2 N.Caption, N.CustomProperties.Site_Tier, N.CustomProperties.Bandwidth, 
         N.CustomProperties.Carrier, N.CustomProperties.CircuitID, N.CustomProperties.GOLDCAR, 
         N.CustomProperties.Tier1Broadband, N.CustomProperties.Tier1BroadbandSpeed, N.CustomProperties.Tier1BroadbandProvider 
         FROM Orion.Nodes N 
         WHERE N.Caption LIKE '%rt-0%'""") 
    swis_results = swis_results['results'] 

    for result in swis_results: 
     work_queue.put(result) 

    # Create workers 
    workers = [AsyncComparison(work_queue, stopper) for i in range(num_workers)] 

    # Create our signal handler and pass it the stopper event and the workers 
    handler = SignalHandler(stopper, workers) 
    # Attach the SIGINT signal to our handler 
    signal.signal(signal.SIGINT, handler) 

    # Start the worker threads 
    for i, worker in enumerate(workers): 
     print(i) 
     worker.start() 

    # While threads are active we will sleep. Slight performance hit compared to blocking with .join 
    # but this will enable the main thread to catch interrupts so we can exit with ctrl+c 
    while threading.active_count() > 1: 
     print("{0}".format(threading.active_count())) 
     time.sleep(1) 

    # Create a workbook and add a worksheet. 
    workbook = xlsxwriter.Workbook('Circuit_Comparison.xlsx') 
    worksheet = workbook.add_worksheet() 
    # Add formats 
    bold = workbook.add_format({'bold': True}) 
    red_bg = workbook.add_format({'bg_color': 'red'}) 
    # Write some data headers. 
    worksheet.write('A1', 'Device', bold) 
    worksheet.write('B1', 'Device_CircuitID', bold) 
    worksheet.write('C1', 'Device_Bandwidth', bold) 
    worksheet.write('D1', 'Device_ServicePolicy', bold) 
    worksheet.write('E1', 'Device_GOLDCAR', bold) 
    worksheet.write('F1', 'Device_Carrier', bold) 
    worksheet.write('G1', 'Device_Tier1_BB_Provider', bold) 
    worksheet.write('H1', 'Device_Tier1_BB_Speed', bold) 
    worksheet.write('I1', 'Solarwinds_CircuitID', bold) 
    worksheet.write('J1', 'Solarwinds_Bandwidth', bold) 
    worksheet.write('K1', 'Solarwinds_GOLDCAR', bold) 
    worksheet.write('L1', 'Solarwinds_Carrier', bold) 
    worksheet.write('M1', 'Solarwinds_Tier1_BB_Provider', bold) 
    worksheet.write('N1', 'Solarwinds_Tier1_BB_Speed', bold) 
    # Start from the first cell below the headers. 
    row = 1 
    col = 0 
    print("Results: {0}".format(results)) 
    for item in results: 
     print(item) 
     # Convert the date string into a datetime object. 
     worksheet.write_string(row, col, item[0]) 

     if item[2] == item[3] or not item[3]: # If bandwidth equal to service_policy, or there's no service_policy 
      worksheet.write_string(row, col + 2, item[2]) 
      worksheet.write_string(row, col + 3, item[3]) 
     else: 
      worksheet.write_string(row, col + 2, item[2], red_bg) 
      worksheet.write_string(row, col + 3, item[3], red_bg) 

     # Convert SW bandwidth to lower case, split on x, take the first number, convert to float (because of decimals) 
     processed_sw_bandwidth = float(item[9].lower().split('x')[0]) 
     # If bandwidth is less than 100 it's probably in mbps, convert to kbps 
     if processed_sw_bandwidth < 100: 
      processed_sw_bandwidth *= 1000 
     # If the Solarwinds bandwidth doesn't equal the device bandwidth 
     # (convert float to integer to get rid of decimal then to string for comparison) 
     if str(int(processed_sw_bandwidth)) != item[2]: 
      worksheet.write_string(row, col + 2, item[2], red_bg) 
      worksheet.write_string(row, col + 9, item[9], red_bg) 
     else: 
      worksheet.write_string(row, col + 9, item[9]) 

     # If device goldcar not equal to Solarwinds goldcar 
     if str(item[4]) != str(item[10]): 
      worksheet.write_string(row, col + 4, item[4], red_bg) 
      worksheet.write_string(row, col + 10, str(item[10]), red_bg) 
     elif not str(item[4]) and str(item[10]) == 'None': # No device Goldcar and Solarwinds reports None for Goldcar 
      worksheet.write_string(row, col + 4, item[4]) 
      worksheet.write_string(row, col + 10, str(item[10])) 
     else: 
      worksheet.write_string(row, col + 4, item[4]) 
      worksheet.write_string(row, col + 10, str(item[10])) 

     # If we're missing a circuit ID somewhere 
     if not item[8] and item[1]: 
      worksheet.write_string(row, col + 1, item[1], red_bg) 
      worksheet.write_string(row, col + 8, '', red_bg) 
     elif item[8] and not item[1]: 
      worksheet.write_string(row, col + 1, '', red_bg) 
      worksheet.write_string(row, col + 8, item[8], red_bg) 
     elif not item[8] and not item[1]: 
      worksheet.write_string(row, col + 1, '', red_bg) 
      worksheet.write_string(row, col + 8, '', red_bg) 
     # If Solarwinds circuit ID not in the description 
     elif (item[8].lower() not in item[1].lower()): 
      worksheet.write_string(row, col + 1, item[1], red_bg) 
      worksheet.write_string(row, col + 8, item[8], red_bg) 
     else: 
      worksheet.write_string(row, col + 1, item[1]) 
      worksheet.write_string(row, col + 8, item[8]) 

     # Build Carrier check list 
     if item[11].lower() == 'verizon' or 'vzw': 
      carrier_list = ['verizon', 'vzw'] 
     elif item[11].lower() == 'twt' or 'time warner telecom' or 'level3': 
      carrier_list = ['twt', 'time warner telecom', 'level3'] 

     # If none of the carriers in carrier_list are in device description 
     if all(s not in item[1].lower() for s in carrier_list): 
      worksheet.write_string(row, col + 1, item[1], red_bg) 
      worksheet.write_string(row, col + 11, item[11], red_bg) 
     else: 
      worksheet.write_string(row, col + 1, item[1]) 
      worksheet.write_string(row, col + 11, item[11]) 

     worksheet.write_string(row, col + 5, item[5]) 
     worksheet.write_string(row, col + 6, item[6]) 
     worksheet.write_string(row, col + 7, item[7]) 
     worksheet.write_string(row, col + 12, item[12]) 
     worksheet.write_string(row, col + 13, item[13]) 
     row += 1 

    workbook.close() 

if __name__ == "__main__": 
    main() 

線程函數:

from netmiko import ConnectHandler 
import re 

def compare(swis_result): 
    ios_username = 'user' 
    ios_password = 'pass' 

    bw_re = re.compile(r'bandwidth\s(\d*)') 
    desc_re = re.compile(r'description\s(.*)') 
    gi02_vlan_re = re.compile(r'GigabitEthernet0\/2(\.\d*)') 
    # gi02_down_re = re.compile(r'GigabitEthernet0\/2.*(down)') 
    gi000_re = re.compile(r'GigabitEthernet(0\/0\/0)') 
    goldcar_re = re.compile(r'VOICE-Q\n\s*priority\s(\d*)') 
    servicepolicy_re = re.compile(r'service-policy\soutput\s(\d*)') 
    serial_class_re = re.compile(r'class\s(\d*)') 

    router = { 
     'device_type': 'cisco_ios', 
     'ip': swis_result['Caption'], 
     'username': ios_username, 
     'password': ios_password, 
     'secret': ios_password, 
     'port': 22, 
    } 
    ssh_conn = ConnectHandler(**router) 
    ssh_conn.enable() 

    output = ssh_conn.send_command("sh ip int br") 
    try: 
     gi02_vlan = re.search(gi02_vlan_re, output).group(1) 
    except AttributeError: 
     gi02_vlan = '' 
    try: 
     tier1_bb_check = re.search(gi000_re, output).group(1) 
    except AttributeError: 
     tier1_bb_check = '' 

    while True: 
     if gi02_vlan: 
      output = ssh_conn.send_command("sh run int gi0/2{0}".format(gi02_vlan)) 
      description = re.search(desc_re, output).group(1) 
      bandwidth = re.search(bw_re, output).group(1) 
      service_policy = re.search(servicepolicy_re, output).group(1) 
      break 
     else: 
      output = ssh_conn.send_command("sh run int gi0/2") 
      try: 
       description = re.search(desc_re, output).group(1) 
       bandwidth = re.search(bw_re, output).group(1) 
       try: 
        service_policy = re.search(servicepolicy_re, output).group(1) 
       except AttributeError: 
        service_policy = "" 
        break 
       break 
      except: 
       try: 
        output = ssh_conn.send_command("sh run int Multilink1") 
        description = re.search(desc_re, output).group(1) 
        bandwidth = re.search(bw_re, output).group(1) 
        service_policy = re.search(servicepolicy_re, output).group(1) 
        break 
       except AttributeError: 
        try: 
         output = ssh_conn.send_command("sh run int Serial0/0/0:1") 
         description = re.search(desc_re, output).group(1) 
         bandwidth = re.search(bw_re, output).group(1) 
         output = ssh_conn.send_command("sh run int Serial0/0/0:1.1") 
         service_policy = re.search(serial_class_re, output).group(1) 
         break 
        except AttributeError: 
         description = "?" 
         bandwidth = "?" 
         service_policy = "?" 
         break 

    if '01' in swis_result['Caption']: 
     output = ssh_conn.send_command('sh policy-map WAN-QOS-MAP') 
     goldcar = re.search(goldcar_re, output).group(1) 
    else: 
     goldcar = '' 

    vzb_list = ['vzn', 'verizon'] 
    twt_list = ['time warner cable', 'twtcs'] 
    if any(word in description.lower() for word in vzb_list): 
     carrier = 'VZB' 
    elif any(word in description.lower() for word in twt_list): 
     carrier = 'TWT' 

    spreadsheet_row = [swis_result['Caption'], description, bandwidth, service_policy, goldcar, carrier, '', '', 
         swis_result['CircuitID'], swis_result['Bandwidth'], swis_result['GOLDCAR'], swis_result['Carrier'], '', ''] 
    return(spreadsheet_row) 
+1

如果您要預先填充工作隊列,並且在工作人員正在運行時從不向隊列添加項目,那麼使用隊列而不是常規列表沒有太大意義。另外,你的鎖沒有保護任何東西。 – user2357112

+1

像「workers = None」這樣的班級任務毫無意義。您不需要在類級別「聲明」實例屬性或構造函數參數。這些任務正在創建未使用的不必要的類屬性,即a.a.靜態變量。 – user2357112

+1

你不能保證你使用的庫不是自己創建線程。 '加入'你感興趣的線程,而不是試圖統計整個程序中的所有活動線程。 – user2357112

回答

0

我不得不把一個ssh_conn.disconnect()在工人功能正常關閉SSH連接。這是額外的線程正在創建,從未終止。

相關問題