2016-12-22 24 views
-4

我有1,500,000,000行數據保存到多個txt文件中。數據格式如下:如何有效地解析1500臺銑削線數據?

key1 key2 

其中key1是url,key2是mysql記錄row_id。

我寫了下面的python代碼來解析數據,但速度很慢。 例如

import Queue 
import threading 

class CheckThread(threading.Thread): 
    def __init__(self, queue, src_folder, dest_folder='check_result'): 
     super(CheckThread, self).__init__() 
     self._queue = queue 
     self.daemon = True 

    def run(self): 
     while True: 
      file_name = self._queue.get() 
      try: 
       self._prepare_check(file_name) 
      except: 
       self._queue.task_done() 
       continue 
      self._queue.task_done() 


def Check(src_folder, workers=12, dest_folder='check_result'): 
    queue = Queue.Queue() 

    for (dirpath, dirnames, filelist) in os.walk(src_folder): 
     for name in filelist: 
      if name[0] == '.': 
       continue 
      queue.put(os.path.join(dirpath, name)) 

    for worker in xrange(workers): 
     worker = str(worker + 1) 
     t = CheckThread(queue, src_folder, dest_folder) 
     t.start() 

    queue.join() 


def main(folder, worker=12, out='check_result'): 
    try: 
     Check(folder, worker, out) 
    except: 
     return 1 
    return 0 

每個線程都從隊列中解析出一個文件。

如何提高每個文件的解析速度。

+2

如果您的代碼工作,想改善它,嘗試:http://codereview.stackexchange.com – MooingRawr

+0

代碼審查可能會不喜歡這個問題,因爲它提出了一個有關如何提高效率的有針對性的問題。 codereview更適用於...呃,代碼評論的用途。 「我如何獲得這個工作代碼並使其更具可讀性,可維護性和可擴展性?」另一方面,這個問題更適合Stack Overflow。 OP已經提出了一個有針對性的問題(「我應該如何提高效率?」,儘管「我應該使用其他語言」嗎?部分問題可能太寬泛),並提交了一個大部分最小,完整,可驗證的代碼樣本題。 –

+0

我編輯你的問題試圖瞭解你想說什麼,但我不完全確定一些部分。 – Adirio

回答

0

幾點建議:

  1. 回到1上的錯誤和0上的成功不是Python的。
  2. 永遠不要使用except:,總是指定要捕獲哪些異常。
  3. 您的第一個try: ... except: ...不是此處使用的適配器結構,即使try:部件引發異常,也應該使用try: ... finally: ...執行finally:部件。
  4. CheckThread.__init__()的一些參數未被使用。
  5. CheckThread._prepare_check()做什麼?
  6. 您未使用worker

更改後的代碼如下:

import Queue 
import threading 

class CheckThread(threading.Thread): 
    def __init__(self, queue, src_folder, dest_folder='check_result'): 
     super(CheckThread, self).__init__() 
     self._queue = queue 
     self.daemon = True 
     # Do something with src_folder and dest_folder or delete them from the parameter list 

    def run(self): 
     while True: 
      file_name = self._queue.get() 
      try: 
       self._prepare_check(file_name) 
      finally: 
       self._queue.task_done() 


def Check(src_folder, workers=12, dest_folder='check_result'): 
    queue = Queue.Queue() 

    for (dirpath, dirnames, filelist) in os.walk(src_folder): 
     for name in filelist: 
      if name[0] == '.': 
       continue 
      queue.put(os.path.join(dirpath, name)) 

    for worker in xrange(workers): 
     worker = str(worker + 1) # Do something with worker or delete this line 
     t = CheckThread(queue, src_folder, dest_folder) 
     t.start() 

    queue.join() 


def main(folder, worker=12, out='check_result'): 
    Check(folder, worker, out) 
+0

您可以使用glob.iglob('/ some/path/to/files/*。txt')中的'for file替換嵌套'for'循環:queue.put(file)' –

+0

@ BurhanKhalid他的Python知識非常基礎,所以我只提出了基本的推薦,但你是對的。 – Adirio

+0

非常感謝'CheckThread._prepare_check'請求並獲取正文。 – thinkerou