0

我有多個線程處理數據並將其放在一個隊列中,並有一個線程從隊列中獲取數據,然後將其保存到數據庫中。如何在守護程序線程中關閉sqlite連接?

我認爲以下原因會導致內存泄漏:

class DBThread(threading.Thread): 
    def __init__(self, myqueue): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 

    def run(self): 
     conn = sqlite3.connect("test.db") 
     c = conn.cursor() 

     while True: 
      data = myqueue.get() 
      if data: 
       c.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       conn.commit() 

      self.myqueue.task_done() 

     #conn.close() <--- never reaches this point 

q = Queue.Queue() 

# Create other threads 
.... 

# Create DB thread 
t = DBThread(q) 
t.setDaemon(True) 
t.start() 

q.join() 

我不能把conn.close() while循環,因爲我認爲,將關閉在第一回路的連接。我不能將它放在if data:語句中,因爲它不會保存稍後可能放入隊列的數據。

我在哪裏關閉數據庫連接?如果我不關閉它,這會不會導致內存泄漏?

回答

0

如果您可以使用不會出現在正常數據中的標記值,例如None,您可以通知線程停止並關閉在finally條款數據庫連接:

import threading 
import Queue 
import sqlite3 

class DBThread(threading.Thread): 
    def __init__(self, myqueue, db_path): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 
     self.db_path = db_path 

    def run(self): 
     conn = sqlite3.connect(self.db_path) 

     try: 
      while True: 
       data = self.myqueue.get()  
       if data is None: # check for sentinel value 
        break 

       with conn: 
        conn.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       self.myqueue.task_done() 
     finally: 
      conn.close() 


q = Queue.Queue() 
for i in range(100): 
    q.put(str(i)) 

conn = sqlite3.connect('test.db') 
conn.execute('create table if not exists test (data text)') 
conn.close() 

t = DBThread(q, 'test.db') 
t.start() 

q.join() 
q.put(None) # tell database thread to terminate 

如果您不能使用哨兵值,你可以一個標誌添加到在while循環檢查得到的類別。還要將stop()方法添加到設置該標誌的線程類。您將需要使用非阻塞Queue.get()

class DBThread(threading.Thread): 
    def __init__(self, myqueue, db_path): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 
     self.db_path = db_path 
     self._terminate = False 

    def terminate(self): 
     self._terminate = True 

    def run(self): 
     conn = sqlite3.connect(self.db_path) 

     try: 
      while not self._terminate: 
       try: 
        data = self.myqueue.get(timeout=1) 
       except Queue.Empty: 
        continue 

       with conn: 
        conn.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       self.myqueue.task_done() 
     finally: 
      conn.close() 

.... 
q.join() 
t.terminate() # tell database thread to terminate 

最後,值得一提的是,如果分貝線程管理排排隊,即如果q.join()回報你的程序可能會終止。這是因爲數據庫線程是一個守護進程線程,並不會阻止主線程退出。您需要確保您的工作線程產生足夠的數據以保持db線程繁忙,否則將返回q.join()並且主線程將退出。

+0

在這種情況下'conn:'做什麼?通常它會清理資源(在這種情況下是連接),這不是我們現在想要的嗎? – Caramiriel

+1

@ Caramiriel:沒錯,我們不想在那個時候清理。然而,它不會做你的想法;它實現了事務的自動提交/回滾。請參閱[使用連接作爲上下文管理器](https://docs.python.org/2/library/sqlite3.html#using-the-connection-as-a-context-manager)。 – mhawke