2011-07-20 45 views
8

我正在寫一個腳本來執行psycopg2在同一網絡上的兩臺機器之間的一些數據的副本。我與管道postgres在python與psycopg2 COPY

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN" 

取代一些舊的,醜陋的bash,做副本這似乎是既做副本的最簡單和most efficient方式。這很容易在Python中複製一個StringIO的或臨時文件,像這樣:

buf = StringIO() 

from_curs = from_conn.cursor() 
to_curs  = to_conn.cursor() 

from_curs.copy_expert("COPY table TO STDOUT", buf) 
buf.seek(0, os.SEEK_SET) 
to_curs.copy_expert("COPY table FROM STDIN", buf) 

...但是,這涉及到/所有的數據保存到磁盤到內存中。

有沒有人想出一種方法來模仿這種副本中的Unix管道的行爲?我似乎無法找到不涉及POpen的unix-pipe對象 - 也許最好的解決方案就是使用POpen和子進程。

+0

好奇的是下面的解決方案工作? – agf

回答

0

你可以使用你的子類,支持讀取和寫入一個deque:

from collections import deque 
from Exceptions import IndexError 

class DequeBuffer(deque): 
    def write(self, data): 
     self.append(data) 
    def read(self): 
     try: 
      return self.popleft() 
     except IndexError: 
      return '' 

buf = DequeBuffer() 

如果讀者是速度遠遠超過了作家,桌子大,deque仍然會得到很大的,但它會比存儲整個事物小。

另外,我不知道return ''deque是空的是安全的,而不是重試,直到它不是空的,但我猜測它是。讓我知道它是否有效。

當您確定複製完成時請記住del buf,特別是如果該腳本不僅僅在此時退出。

12

您必須將您的一個呼叫置於單獨的線程中。我只是意識到你可以使用os.pipe(),這使得其餘的很簡單:

#!/usr/bin/python 
import psycopg2 
import os 
import threading 

fromdb = psycopg2.connect("dbname=from_db") 
todb = psycopg2.connect("dbname=to_db") 

r_fd, w_fd = os.pipe() 

def copy_from(): 
    cur = todb.cursor() 
    cur.copy_from(os.fdopen(r_fd), 'table') 
    cur.close() 
    todb.commit() 

to_thread = threading.Thread(target=copy_from) 
to_thread.start() 

cur = fromdb.cursor() 
write_f = os.fdopen(w_fd, 'w') 
cur.copy_to(write_f, 'table') 
write_f.close() # or deadlock... 

to_thread.join() 
+0

這是一個很棒的解決方案!我很好奇,爲什麼有必要引入一個Thread對象? – Demitri

+3

@Demitri,'copy_from()'和'copy_to()'是阻塞命令;他們在操作完成之前不會返回。如果我們在主線程中進行了第一次調用,那麼它只會等待管道上的數據,並且我們永遠不會再控制回來進行其他調用。 –

+0

啊,我明白了。它仍然會阻塞新的線程,但允許主線在閒暇時供給管道。謝謝。 – Demitri