2014-03-30 90 views
9

我正在嘗試編寫一個多線程Python應用程序,其中線程之間共享單個SQlite連接。我無法得到這個工作。真正的應用程序是一個cherrypy web服務器,但下面的簡單代碼演示了我的問題。如何在多線程Python應用程序中共享單個SQLite連接

下面運行示例代碼需要做些什麼改變?

當我運行THREAD_COUNT這個程序設置爲1時,它工作正常,我的數據庫更新,因爲我期望(即字母「X」被添加到SectorGroup列中的文本值)。

當我將THREAD_COUNT設置爲大於1的任何值時,除1以外的所有線程都會過早地終止與SQLite相關的異常。不同的線程拋出不同的異常(沒有明顯的模式),包括:

OperationalError: cannot start a transaction within a transaction 

(發生在UPDATE語句)

OperationalError: cannot commit - no transaction is active 

(發生在.commit()調用)

InterfaceError: Error binding parameter 0 - probably unsupported type. 

(發生在UPDATESELECT陳述)

IndexError: tuple index out of range 

(這其中有我完全摸不着頭腦,它發生在聲明group = rows[0][0] or '',但只有當多個線程運行)

下面是代碼:

CONNECTION = sqlite3.connect('./database/mydb', detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread = False) 
CONNECTION.row_factory = sqlite3.Row 

def commands(start_id): 

    # loop over 100 records, read the SectorGroup column, and write it back with "X" appended. 
    for inv_id in range(start_id, start_id + 100): 

     rows = CONNECTION.execute('SELECT SectorGroup FROM Investment WHERE InvestmentID = ?;', [inv_id]).fetchall() 
     if rows: 
      group = rows[0][0] or '' 
      msg = '{} inv {} = {}'.format(current_thread().name, inv_id, group) 
      print msg 
      CONNECTION.execute('UPDATE Investment SET SectorGroup = ? WHERE InvestmentID = ?;', [group + 'X', inv_id]) 

     CONNECTION.commit() 

if __name__ == '__main__': 

    THREAD_COUNT = 10 

    for i in range(THREAD_COUNT): 
     t = Thread(target=commands, args=(i*100,)) 
     t.start() 
+2

他們爲什麼需要共享連接?這似乎是一個壞主意。 – ebarr

+2

您是否閱讀過http://docs.python.org/2/library/sqlite3.html#multithreading?只需創建一個連接*每個線程*。 –

+0

基本上,如果您共享線程之間的連接,則需要執行自己的鎖定;請參閱http://bugs.python.org/issue16509瞭解該方向的提示。你最好使用SQLAlchemy並讓它處理池(它也增加了一個有效的工作單元排隊系統)。 –

回答

13

這不是安全共享線程之間的連接;至少你需要使用一個鎖來序列化訪問。由於舊的SQLite版本仍然存在更多問題,因此也請閱讀http://docs.python.org/2/library/sqlite3.html#multithreading

check_same_thread選項在此方面故意缺少記錄,請參閱http://bugs.python.org/issue16509

您可以改爲使用每個線程的連接,或者查看SQLAlchemy的連接池(以及一個非常有效的工作聲明和排隊系統來啓動)。

+0

謝謝,我已經轉換爲每個線程的連接。這導致我的應用程序(真正的,而不是示例代碼)的測試代碼提前終止線程,數據庫被鎖定,即使我沒有冗長的寫入操作。但是,擴展數據庫超時設置似乎已修復該問題,但對性能沒有任何明顯的損害。謝謝! –

+1

擴展數據庫超時不是解決方案。我仍然有幾個跨線程使用連接對象。一旦我擺脫了這些,並且非常小心,每次連接時都要調用.close()(包括僅從數據庫中讀取的線程),然後我可以將超時設置回默認值在重負載下可靠地運行應用程序。 –

3

在編寫簡單的WSGI服務器以獲得樂趣和學習時,我遇到了SqLite線程問題。 在Apache下運行時,WSGI本質上是多線程的。 下面的代碼似乎爲我工作:使用

import sqlite3 
import threading 

class LockableCursor: 
    def __init__ (self, cursor): 
     self.cursor = cursor 
     self.lock = threading.Lock() 

    def execute (self, arg0, arg1 = None): 
     self.lock.acquire() 

     try: 
      self.cursor.execute (arg1 if arg1 else arg0) 

      if arg1: 
       if arg0 == 'all': 
        result = self.cursor.fetchall() 
       elif arg0 == 'one': 
        result = self.cursor.fetchone() 
     except Exception as exception: 
      raise exception 

     finally: 
      self.lock.release() 
      if arg1: 
       return result 

def dictFactory (cursor, row): 
    aDict = {} 
    for iField, field in enumerate (cursor.description): 
     aDict [field [0]] = row [iField] 
    return aDict 

class Db: 
    def __init__ (self, app): 
     self.app = app 

    def connect (self): 
     self.connection = sqlite3.connect (self.app.dbFileName, check_same_thread = False, isolation_level = None) # Will create db if nonexistent 
     self.connection.row_factory = dictFactory 
     self.cs = LockableCursor (self.connection.cursor()) 

例子:

if not ok and self.user: # Not logged out 
    # Get role data for any later use 
    userIdsRoleIds = self.cs.execute ('all', 'SELECT role_id FROM users_roles WHERE user_id == {}'.format (self.user ['id'])) 

    for userIdRoleId in userIdsRoleIds: 
     self.userRoles.append (self.cs.execute ('one', 'SELECT name FROM roles WHERE id == {}'.format (userIdRoleId ['role_id']))) 

又如: http://www.josmith.org/

:採用這種結構可下載的

self.cs.execute ('CREATE TABLE users (id INTEGER PRIMARY KEY, email_address, password, token)')   
self.cs.execute ('INSERT INTO users (email_address, password) VALUES ("{}", "{}")'.format (self.app.defaultUserEmailAddress, self.app.defaultUserPassword)) 

# Create roles table and insert default role 
self.cs.execute ('CREATE TABLE roles (id INTEGER PRIMARY KEY, name)') 
self.cs.execute ('INSERT INTO roles (name) VALUES ("{}")'.format (self.app.defaultRoleName)) 

# Create users_roles table and assign default role to default user 
self.cs.execute ('CREATE TABLE users_roles (id INTEGER PRIMARY KEY, user_id, role_id)') 

defaultUserId = self.cs.execute ('one', 'SELECT id FROM users WHERE email_address = "{}"'.format (self.app.defaultUserEmailAddress)) ['id']   
defaultRoleId = self.cs.execute ('one', 'SELECT id FROM roles WHERE name = "{}"'.format (self.app.defaultRoleName)) ['id'] 

self.cs.execute ('INSERT INTO users_roles (user_id, role_id) VALUES ({}, {})'.format (defaultUserId, defaultRoleId)) 

完整的程序

NB上面的代碼是實驗性的,當與(多個)併發請求一起使用時(例如,作爲WSGI服務器的一部分),可能會有(基本)問題。性能對於我的應用程序並不重要。最簡單的事情可能只是使用MySql,但我喜歡嘗試一下,關於SqLite的零安裝事情吸引了我。如果有人認爲上述代碼存在根本性缺陷,請作出反應,因爲我的目的是學習。如果不是,我希望這對其他人有用。

相關問題