你可以自己控制字節指針。在這個例子中,我使用帶有字節指針的 dictionary,並將它存儲在 shelf 中。然後我從 xml 導出 html 文件。
默認情況下,此腳本每輸出 1000 頁就打印一次狀態。它還會生成一張性能圖像(默認爲項目根目錄中的 'xml-split-performance.jpg',就像本文末尾的圖片一樣),因此您可以確認該過程正常工作。
默認情況下,我使用 8 個工作執行緒:先在內存中緩存若干頁,然後把它們分發給 8 個工作執行緒寫入文件。不確定這是否是最好的方法,但對我的 50k 個 html 文件運行良好。
from multiprocessing.pool import ThreadPool as Pool
import matplotlib.pyplot as plt
import itertools, os, sys, datetime, math
from urlparse import urljoin
import shelve
import collections
from functools import partial
class xml_split():
    """Split a large XML file of <page>...</page> entries into individual
    HTML files.

    Pages are read in slices, queued in memory, and written by a thread
    pool. Byte pointers are checkpointed to a shelve cache every
    ``pool_size`` pages so an interrupted run can resume, and per-batch
    export timings are persisted and plotted with matplotlib.
    """
    # Flags
    track_line = False
    last_line = False
    output_file = False
    all_pages_extracted = False
    # Modes available:
    # - Read output folder and calculate missing files: c (default)
    # - Start from scratch: s
    resume_modes = {
        'complete_file_count': 'c',
        'start_from_scratch': 's',
    }
    # Class-level default; each instance gets its own list in __init__ so
    # that instances do not share one mutable queue.
    pages_queue = []

    def __init__(
        self,
        # Number of worker threads
        pool_size = 8,
        # Read x pages at a time
        page_memory = 30,
        # Path to the input xml file
        input_xml_file_path = "sample.xml",
        # File name prefix and extension
        output_file_prefix = 'page_',
        output_file_extension = '.html',
        # Path to the output folder: will be created under the script path
        page_files_folder = 'pages',
        # Path to the cache folder: will be created under the script path
        cache_files_folder = 'cache',
        # filename of a graph that shows the average performance
        graph_file_path = 'xml-split-performance.jpg',
        # update graph each x number of pages extracted
        update_graph_each = 1000,
        # print status on stdout each x number of pages extracted
        show_status_each = 1000,
        # group files in folders of x pages
        batch_size = 1000,
        # tags to track
        start_string = '<page>',
        end_string = '</page>',
        start_doc = '<pages>',
        end_doc = '</pages>',
        # A little template to output the status
        log_template = """
Page: {page}
Time: {exported}
Time since the beginning: {queue}
Reporting each n exports: {show_status_each}
Time since last report: {process}
Average entry export time: {average}
""",
    ):
        self.pool_size = pool_size
        self.pool = Pool(pool_size)
        self.input_xml_file_path = input_xml_file_path
        self.input_file = open(input_xml_file_path)
        self.output_file_prefix = output_file_prefix
        self.output_file_extension = output_file_extension
        self.page_files_folder = page_files_folder
        self.cache_files_folder = cache_files_folder
        self.page_memory = page_memory
        self.show_status_each = show_status_each
        self.update_graph_each = update_graph_each
        self.batch_size = batch_size
        self.graph_file_path = graph_file_path
        self.start_string = start_string
        self.end_string = end_string
        self.end_doc = end_doc
        self.start_doc = start_doc
        self.log_template = log_template
        # Leftover bytes of an incomplete page from the previous slice
        self.chunk_tail = ''
        # Per-instance queue of (path, contents) pages awaiting write
        # (fixes the shared mutable class attribute)
        self.pages_queue = []
        # Set project path to the current script path
        self.project_path = os.getcwd() + os.sep
        # Folder to place output files. os.path.join is the correct tool
        # for filesystem paths (the original used urljoin, which is meant
        # for URLs and breaks on Windows separators).
        self.page_files_path = os.path.join(self.project_path, self.page_files_folder) + os.sep
        # Folder to place cache files
        self.cache_files_path = os.path.join(self.project_path, self.cache_files_folder) + os.sep
        self.create_folder(self.page_files_path)
        self.create_folder(self.cache_files_path)
        # keep track of time, to calculate time spent
        self.main_start = datetime.datetime.now()
        self.start = self.main_start
        # by default, set the resume mode to check output folder
        self.resume_mode = self.resume_modes['complete_file_count']
        # Uncomment this line to ask for user input, on the shell.
        # self.resume_mode = raw_input("s) Start from scratch c) Resume from missing files:")
        # Create or open a shelf to keep a cache of byte pointers keyed by
        # page number, and one for performance stats
        self.chunk_pointers = shelve.open(self.cache_files_path + 'chunk_pointers.log')
        self.persistent_performance_tracker = shelve.open(self.cache_files_path + 'persistent_performance_tracker.log')
        # Init shelf counter
        # *** Resume from missing files on the output folder
        # (Resume an interrupted operation by checking the existing files
        # on the output folders)
        if self.resume_mode == self.resume_modes['complete_file_count']:
            previously_existent_file_count = 0
            for output_root, output_dirnames, output_filenames in os.walk(self.page_files_path):
                for dirname in output_dirnames:
                    for root, dirnames, filenames in os.walk(self.page_files_path + dirname):
                        for filename in filenames:
                            if filename.endswith(self.output_file_extension) and filename.startswith(self.output_file_prefix):
                                previously_existent_file_count += 1
            # Round down to a whole worker batch, since byte pointers are
            # only checkpointed every pool_size pages
            resume_from_page = int(math.floor(previously_existent_file_count / self.pool_size) * self.pool_size)
            if '%s' % (resume_from_page) in self.chunk_pointers:
                self.page_count = resume_from_page
                self.byte_count = self.chunk_pointers['%s' % self.page_count]
            else:
                self.byte_count = 0
                self.page_count = 0
        # *** Do not resume
        # BUG FIX: the original compared two undefined names
        # (`resume` and `self.start_from_scratch`), which raised NameError
        # whenever this branch was reached.
        elif self.resume_mode == self.resume_modes['start_from_scratch']:
            self.byte_count = 0
            self.page_count = 0

    def create_folder(self, path):
        """Create folder if it doesn't exist."""
        if not os.path.exists(path):
            os.makedirs(path)

    def slice_file(self, start=0, end=30):
        """Read up to ``end - start`` pages from the input file and return
        them as one string.

        NOTE: the ``end_string`` delimiters are dropped from the returned
        text; read_xml re-splits on ``start_string``. Bytes belonging to
        pages beyond the limit are stashed in ``self.chunk_tail`` for the
        next call. Returns None when ``end == start``.
        """
        max_pages = end - start
        chunk = self.chunk_tail
        pages_stored = 0
        while max_pages:
            new_chunk = self.input_file.read(10000)
            if new_chunk:
                chunk += new_chunk
                pages_stored = len(chunk.split(self.end_string))
                if pages_stored > max_pages:
                    # Negative slice offset keeps the surplus pages for
                    # the next call
                    pages_for_next_slice = max_pages - pages_stored
                    if pages_for_next_slice == 0:
                        pages_for_next_slice = -1
                    self.chunk_tail = ''.join(chunk.split(self.end_string)[pages_for_next_slice:])
                    return ''.join(chunk.split(self.end_string)[0:max_pages])
            else:
                # End of file: return whatever is buffered
                return ''.join(chunk.split(self.end_string))

    def get_folder_name(self):
        """Return the batch folder name (page count floored to batch_size),
        with a trailing path separator."""
        folder_name = int(math.floor(self.page_count / self.batch_size) * self.batch_size)
        folder_name = '%s%s' % (folder_name, os.sep)
        return folder_name

    def save_page(self, path, file_contents):
        """Write one page to disk (executed by pool workers)."""
        with open(path, 'w') as output_file:
            output_file.write(file_contents)

    def process_queue(self):
        """Hand every queued page to the thread pool for writing."""
        for page in self.pages_queue:
            self.pool.apply_async(
                self.save_page,
                args = (page[0], page[1])
            )

    def save_graph(self):
        """Plot average export time against page count and save the image."""
        performance_seconds = []
        performance_page_count = []
        vals = self.persistent_performance_tracker.items()
        # Shelf keys are strings; sort them numerically
        ordered_vals = sorted(vals, key=lambda i: int(i[0]))
        for val in ordered_vals:
            performance_seconds += [val[1]]
            performance_page_count += [val[0]]
        plt.clf()
        plt.plot(performance_page_count, performance_seconds)
        plt.ylabel('Task duration progress')
        plt.savefig(self.graph_file_path)

    def handle_status_reports(self):
        """Periodically persist timing stats, refresh the graph, and print
        a status report to stdout."""
        # Update graph
        if self.page_count % self.update_graph_each == 0:
            self.end = datetime.datetime.now()
            average = (self.end - self.start) / self.show_status_each
            average = average.total_seconds()
            self.persistent_performance_tracker['%s' % self.page_count] = average
            self.persistent_performance_tracker.sync()
            self.save_graph()
        # Print status to stdout
        if self.page_count % self.show_status_each == 0:
            self.end = datetime.datetime.now()
            average = (self.end - self.start) / self.show_status_each
            average = average.total_seconds()
            log = self.log_template.format(
                page = self.page_count,
                exported = self.end,
                average = average,
                show_status_each = self.show_status_each,
                process = self.end - self.start,
                queue = self.end - self.main_start
            )
            self.persistent_performance_tracker['%s' % self.page_count] = average
            self.persistent_performance_tracker.sync()
            sys.stdout.write(log)
            sys.stdout.flush()
            self.start = datetime.datetime.now()

    # Go through xml file lines and output data to html files
    def read_xml(self):
        """Iterate over the input XML and write each <page> element to its
        own HTML file, checkpointing byte pointers every pool_size pages."""
        tag_contents = ''
        # Seek past pages that were already exported (resume support)
        self.slice_file(0, self.page_count)
        while self.all_pages_extracted == False:
            # check if there are still bytes to read
            try:
                chunk = self.slice_file(0, self.page_memory)
            except Exception:
                # Read failure: treat as end of input (was a bare except)
                break
            if not chunk:
                break
            pages_in_chunk = chunk.split(self.start_string)[1:]
            for page_i, page_contents in enumerate(pages_in_chunk):
                # new page start was found, count
                self.page_count += 1
                # create batch folder
                if self.page_count % self.batch_size == 0 or self.page_count == 1:
                    self.create_folder(self.page_files_path + self.get_folder_name())
                output_file_name = '{pre}{page_count}{ext}'.format(
                    pre = self.output_file_prefix,
                    page_count = self.page_count,
                    ext = self.output_file_extension
                )
                # if it's the last page, set the flag and ignore closing tag
                if self.end_doc in page_contents:
                    page_contents = page_contents.split(self.end_doc)[0]
                    self.all_pages_extracted = True
                self.pages_queue += [(
                    self.page_files_path + self.get_folder_name() + output_file_name,
                    tag_contents + page_contents
                )]
                self.handle_status_reports()
                if self.page_count % self.pool_size == 0:
                    # keep track of byte pointers where worker pools start
                    self.chunk_pointers['%s' % self.page_count] = self.byte_count
                    self.chunk_pointers.sync()
                    self.process_queue()
                    self.pages_queue = []
                self.byte_count += len(page_contents)
        self.close()

    def close(self):
        """Flush the queue, persist the caches, and release all resources."""
        self.process_queue()
        self.save_graph()
        self.chunk_pointers.close()
        self.persistent_performance_tracker.close()
        # Close the input handle as well (the original leaked it)
        self.input_file.close()
        self.pool.close()
        self.pool.join()
if __name__ == '__main__':
    # Entry point: build a splitter with default settings and run the export.
    splitter = xml_split()
    splitter.read_xml()
要創建問題中描述格式的、包含 50000 個 html 頁面的 xml 文件:
腳本會生成 matplotlib 圖表來跟蹤平均條目提取時間,而且它看起來很穩定。這是在我的電腦上處理 50,000 個條目的圖形(y 軸爲每個條目的導出時長,以秒計;x 軸爲已導出的條目數):
您可以隨時取消該過程並再次啓動它。默認行爲是檢查已存在的導出文件並從那裏恢復。
您可以添加 XML 文件的一部分,讓我們能更好地理解問題嗎?謝謝! –
我用我使用的格式的一段樣例XML更新了問題。謝謝! – miller9904
當您閱讀每一行時,爲什麼不更新計數器? –