2012-01-25 166 views
2

我正在使用multiprocessing.py過濾大文本文件。代碼基本上打開文本文件,在它上面工作,然後關閉它。連續多處理

事情是,我希望能夠在多個文本文件上連續啓動它。因此,我試圖添加一個循環,但由於某種原因,它不起作用(而代碼在每個文件上工作)。我相信這是一個問題:

if __name__ == '__main__':  

但是,我正在尋找別的東西。我試圖創建一個啓動和LauncherCount文件是這樣的:

LauncherCount.py: 

    def setLauncherCount(n): 
     global LauncherCount 
     LauncherCount = n 

,並

Launcher.py: 

import os 
import LauncherCount 

LauncherCount.setLauncherCount(0) 

os.system("OrientedFilterNoLoop.py") 

LauncherCount.setLauncherCount(1) 

os.system("OrientedFilterNoLoop.py") 

... 

我進口LauncherCount.py,並使用LauncherCount.LauncherCount作爲我的循環索引。

當然,這不起作用,因爲它在本地編輯變量LauncherCount.LauncherCount,所以它不會在導入的LauncherCount版本中編輯。

是否有任何方法可以在導入的文件中全局編輯變量?或者,有什麼方法可以以其他方式做到這一點?我需要的是多次運行代碼,更改一個值,並且顯然不使用任何循環。

謝謝!

編輯:這是我的主要代碼,如果有必要。很抱歉的不良作風......

import multiprocessing 
import config 
import time 
import LauncherCount 

class Filter: 

    """ Filtering methods """ 
    def __init__(self): 
     print("launching methods") 

     # Return the list: [Latitude,Longitude] (elements are floating point numbers) 
    def LatLong(self,line): 

     comaCount = [] 
     comaCount.append(line.find(',')) 
     comaCount.append(line.find(',',comaCount[0] + 1)) 
    comaCount.append(line.find(',',comaCount[1] + 1)) 
    Lat = line[comaCount[0] + 1 : comaCount[1]] 
    Long = line[comaCount[1] + 1 : comaCount[2]] 

    try: 
     return [float(Lat) , float(Long)] 
    except ValueError: 
     return [0,0] 

# Return a boolean: 
# - True if the Lat/Long is within the Lat/Long rectangle defined by: 
#   tupleFilter = (minLat,maxLat,minLong,maxLong) 
# - False if not                 
def LatLongFilter(self,LatLongList , tupleFilter) : 
    if tupleFilter[0] <= LatLongList[0] <= tupleFilter[1] and 
     tupleFilter[2] <= LatLongList[1] <= tupleFilter[3]: 
     return True 
    else: 
     return False 

def writeLine(self,key,line): 
    filterDico[key][1].write(line) 



def filteringProcess(dico): 

    myFilter = Filter() 

    while True: 
     try: 
      currentLine = readFile.readline() 
     except ValueError: 
      break 
     if len(currentLine) ==0:     # Breaks at the end of the file 
      break 
     if len(currentLine) < 35:     # Deletes wrong lines (too short) 
      continue 
     LatLongList = myFilter.LatLong(currentLine) 
     for key in dico: 
      if myFilter.LatLongFilter(LatLongList,dico[key][0]): 
       myFilter.writeLine(key,currentLine) 


########################################################################### 
       # Main 
########################################################################### 

# Open read files: 
readFile = open(config.readFileList[LauncherCount.LauncherCount][1], 'r') 

# Generate writing files: 
pathDico = {} 
filterDico = config.filterDico 

# Create outputs 
for key in filterDico: 
    output_Name = config.readFileList[LauncherCount.LauncherCount][0][:-4] 
        + '_' + key +'.log' 
    pathDico[output_Name] = config.writingFolder + output_Name 
    filterDico[key] = [filterDico[key],open(pathDico[output_Name],'w')] 


p = [] 
CPUCount = multiprocessing.cpu_count() 
CPURange = range(CPUCount) 

startingTime = time.localtime() 

if __name__ == '__main__': 
    ### Create and start processes: 
    for i in CPURange: 
     p.append(multiprocessing.Process(target = filteringProcess , 
              args = (filterDico,))) 
     p[i].start() 

    ### Kill processes: 
    while True: 
     if [p[i].is_alive() for i in CPURange] == [False for i in CPURange]: 
      readFile.close() 
      for key in config.filterDico: 
       config.filterDico[key][1].close() 
       print(key,"is Done!") 
       endTime = time.localtime() 
      break 

    print("Process started at:",startingTime) 
    print("And ended at:",endTime) 
+0

」事情是,我希望能夠在多個文本文件上連續啓動它。「這似乎是Queue的用途。你爲什麼不使用隊列呢? –

+0

如果我得到它,隊列用於在進程之間交換值和信息? 我想要做的不是擴展進程,以便它們可以在連續的文件上工作,而是等待進程完成,並在新的輸入文件上使用相同的方法創建一堆新進程。 – kevad

+1

似乎倒退了。爲什麼不有一堆進程在等待文件名的讀隊列呢?一個進程完成後,它將文件名放入下一個進程的隊列中。這樣同步很容易。從隊列中讀取姓名;做工作;將名稱寫入另一個隊列。你爲什麼不這樣做? –

回答

1

要處理的文件組序列上的文件工作組內並行,而:

#!/usr/bin/env python 
from multiprocessing import Pool 

def work_on(args): 
    """Process a single file.""" 
    i, filename = args 
    print("working on %s" % (filename,)) 
    return i 

def files(): 
    """Generate input filenames to work on.""" 
    #NOTE: you could read the file list from a file, get it using glob.glob, etc 
    yield "inputfile1" 
    yield "inputfile2" 

def process_files(pool, filenames): 
    """Process filenames using pool of processes. 

    Wait for results. 
    """ 
    for result in pool.imap_unordered(work_on, enumerate(filenames)): 
     #NOTE: in general the files won't be processed in the original order 
     print(result) 

def main(): 
    p = Pool() 

    # to do "successive" multiprocessing 
    for filenames in [files(), ['other', 'bunch', 'of', 'files']]: 
     process_files(p, filenames) 

if __name__=="__main__": 
    main() 

每個process_file()被稱爲序列中的前一後有已完成,即來自process_files()的不同調用的文件是並行處理的而不是。 「

+0

我沒有爲每個輸入文件使用相同的輸出文件,所以基本上每次文件完成時,我關閉所有輸出文件,創建一堆新文件,並在新輸入和新輸出上啓動一堆新進程..我怎樣才能解決你的問題? – kevad

+1

@ user1154967:根據輸入文件名生成輸出文件名,例如'output_filename = filename +'。output'' – jfs

+0

這也將創建一個並行多處理,而我正在尋找一種方法來執行連續多處理,文件和緩衝存儲器作爲輸入文件大約35GB,並從數據庫服務器讀取這些文件。 – kevad