1

*Solved by using pool.map() instead of map_async() with multiprocessing. Processing multiple text files simultaneously.

Python 2.7 - How can I get gevent or multiprocessing to process multiple text files simultaneously with the following code?

I have pasted both the gevent and the multiprocessing pool versions below.

The log output shows the files being processed sequentially, and 'lsof' on Linux confirms that only one file is being read at a time.

The files are stored on an enterprise-class disk shelf containing an array of Ultra320 drives.

I can open 4 files at a time with nothing but a sleep function, yet not when I try to process the files line by line. Is the 'for line in file' loop somehow blocking the next file from being opened?

from time import sleep
from multiprocessing import Pool


def hold_open(log):
    # Hold the file handle open long enough to inspect it with lsof.
    with open(log) as fh:
        sleep(60)

pool = Pool(processes=4)
pool.map(hold_open, ['file1', 'file2', 'file3', 'file4'])
pool.close()  # close() must be called before join()
pool.join()

What am I doing wrong, and how do I change it to fix it?

2014-10-07 13:51:51,088 - __main__ - INFO - Found 23 files, duration: 0:00:00.000839 
2014-10-07 13:51:51,088 - __main__ - INFO - Now analysing using 8 threads..... 
2014-10-07 13:51:51,089 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Analysing... 
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Finished analysing 41943107 bytes duration: 0:00:00.381875 
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Analysing... 
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Finished analysing 4017126 bytes duration: 0:00:01.725641 
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Analysing... 
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Finished analysing 4970479 bytes duration: 0:00:01.753434 
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.46.05.txt.gz - Analysing... 
from gevent import monkey; monkey.patch_all() 
import os 
import re 
import gzip 
import gevent 
import logging 
from gevent import pool 
from datetime import datetime 


log_level = logging.INFO 
logger = logging.getLogger(__name__) 
logger.setLevel(log_level) 
ch = logging.StreamHandler() 
ch.setLevel(log_level) 
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') 
ch.setFormatter(formatter) 
logger.addHandler(ch) 


def get_time_range(log):
    if not os.path.isfile(log):
        logger.error("\x1b[31m%s - Something went wrong analysing\x1b[0m" % log)
        return
    date_regex = re.compile(r'^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}:\d{3})')

    def process(lh):
        # Record the first and last timestamps seen in the file.
        start, end = str(), str()
        logger.info("\x1b[33m%s - Analysing...\x1b[0m" % os.path.basename(log))
        for line in lh:
            date = date_regex.match(line)
            if date:
                if not start:
                    start = date.group(1)
                end = date.group(1)
        return start, end

    start_time = datetime.now()
    size = os.path.getsize(log)
    if os.path.splitext(log)[1] == '.txt':
        with open(log, 'r') as lh:
            start, end = process(lh)
    elif os.path.splitext(log)[1] == '.gz':
        with gzip.open(log, 'r') as lh:
            start, end = process(lh)
    else:
        return
    meta = (log, size, start, end)
    duration = datetime.now() - start_time
    logger.info("\x1b[32m%s - Finished analysing %s bytes duration: %s\x1b[0m" % (os.path.basename(log), size, duration))
    return meta


def run(directory, pool_size=8, cur=None):
    start = datetime.now()
    worker_pool = gevent.pool.Pool(int(pool_size))
    while True:
        files = list()
        for log in os.listdir(directory):
            # The original `if 'XSLog' and 'txt' in log` only tested the
            # second condition; both substrings must be checked explicitly.
            if 'XSLog' in log and 'txt' in log:
                files.append(os.path.join(directory, log))
        logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
        logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
        for log in files:
            worker_pool.spawn(get_time_range, log)
        worker_pool.join()
        duration = datetime.now() - start
        logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)


if __name__ == '__main__':
    run('/path/to/log/files')

With multiprocessing:

def run(directory, pool_size=8, cur=None):
    start = datetime.now()
    while True:
        files = list()
        for log in os.listdir(directory):
            if 'XSLog' in log and 'txt' in log:
                files.append(os.path.join(directory, log))
        logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
        logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
        pool = Pool(processes=pool_size, maxtasksperchild=2)
        # pool.map_async(get_time_range, files)
        pool.map(get_time_range, files)  # This fixed it.
        pool.close()  # close() must precede join()
        pool.join()
        duration = datetime.now() - start
        logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)

Answer

1

The amount of benefit you get from parallelization here is limited, because a significant chunk of your time is spent reading from the disk. Disk I/O is sequential; no matter how many processes/greenlets you have, only one of them can read from the disk at a time. Now, aside from the time spent reading from disk, the rest of the time is spent doing regex matching on the lines being read. gevent will not help you with any of that. It is a CPU-bound operation, and gevent cannot be used to parallelize CPU-bound operations. gevent is useful for making blocking I/O operations non-blocking, which enables parallel I/O, but there is no blocking I/O here.
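To illustrate the point, here is a minimal sketch (not the original workload; the busy() function is a hypothetical stand-in for the regex matching). Because the work never blocks on I/O, the greenlets never yield to the gevent hub and run strictly one after another:

from gevent import monkey; monkey.patch_all()

import time
from gevent.pool import Pool


def busy(n):
    # Pure-Python busy work standing in for regex matching. It performs
    # no blocking I/O, so it never yields to the gevent hub.
    total = 0
    for i in xrange(n):
        total += i * i
    return total


start = time.time()
pool = Pool(4)
pool.map(busy, [5000000] * 4)
# Takes roughly 4x the single-call time: gevent gives no CPU parallelism.
print("gevent pool took %.2fs" % (time.time() - start))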

multiprocessing can make the regex operations run in parallel, so I would expect it to perform a little better than the gevent version. But in either case, you probably won't be much (if any) faster than a sequential version, because you spend so much of your time reading the files from disk.
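For reference, the API difference that resolved the question (see the note at the top): Pool.map() blocks until every task has finished, while Pool.map_async() returns an AsyncResult immediately and the caller must wait on it explicitly (or close() and join() the pool). A minimal sketch with a hypothetical work() function:

from multiprocessing import Pool


def work(x):
    return x * x


if __name__ == '__main__':
    pool = Pool(processes=4)

    # Blocking: returns the complete result list once every worker is done.
    print(pool.map(work, range(8)))

    # Non-blocking: returns immediately; results must be waited for.
    result = pool.map_async(work, range(8))
    print(result.get(timeout=30))

    pool.close()
    pool.join()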

+0

I edited the original post to include an aspect I had overlooked. The files are stored on an enterprise-class disk shelf, so in most cases, depending on where the files being accessed are located, reading multiple files at once did not increase the baseline time for reading a single file when I did some very basic benchmarking. – 2014-10-07 18:21:09

+0

The burning question is still: why can't more than one file be opened at a time? With multiprocessing I would expect each process to open a file, and those open files should then be visible in the 'lsof' output regardless of how the data is actually pulled off the disk? – 2014-10-07 18:29:40

+0

@LukeB Did you see that behaviour with 'multiprocessing', with 'gevent', or both? When I ran code similar to yours with a 'multiprocessing.Pool', I did see 'cpu_count()' files open simultaneously. – dano 2014-10-07 18:36:44
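For anyone who wants to repeat that check, a minimal sketch (the file names are hypothetical): each worker opens one file and holds it while you run 'lsof -c python' from another shell; one open file should appear per worker process.

import os
import time
from multiprocessing import Pool, cpu_count


def hold_open(path):
    # Keep the handle alive long enough to run `lsof -c python`
    # from another shell and see the per-process open files.
    with open(path) as fh:
        print("pid %d holds %s open" % (os.getpid(), path))
        time.sleep(60)


if __name__ == '__main__':
    paths = ['file%d' % i for i in range(cpu_count())]  # hypothetical files
    pool = Pool()  # defaults to cpu_count() worker processes
    pool.map(hold_open, paths)
    pool.close()
    pool.join()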