*Solved by using pool.map() instead of map_async() with multiprocessing. Processing multiple text files concurrently.*
Python 2.7 - How can I get gevent or multiprocessing to process multiple text files concurrently with the code below?
I have pasted both the gevent and the multiprocessing pool versions below.
The log output shows the files being processed sequentially, and running `lsof` on Linux confirms that only one file is being read at a time.
The files are stored on an enterprise-class disk shelf containing an array of Ultra320 drives.
I can open four files at once when the worker function just sleeps, but not when I try to process the files line by line. Does the `for line in file` loop somehow block the next file from being opened?
from time import sleep
from multiprocessing import Pool

def hold_open(log):
    with open(log) as fh:
        sleep(60)

pool = Pool(processes=4)
pool.map(hold_open, ['file1', 'file2', 'file3', 'file4'])
pool.close()
pool.join()
What am I doing wrong, and how should I change the code to fix it?
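For comparison, here is a minimal sketch (with hypothetical file paths) showing that per-file, line-by-line work also runs in parallel under `multiprocessing.Pool`: each worker process opens its own file handle, so one worker's read loop cannot block another's.

```python
import os
from multiprocessing import Pool

def count_lines(path):
    # Each worker process opens its own handle, so the
    # 'for line in fh' loop only blocks that one worker.
    n = 0
    with open(path) as fh:
        for line in fh:
            n += 1
    return n

def count_all(paths, processes=4):
    pool = Pool(processes=processes)
    try:
        # map() blocks until every file has been processed,
        # but the files themselves are read concurrently.
        return pool.map(count_lines, paths)
    finally:
        pool.close()
        pool.join()
```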
2014-10-07 13:51:51,088 - __main__ - INFO - Found 23 files, duration: 0:00:00.000839
2014-10-07 13:51:51,088 - __main__ - INFO - Now analysing using 8 threads.....
2014-10-07 13:51:51,089 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Analysing...
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Finished analysing 41943107 bytes duration: 0:00:00.381875
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Analysing...
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Finished analysing 4017126 bytes duration: 0:00:01.725641
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Analysing...
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Finished analysing 4970479 bytes duration: 0:00:01.753434
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.46.05.txt.gz - Analysing...
from gevent import monkey; monkey.patch_all()
import os
import re
import gzip
import gevent
import logging
from gevent import pool
from datetime import datetime

log_level = logging.INFO
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)


def get_time_range(log):
    if not os.path.isfile(log):
        logger.error("\x1b[31m%s - Something went wrong analysing\x1b[0m" % log)
        return
    date_regex = re.compile(r'^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}:\d{3})')

    def process(lh):
        start, end = str(), str()
        logger.info("\x1b[33m%s - Analysing...\x1b[0m" % os.path.basename(log))
        for line in lh:
            date = date_regex.match(line)
            if date:
                if not start:
                    start = date.group(1)
                end = date.group(1)
        return start, end

    start_time = datetime.now()
    size = os.path.getsize(log)
    if os.path.splitext(log)[1] == '.txt':
        with open(log, 'r') as lh:
            start, end = process(lh)
    elif os.path.splitext(log)[1] == '.gz':
        with gzip.open(log, 'r') as lh:
            start, end = process(lh)
    else:
        return
    meta = (log, size, start, end)
    duration = datetime.now() - start_time
    logger.info("\x1b[32m%s - Finished analysing %s bytes duration: %s\x1b[0m" % (os.path.basename(log), size, duration))


def run(directory, pool_size=8, cur=None):
    start = datetime.now()
    worker_pool = gevent.pool.Pool(int(pool_size))
    files = list()
    for log in os.listdir(directory):
        if 'XSLog' in log and 'txt' in log:
            files.append(os.path.join(directory, log))
    logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
    logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
    for log in files:
        worker_pool.spawn(get_time_range, log)
    worker_pool.join()
    duration = datetime.now() - start
    logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)


if __name__ == '__main__':
    run('/path/to/log/files')
With multiprocessing:
def run(directory, pool_size=8, cur=None):
    start = datetime.now()
    files = list()
    pool = Pool(processes=pool_size, maxtasksperchild=2)
    for log in os.listdir(directory):
        if 'XSLog' in log and 'txt' in log:
            files.append(os.path.join(directory, log))
    logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
    logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
    # pool.map_async(get_time_range, files)
    pool.map(get_time_range, files)  # This fixed it.
    pool.close()
    pool.join()
    duration = datetime.now() - start
    logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)
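The difference between the two calls, in a minimal sketch: `map_async()` returns an `AsyncResult` immediately, so the parent races ahead unless it explicitly waits; `pool.join()` also requires `pool.close()` first, or it raises an error. `map()` simply blocks until every task has finished. A hedged sketch of correct `map_async()` usage (the `square` helper is illustrative only):

```python
from multiprocessing import Pool

def square(x):
    return x * x

def run_async(values):
    pool = Pool(processes=2)
    # map_async() returns immediately with an AsyncResult;
    # nothing has necessarily finished at this point.
    result = pool.map_async(square, values)
    pool.close()   # required before join(): no more tasks will be submitted
    pool.join()    # wait for the workers to drain the task queue
    return result.get()  # collect the results in input order
```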
I edited the original post to include an aspect I had overlooked. The files are stored on an enterprise-class disk shelf, so in most cases, depending on where the files being accessed are located, reading multiple files at once did not increase the baseline time for reading a single file when I did some very basic benchmarking. – 2014-10-07 18:21:09
The burning question remains: why can't more than one file be open at a time? With multiprocessing I would expect each process to open a file, and those open files should then be visible in the `lsof` output regardless of how the data is actually pulled from disk? – 2014-10-07 18:29:40
@LukeB Did you see that behaviour with both `multiprocessing` and `gevent`, or only with `gevent`? When I ran code similar to yours with a `multiprocessing.Pool`, I did see `cpu_count()` files open simultaneously. – dano 2014-10-07 18:36:44
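One way to check dano's observation: each `Pool` task runs in a separate worker process, so an open file descriptor belongs to that worker's PID, which is exactly what `lsof` lists per process. A small sketch (with hypothetical paths) that records which PID opened each file:

```python
import os
from multiprocessing import Pool

def open_and_report(path):
    # Runs in a worker process; the open descriptor belongs to
    # this worker's PID, which is what `lsof` shows per process.
    with open(path) as fh:
        fh.read(1)
        return os.getpid()

def who_opens(paths, processes=2):
    pool = Pool(processes=processes)
    try:
        return pool.map(open_and_report, paths)
    finally:
        pool.close()
        pool.join()
```

Every PID returned should differ from the parent's, confirming the opens happen in the workers rather than the parent process.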