
Determining the actual file position of the start of a line in Python

I'm writing a Python script that iterates over a large XML file of documents and writes each article out to a new file. I also want to keep an index of the starting position of each article within the file. If the script fails while processing the file (which takes several hours), I want to be able to use something like file.seek() to pick up where I left off.

Here is my current code:

with open(inputPath, 'rb') as inputfile: # Iterate over the dump
    for line in inputfile:
        # do stuff...

I believe the file.tell() inconsistency applies here as well, so it won't give me the correct file position. I also want the position of the start of each line, not the end.
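
For reference, the inconsistency comes from the read-ahead buffer the file iterator uses: tell() reflects how far the buffer has read, not the start of the line just returned. A minimal sketch (Python 2, matching the answers below; test.xml is a stand-in for a small sample file) shows the positions jumping in buffer-sized steps:

# Minimal sketch (Python 2): tell() inside a "for line in file" loop reports
# the iterator's read-ahead position, so it jumps ahead in large steps
# instead of tracking the start of each line.
with open('test.xml', 'rb') as f:
    for line in f:
        print f.tell(), repr(line)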

The XML file looks like this:

<pages> 
    <page> 
    This is an article. Article-ly stuff goes here. 
    </page> 
    <page> 
    ... x5,000,000 
    </page> 
</pages> 

I want to get the position of the start of each line containing the <page> tag as I iterate over the file.

+0

Could you add a portion of the XML file so we can get a better grasp of it? Thanks! –

+0

I've updated the question with a sample of the XML format I'm using. Thanks! – miller9904

+0

Why not just update a counter as you read each line? –

Answers

0

You can keep track of the byte pointer yourself. In this example I use a dictionary of byte pointers and store it in a shelf. Then I export html files from the xml.

By default, this script reports its status every 1000 pages exported. It also generates a graph image ('xml-split-performance.jpg' in the project root by default, like the one at the end of this post), so you can check that the process is working as expected.

By default I use 8 workers: I keep 8 pages in memory, then hand those 8 pages to the 8 workers to write. Not sure it's the best approach, but it worked well for my 50k htmls.
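
Stripped of the worker pool and the graphing, the checkpoint idea is just a shelf that maps the number of pages exported so far to a byte offset. A simplified sketch (names, paths, and the checkpoint interval here are illustrative; the full script below differs in the details):

import shelve

# Simplified sketch of the checkpoint idea: map "pages seen so far" ->
# bytes consumed, so an interrupted run can look up the last checkpoint
# and seek() back to it.
checkpoints = shelve.open('chunk_pointers_demo.log')

with open('sample.xml', 'rb') as f:   # assumes a sample.xml as in the question
    byte_count = 0
    page_count = 0
    for line in f:
        byte_count += len(line)       # track the byte pointer by hand
        if '</page>' in line:
            page_count += 1
            if page_count % 8 == 0:   # checkpoint once per batch of workers
                checkpoints['%s' % page_count] = byte_count
                checkpoints.sync()

# A later run can take the highest key and f.seek() to its stored offset.
checkpoints.close()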

from multiprocessing.pool import ThreadPool as Pool 
import matplotlib.pyplot as plt 
import itertools, os, sys, datetime, math 
from urlparse import urljoin 
import shelve 
import collections 
from functools import partial 

class xml_split(): 

    # Flags 
    track_line = False 
    last_line = False 
    output_file = False 
    all_pages_extracted = False 

    # Modes available: 
    # - Read output folder and calculate missing files: c (default) 
    # - Start from scratch: s 
    resume_modes = { 
     'complete_file_count': 'c', 
     'start_from_scratch': 's', 
    } 

    # Queue of pages in memory, before writting to file 
    pages_queue = [] 

    def __init__(
      self, 

      # Number of workers 
      pool_size = 8, 

      # Read x pages at a time 
      page_memory = 30, 

      # Path to the input xml file 
      input_xml_file_path = "sample.xml", 

      # File name prefix and extension 
      output_file_prefix = 'page_', 
      output_file_extension = '.html', 

      # Path to the output folder: will be created under the script path 
      page_files_folder = 'pages', 

      # Path to the cache folder: will be created under the script path 
      cache_files_folder = 'cache', 

      # filename of a graph that shows the average performance 
      graph_file_path = 'xml-split-performance.jpg', 

      # update graph each x number of pages extracted 
      update_graph_each = 1000, 

      # print status on stdout each x number of pages extracted 
      show_status_each = 1000, 

      # group files in folders of x pages 
      batch_size = 1000, 

      # tags to track 
      start_string = '<page>', 
      end_string = '</page>', 
      start_doc = '<pages>', 
      end_doc = '</pages>', 

      # A little template to output the status 
      log_template = """ 
      Page:         {page} 
      Time:         {exported} 
      Time since the beginning:    {queue} 
      Reporting each n exports:    {show_status_each} 
      Time since last report:    {process} 
      Average entry export time:    {average} 
      """, 
     ): 

     self.pool_size = pool_size 
     self.pool = Pool(pool_size) 

     self.input_xml_file_path = input_xml_file_path 
     self.input_file = open(input_xml_file_path) 

     self.output_file_prefix = output_file_prefix 
     self.output_file_extension = output_file_extension 
     self.page_files_folder = page_files_folder 
     self.cache_files_folder = cache_files_folder 

     self.page_memory = page_memory 
     self.show_status_each = show_status_each 
     self.update_graph_each = update_graph_each 
     self.batch_size = batch_size 

     self.graph_file_path = graph_file_path 

     self.start_string = start_string 
     self.end_string = end_string 
     self.end_doc = end_doc 
     self.start_doc = start_doc 
     self.log_template = log_template 
     self.chunk_tail = '' 

     # Set project path to the current script path 
     self.project_path = os.getcwd() + os.sep 

     # Folder to place output files 
     self.page_files_path = urljoin(self.project_path, self.page_files_folder) + os.sep 

     # Folder to place cache files 
     self.cache_files_path = urljoin(self.project_path, self.cache_files_folder) + os.sep 

     self.create_folder(self.page_files_path) 
     self.create_folder(self.cache_files_path) 

     # keep track of time, to calculate time spent 
     self.main_start = datetime.datetime.now() 
     self.start = self.main_start 

     # by default, set the resume mode to check output folder 
     self.resume_mode = self.resume_modes['complete_file_count'] 

     # Uncomment this line to ask for user input, on the shell. 
     # self.resume_mode = raw_input("s) Start from scratch c) Resume from missing files:") 

     # Create or open a shelf to keep a cache of line number, page number, and performance stats 
     self.chunk_pointers = shelve.open(self.cache_files_path + 'chunk_pointers.log') 
     self.persistent_performance_tracker = shelve.open(self.cache_files_path + 'persistent_performance_tracker.log') 


     # Init shelf counter 


     # *** Resume from missing files on the output folder 
     # (Resume an interrupted operation by checking the existing files on the output folders) 
     if self.resume_mode == self.resume_modes['complete_file_count']: 
      previously_existent_file_count = 0 
      for output_root, output_dirnames, output_filenames in os.walk(self.page_files_path): 
       for dirname in output_dirnames: 
        for root, dirnames, filenames in os.walk(self.page_files_path + dirname): 
         for filename in filenames: 
          if filename.endswith(self.output_file_extension) and filename.startswith(self.output_file_prefix): 
           previously_existent_file_count += 1 

      resume_from_page = int(math.floor(previously_existent_file_count/self.pool_size) * self.pool_size) 
      if '%s' % (resume_from_page) in self.chunk_pointers: 
       self.page_count = resume_from_page 
       self.byte_count = self.chunk_pointers['%s' % self.page_count] 
      else: 
       self.byte_count = 0 
       self.page_count = 0 

     # *** Do not resume 
      elif self.resume_mode == self.resume_modes['start_from_scratch']: 
      self.byte_count = 0 
      self.page_count = 0 

    # Create folder if doesn't exist 
    def create_folder(self, path): 
     if not os.path.exists(path): 
      os.makedirs(path) 

    # Get 30 pages a time and store them in memory 
    def slice_file(self, start=0, end=30): 
     max_pages = end - start 
     chunk = self.chunk_tail 
     pages_stored = 0 
     while max_pages: 
      new_chunk = self.input_file.read(10000) 
      if new_chunk: 

       chunk += new_chunk 
       pages_stored = len(chunk.split(self.end_string)) 

       if pages_stored > max_pages: 
        pages_for_next_slice = max_pages - pages_stored 
        if pages_for_next_slice == 0: 
         pages_for_next_slice = -1 
        self.chunk_tail = ''.join(chunk.split(self.end_string)[pages_for_next_slice:]) 
        return ''.join(chunk.split(self.end_string)[0:max_pages]) 

      else: 
       return ''.join(chunk.split(self.end_string)) 


    def get_folder_name(self): 
     folder_name = int(math.floor(self.page_count/self.batch_size) * self.batch_size) 
     folder_name = '%s%s' % (folder_name, os.sep) 
     return folder_name 

    def save_page(self, path, file_contents): 
     with open(path, 'w') as output_file: 
      output_file.write(file_contents) 

    def process_queue(self): 
     for page in self.pages_queue: 
      self.pool.apply_async(
       self.save_page, 
       args = (page[0], page[1]) 
      ) 

    def save_graph(self): 

     performance_seconds = [] 
     performance_page_count = [] 
     vals = self.persistent_performance_tracker.items() 
     ordered_vals = sorted(vals, key=lambda i: int(i[0])) 

     for val in ordered_vals: 
      performance_seconds += [val[1]] 
      performance_page_count += [val[0]] 


     plt.clf() 
     plt.plot(performance_page_count, performance_seconds) 
     plt.ylabel('Task duration progress') 

     plt.savefig(self.graph_file_path) 

    def handle_status_reports(self): 

     # Update graph 
     if self.page_count % self.update_graph_each == 0: 

      self.end = datetime.datetime.now() 
      average = (self.end - self.start)/self.show_status_each 
      average = average.total_seconds() 


      self.persistent_performance_tracker['%s' % self.page_count] = average 
      self.persistent_performance_tracker.sync() 

      self.save_graph() 

     # Print status to stdout 
     if self.page_count % self.show_status_each == 0: 

      self.end = datetime.datetime.now() 
      average = (self.end - self.start)/self.show_status_each 
      average = average.total_seconds() 

      log = self.log_template.format(
        page= self.page_count, 
        exported = self.end, 
        average = average, 
        show_status_each = self.show_status_each, 
        process = self.end - self.start, 
        queue = self.end - self.main_start 
       ) 


      self.persistent_performance_tracker['%s' % self.page_count] = average 
      self.persistent_performance_tracker.sync() 

      sys.stdout.write(log) 
      sys.stdout.flush() 
      self.start = datetime.datetime.now() 



    # Go through xml file lines and output data to html files 
    def read_xml(self): 

     tag_contents = '' 

     # Seek page where to pick up from 
     self.slice_file(0, self.page_count) 


     # self.slice_file(0, self.page_memory) 

     while self.all_pages_extracted == False: 
      # check if there are still bytes to read 
      try: 
       chunk = self.slice_file(0, self.page_memory) 
      except: 
       break 

      if not chunk: 
       break 

      pages_in_chunk = chunk.split(self.start_string)[1:] 


      for page_i, page_contents in enumerate(pages_in_chunk): 

       # new page start was found, count 
       self.page_count += 1 

       # create batch folder 
       if self.page_count % self.batch_size == 0 or self.page_count == 1: 
        self.create_folder(self.page_files_path + self.get_folder_name()) 


       output_file_name = '{pre}{page_count}{ext}'.format(
        pre = self.output_file_prefix, 
        page_count = self.page_count, 
        ext = self.output_file_extension 
       ) 

       # if it's the last page, set the flag and ignore closing tag 
       if self.end_doc in page_contents: 
        page_contents = page_contents.split(self.end_doc)[0] 
        self.all_pages_extracted = True 

       self.pages_queue += [(
        self.page_files_path + self.get_folder_name() + output_file_name, 
        tag_contents + page_contents 
       )] 

       self.handle_status_reports() 

       if self.page_count % self.pool_size == 0: 
        # keep track of byte pointers where worker pools start 
        self.chunk_pointers['%s' % self.page_count] = self.byte_count 
        self.chunk_pointers.sync() 
        self.process_queue() 
        self.pages_queue = [] 

       self.byte_count += len(page_contents) 


     self.close() 


    def close(self): 
     self.process_queue() 
     self.save_graph() 
     self.chunk_pointers.close() 
     self.persistent_performance_tracker.close() 
     self.pool.close() 
     self.pool.join() 

if __name__ == '__main__': 

    xso = xml_split() 

    xso.read_xml() 

To create an xml with 50,000 htmls in the format described in the question:
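
A sketch along these lines produces a test file of that shape (the exact generator isn't reproduced here; the filename, page count, and article text are illustrative):

# Sketch of a generator for a test file in the question's format
# (filename, page count, and article text are illustrative).
with open('sample.xml', 'w') as f:
    f.write('<pages>\n')
    for i in range(50000):
        f.write('    <page>\n')
        f.write('    This is article number %d. Article-ly stuff goes here.\n' % i)
        f.write('    </page>\n')
    f.write('</pages>\n')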

The script generates a matplotlib graph to track the average entry extraction time, and it seems to stay stable. This is the graph for 50,000 entries on my machine (entry export duration in seconds on the y axis, number of exported entries on the x axis):

[graph showing average time for each entry export]

Feel free to cancel the process and start it again; the default behavior is to check the existing exported files and resume from there.

+0

I'm not familiar with lxml and the like. It looks like this loads the whole file into memory, which won't work because my file is a 57GB Wikipedia dump. Right now I'm scanning the file line by line and splitting whenever I find the '' tag. I made the mistake of writing line numbers instead of positions into my index. – miller9904

+0

You can see my current script here: https://gist.github.com/miller9904/e51e698154fe388484c6868eec09988b – miller9904

+0

Sorry, I haven't gone through your code yet. I've updated the answer with another approach. Let me know what you think? –

1

Here is a solution based on the answer you linked:

offsets = [] 
with open('test.xml', 'rb') as inputfile: # Iterate over the dump 
    # Save 1st line offset 
    current = inputfile.tell() 
    for line in iter(inputfile.readline,''): 
     if line.lstrip().startswith('<page>'): 
      offsets.append(current) 
     # Update to the current offset (line about to be read) 
     current = inputfile.tell() 

# Demo the offsets are lines with <page> 
with open('test.xml', 'rb') as inputfile: # Iterate over the dump 
    for offset in offsets: 
     inputfile.seek(offset) 
     print offset,inputfile.readline() 

Output:

9 <page> 

82 <page> 
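
To cover the resume requirement from the question, the collected offsets can be persisted and reused later with seek(). A minimal sketch continuing from the snippet above ('offsets.txt' is an illustrative filename, not part of the original answer):

# Sketch: persist the offsets collected above so an interrupted run can
# resume later ('offsets.txt' is an illustrative name).
with open('offsets.txt', 'w') as f:
    for offset in offsets:
        f.write('%d\n' % offset)

# In a later run, reload the offsets and jump straight to any <page> line.
with open('offsets.txt') as f:
    offsets = [int(line) for line in f]

with open('test.xml', 'rb') as inputfile:
    inputfile.seek(offsets[-1])    # e.g. resume from the last recorded page
    print inputfile.readline()
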
+0

I'm working with a 57 GB xml file, and I think loading one line at a time will be too slow. – miller9904

+1

@miller9904, that wasn't stated in the question, and it's what you were already doing in your question. I answered the question that was asked. –
