2016-08-01 28 views
1

有我的RPC方法兩個操作:如何使用asyncio和postgres在python中進行交易?

async def my_rpc(self, data): 
    async with self.Engine() as conn: 
     await conn.execute("SELECT ... FROM MyTable"); 
     ... # It seems the table MyTable can be changed by another RPC 
     await conn.execute("UPDATA MyTable ..."); 

另一個RPC方法可以操作前「my_rpc」將完成(SQL查詢兩個等待着之間)改變DB。如何避免這種情況?

self.Engine的代碼(帶發動機調用aiopg.sa.create_engine):

class ConnectionContextManager(object): 
    def __init__(self, engine): 
     self.conn = None 
     self.engine = engine 

    async def __aenter__(self): 
     if self.engine: 
      self.conn = await self.engine.acquire() 
      return self.conn 

    async def __aexit__(self, exc_type, exc, tb): 
     try: 
      self.engine.release(self.conn) 
      self.conn.close() 
     finally: 
      self.conn = None 
      self.engine = None 
+0

你可以顯示self.Engine的代碼嗎? – jsbueno

+0

@jsbueno我添加了代碼 – Broly

回答

2

首先,aiopg以自動提交模式工作,這意味着您必須在手動模式下使用交易。 Read more details。其次,您必須使用SELECT FOR UPDATE來讀取第一個語句中讀取的鎖定行。 SELECT FOR UPDATE鎖定選擇行,直到事務完成。 Read more details

async def my_rpc(self, data): 
    async with self.Engine() as conn: 
     await conn.execute("BEGIN") 
     await conn.execute("SELECT ... FROM MyTable WHERE some_clause = some_value FOR UPDATE") 
     ... # It seems the table MyTable can be changed by another RPC 
     await conn.execute("UPDATE MyTable SET some_clause=...") 
     await conn.execute("""COMMIT""") 
1

它看起來像,以避免混亂的唯一方法是讓每個交易發生在一個單獨的數據庫連接(Python端遊標贏得't do) 要做到這一點的方法是擁有一個連接池 - 並讓您的Engine方法爲每個「異步線程」提供不同的連接。

如果Postgresql本身的連接器是異步感知的(你使用哪個驅動程序,btw?),那會更容易一些。或者它上面的數據庫包裝層。如果不是,你將不得不自己實現這個連接池。我認爲Sqlalchemy連接池將適用於這種情況,因爲獨立於共同程序使用,連接將僅在async with區塊結束時釋放。

+0

我還沒有測試過我的代碼。 但是,使用「aiopg.sa」的單獨連接。 也許這正是我需要的? – Broly

+0

是的。因爲你正在通過'async with'獲取你的連接,並使用一個準備好asyncio的連接器,這應該就是你所需要的。 – jsbueno

+0

我想所以不是所有的...你必須手動使用事務並且還需要更新 – Petr