2013-01-14 46 views
0

我有一個約200,000個實體的列表,我需要爲這些實體中的每一個查詢特定的RESTful API,最後以所有以JSON格式保存的200,000個實體txt文件。 這樣做的天真的方式是通過200,000個實體的列表並逐個查詢,將返回的JSON添加到列表中,並在完成時將所有內容都添加到文本文件中。喜歡的東西:向文本文件寫入大量查詢

from apiWrapper import api 
from entities import listEntities #list of the 200,000 entities 
a=api() 
fullEntityList=[] 
for entity in listEntities: 
fullEntityList.append(a.getFullEntity(entity)) 

with open("fullEntities.txt","w") as f: 
    simplejson.dump(fullEntityList,f) 

顯然,這是不可靠的,200000個查詢到API將需要大約10個小時左右,所以我想它得到其寫入文件之前,事情會導致錯誤。 我想正確的方法是把它寫成塊,但不知道如何實現它。有任何想法嗎? 另外,我不能用數據庫做到這一點。

回答

2

我建議將它們寫入SQLite數據庫。這是他們爲我自己的小型網絡蜘蛛應用程序做的。因爲你可以很容易地查詢關鍵字,並檢查你已經檢索的關鍵字。這樣,您的應用程序可以輕鬆地繼續停止。特別是如果您下週添加1000個新條目。

從一開始就將「恢復」設計到您的應用程序中。如果有一些意外的異常(比如由於網絡擁塞而導致超時),則不必從頭開始重新啓動,而只需要那些尚未成功檢索的查詢。在200,000個查詢中,99.9%的正常運行時間意味着您必須預期200次失敗!

對於空間效率和性能,它可能會使用壓縮格式,比如在將zson轉儲到數據庫blob之前壓縮json。

SQLite是一個不錯的選擇,除非你的蜘蛛同時在多個主機上運行。對於單個應用程序,sqlite是完美的。

1

最簡單的方法是打開文件中'a'(附加)模式,並把它們寫一個接一個,因爲他們進來。

更好的方法是使用作業隊列。這將允許您將a.getFullEntity調用派生成工作者線程並處理結果,但是當它們返回時/如果它們返回時要處理結果,或計劃重試失敗等。 請參閱Queue

+0

你可以在工作隊列上稍微擴展一點嗎?使用哪些模塊?鏈接到文檔? – leonsas

+1

該鏈接已經在那裏......並且該模塊被稱爲「隊列」 – wim

0

我也會使用一個單獨的文件寫入線程,並使用Queue來記錄所有實體。當我開始時,我認爲這將在5分鐘內完成,但後來證明有點困難。 simplejson和我知道的所有其他這樣的庫不支持部分編寫,所以你不能先寫一個列表中的元素,然後再添加另一個等。所以,我試圖通過編寫[,,]與文件分開,然後分別轉儲每個實體。

沒有能夠檢查(因爲我沒有你的API),你可以嘗試:

import threading 
import Queue 
import simplejson 
from apiWrapper import api 
from entities import listEntities #list of the 200,000 entities 

CHUNK_SIZE = 1000 

class EntityWriter(threading.Thread): 
    lines_written = False 
    _filename = "fullEntities.txt" 

    def __init__(self, queue): 
     super(EntityWriter, self).__init() 
     self._q = queue 
     self.running = False 

    def run(self): 
     self.running = True 
     with open(self._filename,"a") as f: 
      while True: 
       try: 
        entity = self._q.get(block=False) 
        if not EntityWriter.lines_written: 
         EntityWriter.lines_written = True 
         f.write("[") 
         simplejson.dump(entity,f) 
        else: 
         f.write(",\n") 
         simplejson.dump(entity,f) 
       except Queue.Empty: 
        break 
     self.running = False 

    def finish_file(self): 
     with open(self._filename,"a") as f: 
      f.write("]") 


a=api() 
fullEntityQueue=Queue.Queue(2*CHUNK_SIZE) 
n_entities = len(listEntities) 
writer = None 
for i, entity in listEntities: 
    fullEntityQueue.append(a.getFullEntity(entity)) 
    if (i+1) % CHUNK_SIZE == 0 or i == n_entities-1: 
     if writer is None or not writer.running: 
      writer = EntityWriter(fullEntityQueue) 
      writer.start() 
writer.join() 
writer.finish_file() 

這是什麼腳本

主循環仍然迭代列表獲取實體的全部信息。之後每個實體現在被放入隊列中。每個1000個實體(並在列表的末尾)都會啓動一個EntityWriter線程,該線程與主線程並行運行。來自Queue的EntityWriter get並將其轉儲到所需的輸出文件。

需要一些額外的邏輯來使JSON成爲一個列表,如上所述,我手動編寫[,,]。原則上,生成的文件在重新加載時應該被simplejson所理解。