2016-06-30 69 views
3

我有一個從4個數據庫中讀取4個表的過程。我將這些數據整合到1個postgres數據庫中,共有4個表格。 (每個原始4個數據庫都有相同的4個需要合併的表)。psycopg2中的COPY命令

我現在這樣做的方式現在使用熊貓。我一次從所有4個數據庫讀取一個表,將數據連接成一個數據框,然後使用to_sql將其保存在我的postgres數據庫中。然後我循環到其他數據庫,爲其他表做同樣的事情。

我的問題是速度。我的一個表格每個日期約有1-2毫米的行數,因此完成將數據寫入postgres可能需要大約5,000-6,000秒的時間。將它寫入一個.csv文件然後在pgadmin中使用COPY FROM會更快。

這是我目前的代碼。請注意,有一些函數調用,但它基本上只是指表名。我也有一些基本的日誌記錄正在完成,但這不是太必要。我正在爲源數據庫添加一個需要的列。我從實際上是字符串的字段中剝離了.0,但大熊貓也將它們看作是一個浮點數,並且使用0填充空整數並確保列的類型是int。

def query_database(table, table_name, query_date): 
    df_list = [] 
    log_list = [] 
    for db in ['NJ', 'NJ2', 'LA', 'NA']: 
     start_time = time.clock() 
     query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S') 
     engine_name = '{}{}{}{}'.format(connection_type, server_name, '/', db) 
     print('Accessing {} from {}'.format((select_database(db)[0][table]), engine_name)) 
     engine = create_engine(engine_name) 
     df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params={query_date}) 
     query_end = time.clock() - start_time 
     df['source_database'] = db 
     df['insert_date_utc'] = query_timestamp 
     df['row_count'] = df.shape[0] 
     df['column_count'] = df.shape[1] 
     df['query_time'] = round(query_end, 0) 
     df['maximum_id'] = df['Id'].max() 
     df['minimum_id'] = df['Id'].min() 
     df['source_table'] = table_dict.get(table) 
     log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy() 
     df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1) 
     df_list.append(df) 
     log_list.append(log) 
    log = pd.concat(log_list) 
    log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last') 
    result = pd.concat(df_list) 
    result.drop_duplicates('Id', inplace=True) 
    cols = [i.strip() for i in (create_columns(select_database(db)[0][table]))] 
    result = result[cols] 
    print('Creating string columns for {}'.format(table_name)) 
    for col in modify_str_cols(select_database(db)[0][table]): 
     create_string(result, col) 
    print('Creating integer columns for {}'.format(table_name)) 
    for col in modify_int_cols(select_database(db)[0][table]): 
     create_int(result, col) 
    log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes) 
    print('Inserting {} data into PostgreSQL'.format(table_name)) 
    result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table])) 

如何插入COPY TO和COPY FROM進入此模式以加快速度?我應該只寫.csv文件,然後循環這些或我可以從內存複製到我的postgres?

回答

3

psycopg2提供了一些具體的copy相關apis。如果你想使用csv,你必須使用copy_expert(它允許你指定一個完整的copy語句)。

通常當我這樣做時,我已經使用了copy_expert()和一個類似文件的對象,它遍歷磁盤上的文件。這似乎工作得很好。

這就是說,在你的情況下,我認爲copy_tocopy_from是更好的匹配,因爲它只是postgres到postgres轉移在這裏。注意,這些使用PostgreSQL的複印輸出/輸入語法,而不是CSV(如果你想使用CSV,你必須使用copy_expert)你決定怎麼做事情之前

注意,你需要注意的是:

copy_to將文件複製到類似文件的對象(如StringIO)和copy_from/copy_expert文件。如果你想使用熊貓數據幀,你將不得不考慮這一點,並創建一個類似文件的對象,或者使用csv以及StringIOcopy_expert來生成一個內存中的csv並加載它。

+0

爲了闡明,我正在整合的數據庫來自MS SQL Server。不知道這是否有所作爲。如果我可以避免寫一個文件,那麼這將是最好的。 COPY命令是否可以接受熊貓DataFrame?我正在向數據添加一列,並指定數據類型/填充空白。或者我應該先寫一個臨時文件? – trench

+0

是的,你需要提供一個「類文件對象」,但它可以提供內存中的數據。 –

+0

所以我可以保持一切直到這一行: result.to_sql(create_table(select_database(db)[0] [table]),cms_dtypes.pg_engine,index = False,if_exists ='append',chunksize = 50000 ,dtype = create_dtypes(select_database(db)[0] [table])) 這是我在內存中的最終數據,可以保存到數據庫中。你能提供一個示例代碼來將它寫入我的表嗎?它是copy_to?我如何確保我的標題不包含在內? engine.copy_to('table_name',result) – trench