你已經問過你的問題中兩個不同的東西。一個是如何將CSV從COPY FROM
轉換爲COPY TO
;另一個是如何將SELECT
查詢中的行管道輸入INSERT
。從SELECT
查詢到INSERT
管道排是一種騙人的,因爲當你可以從SELECT
查詢流行,你不能流行到INSERT
,所以你必須要執行多個INSERT
小號分批。由於INSERT
s的原因,此方法的開銷很高,但由於往返於CSV而導致數據丟失的問題較少。我將重點討論爲什麼將CSV從COPY FROM
轉換爲COPY TO
很棘手,以及如何實現它。
psycopg2
讓你通過(同步)copy_expert
函數做COPY
命令。它要求您傳遞可讀文件對象COPY FROM
和可寫文件對象COPY TO
。爲了完成您所描述的內容,需要兩個獨立的線程來運行這兩個命令中的每一個,一個帶有write()
方法的文件對象,該方法在COPY FROM
命令無法跟上時會阻止,以及一個帶有read()
方法的文件對象, COPY TO
命令跟不上。這是一個典型的生產者 - 消費者問題,如果要正確的話可能會很棘手。
這是我寫得很快的一個(Python 3)。它可能充滿了錯誤。讓我知道如果你發現一個僵局(編輯歡迎)。
from threading import Lock, Condition, Thread
class Output(object):
def __init__(self, pipe):
self.pipe = pipe
def read(self, count):
with self.pipe.lock:
# wait until pipe is still closed or buffer is not empty
while not self.pipe.closed and len(self.pipe.buffer) == 0:
self.pipe.empty_cond.wait()
if len(self.pipe.buffer) == 0:
return ""
count = max(count, len(self.pipe.buffer))
res, self.pipe.buffer = \
self.pipe.buffer[:count], self.pipe.buffer[count:]
self.pipe.full_cond.notify()
return res
def close(self):
with self.pipe.lock:
self.pipe.closed = True
self.pipe.full_cond.notify()
class Input(object):
def __init__(self, pipe):
self.pipe = pipe
def write(self, s):
with self.pipe.lock:
# wait until pipe is closed or buffer is not full
while not self.pipe.closed \
and len(self.pipe.buffer) > self.pipe.bufsize:
self.pipe.full_cond.wait()
if self.pipe.closed:
raise Exception("pipe closed")
self.pipe.buffer += s
self.pipe.empty_cond.notify()
def close(self):
with self.pipe.lock:
self.pipe.closed = True
self.pipe.empty_cond.notify()
class FilePipe(object):
def __init__(self, bufsize=4096):
self.buffer = b""
self.bufsize = 4096
self.input = Input(self)
self.output = Output(self)
self.lock = Lock()
self.full_cond = Condition(self.lock)
self.empty_cond = Condition(self.lock)
self.closed = False
用例:
def read_thread(conn, f):
conn.cursor().copy_expert("COPY foo TO STDIN;", f)
f.close()
conn.close()
engine.execute(
"CREATE TABLE foo(id int);"
"CREATE TABLE bar(id int);"
"INSERT INTO foo (SELECT generate_series(1, 100000) AS id);"
"COMMIT;")
input_conn = engine.raw_connection()
output_conn = engine.raw_connection()
pipe = FilePipe()
t = Thread(target=read_thread, args=(input_conn, pipe.input))
t.start()
output_cur = output_conn.cursor()
output_cur.copy_expert("COPY bar FROM STDIN;", pipe.output)
output_conn.commit()
output_conn.close()
t.join()
print(list(engine.execute("SELECT count(*) FROM bar;"))) # 100000
有很多記錄需要遷移? –
最終在我的ETL例程中會有幾個這樣的任務,其中一些可能有很多記錄,而其他的可能只有幾個。所以我正在尋找一個即使在大規模下也能正常工作的強大解決方案。 – Joe