2012-05-29

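Multithreading in Python using the threading and Queue modules

I have a file with several hundred thousand lines, and every line has to go through the same process (computing a covariance). I want to multithread this because it takes a very long time. However, all the examples/tutorials I have seen are rather complicated for what I want to do. It would be great if someone could point me to a good tutorial that explains how to use these two modules together.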

Answers

Whenever I have something to process in parallel, I use something like this (I just ripped it out of an existing script):

#!/usr/bin/env python2 
# This Python file uses the following encoding: utf-8 

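from multiprocessing import Process, Queue, cpu_count


class ThreadedProcessor(object):
    """Parse rows in parallel worker processes and write them in input order.

    input_file is iterated row by row (e.g. a csv reader) and output_file
    needs a writerow() method (e.g. a csv writer). The child processes use
    these objects directly, so this relies on the 'fork' start method and
    will not work where processes are spawned (e.g. on Windows).
    """

    def __init__(self, parser, input_file, output_file, threads=cpu_count()):
        self.parser = parser

        self.num_processes = threads
        self.input_file = input_file
        self.output_file = output_file

        self.input_queue = Queue()
        self.output_queue = Queue()

        # One process feeds the input queue, one drains the output queue.
        self.input_process = Process(target=self.parse_input)
        self.output_process = Process(target=self.write_output)

        # The workers sit between the two queues.
        self.processes = [Process(target=self.process_row)
                          for _ in range(self.num_processes)]

        self.input_process.start()
        self.output_process.start()

        for process in self.processes:
            process.start()

        # Wait for the reader, then the workers, then the writer.
        self.input_process.join()

        for process in self.processes:
            process.join()

        self.output_process.join()

    def parse_input(self):
        # Tag every row with its position so the writer can restore the order.
        for index, row in enumerate(self.input_file):
            self.input_queue.put([index, row])

        # One 'STOP' sentinel per worker, so each of them terminates.
        for _ in range(self.num_processes):
            self.input_queue.put('STOP')

    def process_row(self):
        # row[0] is passed through as the row id, row[1] goes to the parser;
        # parser.parse() is expected to return a list.
        for index, row in iter(self.input_queue.get, 'STOP'):
            self.output_queue.put([index, row[0], self.parser.parse(row[1])])

        # Tell the writer that this worker is done.
        self.output_queue.put('STOP')

    def write_output(self):
        current = 0   # index of the next row to write
        pending = {}  # rows that arrived ahead of their turn, keyed by index

        # Every worker sends one 'STOP', so drain the queue once per worker.
        for _ in range(self.num_processes):
            for index, row_id, row in iter(self.output_queue.get, 'STOP'):
                if index != current:
                    pending[index] = [row_id] + row
                else:
                    self.output_file.writerow([row_id] + row)
                    current += 1

                    # Flush any buffered rows that are now next in line.
                    while current in pending:
                        self.output_file.writerow(pending.pop(current))
                        current += 1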

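Basically, you have two processes that manage reading and writing the files. One reads and parses the input; the other reads the "done" queue and writes to the output file. The remaining processes are the workers (in this case, as many as the machine has CPU cores), and they all process elements from the input queue.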
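Since the question asked about the threading and Queue modules specifically, here is a minimal sketch of the same reader / workers / writer layout using threads. It is an illustration, not the code from the script above: `run_pipeline`, `work` and `num_workers` are names made up for this example, it targets Python 3 (where the `Queue` module is called `queue`), and it collects results in a list instead of writing a file. Keep in mind that in standard CPython builds the GIL prevents threads from running CPU-bound Python code in parallel, which is why the code above uses multiprocessing.

```python
import queue
import threading

STOP = object()  # sentinel marking the end of a stream


def run_pipeline(rows, work, num_workers=4):
    """Apply work() to every row in worker threads; return results in input order."""
    input_queue = queue.Queue()
    output_queue = queue.Queue()
    results = []

    def reader():
        # Tag each row with its position so the order can be restored later.
        for index, row in enumerate(rows):
            input_queue.put((index, row))
        # One sentinel per worker, so every worker terminates.
        for _ in range(num_workers):
            input_queue.put(STOP)

    def worker():
        for index, row in iter(input_queue.get, STOP):
            output_queue.put((index, work(row)))
        # Tell the writer that this worker is done.
        output_queue.put(STOP)

    def writer():
        current = 0   # index of the next result to emit
        pending = {}  # results that arrived ahead of their turn
        # Each worker sends exactly one STOP.
        for _ in range(num_workers):
            for index, value in iter(output_queue.get, STOP):
                pending[index] = value
                while current in pending:
                    results.append(pending.pop(current))
                    current += 1

    threads = [threading.Thread(target=reader), threading.Thread(target=writer)]
    threads += [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    lines = ["1 2", "3 4", "5 6"]
    # prints [3, 7, 11]
    print(run_pipeline(lines, lambda line: sum(int(x) for x in line.split())))
```

There is no error handling here: if `work` raises inside a worker, that worker never sends its sentinel and the writer would wait forever, so a real script would want a try/finally around the worker loop.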