2017-05-30 231 views
0

在ETL過程中,我想定期查詢數據庫「A」(例如,所有行的時間戳大於程序的最後一次運行時間),並將該數據移動到數據庫「B」中以供進一步處理。兩者都是PostgreSQL數據庫。我想在Python腳本中進行數據傳輸,使用SQLAlchemy連接到兩個數據庫。什麼是最不混亂,最脆弱的方式呢?如何使用SQLAlchemy將數據直接從一個postgresql數據庫傳輸到另一個數據庫?

我知道Postgres的COPY TOCOPY FROM命令允許通過中間文件(see here)將錶行和查詢結果從一個數據庫服務器傳輸到另一個數據庫服務器。在Unix命令行中,您甚至可以將數據庫A的輸出作爲輸入傳送到數據庫B,而不需要潛在的大型中間文件(see excellent instructions here)。我想知道的是如何在使用兩個SQLAlchemy連接的Python腳本中執行最後一個技巧,而不是使用subprocess來運行shell命令。

import sqlalchemy 
dbA = sqlalchemy.create_engine(connection_string_A) 
dbB = sqlalchemy.create_engine(connection_string_B) 

# how do I do this part? 
dbA.execute('SELECT (column) FROM widgets...') # somehow pipe output into... 
dbB.execute('INSERT INTO widgets (column) ...') # without holding lots of data in memory or on disk 

爲了記錄我沒有使用SQLAlchemy的任何ORM功能,只是裸體SQL查詢。

+0

有很多記錄需要遷移? –

+0

最終在我的ETL例程中會有幾個這樣的任務,其中一些可能有很多記錄,而其他的可能只有幾個。所以我正在尋找一個即使在大規模下也能正常工作的強大解決方案。 – Joe

回答

1

你已經問過你的問題中兩個不同的東西。一個是如何將CSV從COPY FROM轉換爲COPY TO;另一個是如何將SELECT查詢中的行管道輸入INSERT。從SELECT查詢到INSERT

管道排是一種騙人的,因爲當你可以從SELECT查詢流行,你不能流行到INSERT,所以你必須要執行多個INSERT小號分批。由於INSERT s的原因,此方法的開銷很高,但由於往返於CSV而導致數據丟失的問題較少。我將重點討論爲什麼將CSV從COPY FROM轉換爲COPY TO很棘手,以及如何實現它。

psycopg2讓你通過(同步)copy_expert函數做COPY命令。它要求您傳遞可讀文件對象COPY FROM和可寫文件對象COPY TO。爲了完成您所描述的內容,需要兩個獨立的線程來運行這兩個命令中的每一個,一個帶有write()方法的文件對象,該方法在COPY FROM命令無法跟上時會阻止,以及一個帶有read()方法的文件對象, COPY TO命令跟不上。這是一個典型的生產者 - 消費者問題,如果要正確的話可能會很棘手。

這是我寫得很快的一個(Python 3)。它可能充滿了錯誤。讓我知道如果你發現一個僵局(編輯歡迎)。

from threading import Lock, Condition, Thread 


class Output(object): 
    def __init__(self, pipe): 
     self.pipe = pipe 

    def read(self, count): 
     with self.pipe.lock: 
      # wait until pipe is still closed or buffer is not empty 
      while not self.pipe.closed and len(self.pipe.buffer) == 0: 
       self.pipe.empty_cond.wait() 

      if len(self.pipe.buffer) == 0: 
       return "" 

      count = max(count, len(self.pipe.buffer)) 
      res, self.pipe.buffer = \ 
       self.pipe.buffer[:count], self.pipe.buffer[count:] 
      self.pipe.full_cond.notify() 
     return res 

    def close(self): 
     with self.pipe.lock: 
      self.pipe.closed = True 
      self.pipe.full_cond.notify() 


class Input(object): 
    def __init__(self, pipe): 
     self.pipe = pipe 

    def write(self, s): 
     with self.pipe.lock: 
      # wait until pipe is closed or buffer is not full 
      while not self.pipe.closed \ 
        and len(self.pipe.buffer) > self.pipe.bufsize: 
       self.pipe.full_cond.wait() 

      if self.pipe.closed: 
       raise Exception("pipe closed") 

      self.pipe.buffer += s 
      self.pipe.empty_cond.notify() 

    def close(self): 
     with self.pipe.lock: 
      self.pipe.closed = True 
      self.pipe.empty_cond.notify() 


class FilePipe(object): 
    def __init__(self, bufsize=4096): 
     self.buffer = b"" 
     self.bufsize = 4096 
     self.input = Input(self) 
     self.output = Output(self) 
     self.lock = Lock() 
     self.full_cond = Condition(self.lock) 
     self.empty_cond = Condition(self.lock) 
     self.closed = False 

用例:

def read_thread(conn, f): 
    conn.cursor().copy_expert("COPY foo TO STDIN;", f) 
    f.close() 
    conn.close() 

engine.execute(
    "CREATE TABLE foo(id int);" 
    "CREATE TABLE bar(id int);" 
    "INSERT INTO foo (SELECT generate_series(1, 100000) AS id);" 
    "COMMIT;") 
input_conn = engine.raw_connection() 
output_conn = engine.raw_connection() 
pipe = FilePipe() 

t = Thread(target=read_thread, args=(input_conn, pipe.input)) 
t.start() 
output_cur = output_conn.cursor() 
output_cur.copy_expert("COPY bar FROM STDIN;", pipe.output) 
output_conn.commit() 
output_conn.close() 
t.join() 

print(list(engine.execute("SELECT count(*) FROM bar;"))) # 100000 
+0

說我提出一個問題會更準確,但有兩條建議或線索可以看出答案。你提供了一個非常吸引人的解決方案,但是我認爲如果這很複雜的話,最好使用Unix shell命令和管道。 – Joe

0

如果數據不是非常大(可以在單臺主機的主存儲器中持有),你可以嘗試基於熊貓/ python3我的開源ETL工具/ sqlalchemy,bailaohe/parade,我在http://README.md中提供了一個教程。您可以利用熊貓對數據進行轉換並直接返回結果數據框。通過一些配置,熊貓數據幀可以轉儲到不同的目標連接。

對於您的問題,您可以用遊行如下生成一個簡單的SQL類型的任務:

# -*- coding:utf-8 -*- 
from parade.core.task import SqlETLTask 
from parade.type import stdtypes 


class CopyPostgres(SqlETLTask): 

    @property 
    def target_conn(self): 
     """ 
     the target connection to write the result 
     :return: 
     """ 
     return 'target_postgres' 

    @property 
    def source_conn(self): 
     """ 
     the source connection to write the result 
     :return: 
     """ 
     return 'source_postgres' 

    @property 
    def etl_sql(self): 
     """ 
     the single sql statement to process etl 
     :return: 
     """ 
     return """SELECT (column) FROM widgets""" 

你甚至可以直接遊行撰寫DAG的工作流程與多個任務,並安排了工作流程。希望這會有所幫助。

相關問題