2014-02-11 25 views
1

我想解析一個大的空間分隔的文件(3 GB和更高)到sqlite數據庫進行其他處理。該文件目前擁有約2000多萬行數據。我嘗試過多線程,但由於某種原因,它停止了大約1500行,並沒有繼續。我不知道我是否做錯了什麼。有人可以請指點我正確的方向嗎?解析一個大的空間分隔文件到sqlite中

插入工作正常與一個進程,但速度太慢(當然!!!)。它已經運行了七個多小時,甚至沒有超過第一組字符串。數據庫文件的大小仍然是25 MB,甚至沒有接近它必須包含的記錄數。

請引導我加快速度。我有一個更大的文件要去(超過5 GB),這可能需要幾天。

這裏是我的代碼:

1 import time 
2 import queue 
3 import threading 
4 import sys 
5 import sqlite3 as sql 
6 
7 record_count = 0 
8 DB_INSERT_LOCK = threading.Lock() 
9 
10 def process_data(in_queue): 
11  global record_count 
12  try: 
13   mp_db_connection = sql.connect("sequences_test.sqlite") 
14   sql_handler = mp_db_connection.cursor() 
15  except sql.Error as error: 
16   print("Error while creating database connection: ", error.args[0]) 
17  while True: 
18   line = in_queue.get() 
19   # print(line) 
20   if (line[0] == '@'): 
21    pass 
22   else: 
23    (sequence_id, field1, sequence_type, sequence_count, field2, field3, 
24    field4, field5, field6, sequence_info, kmer_length, field7, field8, 
25    field9, field10, field11, field12, field13, field14, field15) =  
                    line.expandtabs(1).split(" ") 
26 
27    info = (field7 + " " + field8 + " " + field9 + " " + field10 + " " + 
28      field11 + " " + field12 + " " + field13 + " " + field14 + " " 
29      + field15) 
30 
31    insert_tuple = (None, sequence_id, field1, sequence_type, sequence_count, 
32        field2, field3, field4, field5, field6, sequence_info, 
33        kmer_length, info) 
34    try: 
35     with DB_INSERT_LOCK: 
36      sql_string = 'insert into sequence_info \ 
37         values (?,?,?,?,?,?,?,?,?,?,?,?,?)' 
38      sql_handler.execute(sql_string, insert_tuple) 
39      record_count = record_count + 1 
40      mp_db_connection.commit() 
41    except sql.Error as error: 
42     print("Error while inserting service into database: ", error.args[0]) 
43    in_queue.task_done() 
44 
45 if __name__ == "__main__": 
46  try: 
47   print("Trying to open database connection") 
48   mp_db_connection = sql.connect("sequences_test.sqlite") 
49   sql_handler = mp_db_connection.cursor() 
50   sql_string = '''SELECT name FROM sqlite_master \ 
51       WHERE type='table' AND name='sequence_info' ''' 
52   sql_handler.execute(sql_string) 
53   result = sql_handler.fetchone() 
54   if(not result): 
55    print("Creating table") 
56    sql_handler.execute('''create table sequence_info 
57         (row_id integer primary key, sequence_id real, field1 
58         integer, sequence_type text, sequence_count real, 
59         field2 integer, field3 text, 
60         field4 text, field5 integer, field6 integer, 
61         sequence_info text, kmer_length text, info text)''') 
62    mp_db_connection.commit() 
63   else: 
64    pass 
65   mp_db_connection.close() 
66  except sql.Error as error: 
67   print("An error has occured.: ", error.args[0]) 
68 
69  thread_count = 4 
70  work = queue.Queue() 
71 
72  for i in range(thread_count): 
73   thread = threading.Thread(target=process_data, args=(work,)) 
74   thread.daemon = True 
75   thread.start() 
76 
77  with open("out.txt", mode='r') as inFile: 
78   for line in inFile: 
79    work.put(line) 
80 
81  work.join() 
82 
83  print("Final Record Count: ", record_count) 

我之所以有一個鎖是使用SQLite,我目前沒有辦法批提交我的文件到數據庫,因此我必須確保每次線程插入一條記錄時,都會提交數據庫的狀態。

我知道我正在用expandtabs調用一些處理時間的東西,但是要處理我正在接收的文件來做一個簡單的拆分它有點困難。我會繼續努力做到這一點,以減少工作量,但我至少需要多線程才能工作。

編輯:

我把expandtabs和split部分移到了處理之外。所以我處理該行並將其作爲元組插入到隊列中,以便線程可以將其拾取並直接將其插入到數據庫中。我希望能夠節省相當多的時間,但現在我遇到了與sqlite有關的問題。它說不能插入數據庫,因爲它被鎖定。我認爲它更像是鎖定部分的線程同步問題,因爲我在下面的關鍵部分有獨佔鎖定。有人可以詳細說明如何解決這個問題嗎?

回答

2

我不希望多線程在那裏有很多用處。你也許應該編寫一個生成函數,處理文件轉換成元組,然後您可以用executemany

+0

正在爲這樣一個巨大的文件寫一個生成器是個好主意嗎?它的大小是3 GB。內存問題等? – adwaraki

+0

這就是您使用生成器而不是列表的確切原因。 –

+0

其實是個好主意。我沒有意識到這一點。無論如何,速度都會提高。我可以看到數據庫大小隨着插入而增加。我想檢查是否有記錄,所以我停止了腳本。數據庫大小爲零。一個提交發生在什麼時候?或者我什麼時候需要做一個提交?謝謝您的幫助。 – adwaraki

0

multihtreading插入不會幫你

你必須做根據 http://sqlite.org/speed.html它不提交每個記錄的第一件事速度是250倍。

爲了不失去所有的工作,如果你打斷剛剛提交每10000個或100000記錄