我有一個從4個數據庫中讀取4個表的過程。我將這些數據整合到1個postgres數據庫中,共有4個表格。 (每個原始4個數據庫都有相同的4個需要合併的表)。psycopg2中的COPY命令
我現在這樣做的方式現在使用熊貓。我一次從所有4個數據庫讀取一個表,將數據連接成一個數據框,然後使用to_sql將其保存在我的postgres數據庫中。然後我循環到其他數據庫,爲其他表做同樣的事情。
我的問題是速度。我的一個表格每個日期約有1-2毫米的行數,因此完成將數據寫入postgres可能需要大約5,000-6,000秒的時間。將它寫入一個.csv文件然後在pgadmin中使用COPY FROM會更快。
這是我目前的代碼。請注意,有一些函數調用,但它基本上只是指表名。我也有一些基本的日誌記錄正在完成,但這不是太必要。我正在爲源數據庫添加一個需要的列。我從實際上是字符串的字段中剝離了.0,但大熊貓也將它們看作是一個浮點數,並且使用0填充空整數並確保列的類型是int。
def query_database(table, table_name, query_date):
df_list = []
log_list = []
for db in ['NJ', 'NJ2', 'LA', 'NA']:
start_time = time.clock()
query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S')
engine_name = '{}{}{}{}'.format(connection_type, server_name, '/', db)
print('Accessing {} from {}'.format((select_database(db)[0][table]), engine_name))
engine = create_engine(engine_name)
df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params={query_date})
query_end = time.clock() - start_time
df['source_database'] = db
df['insert_date_utc'] = query_timestamp
df['row_count'] = df.shape[0]
df['column_count'] = df.shape[1]
df['query_time'] = round(query_end, 0)
df['maximum_id'] = df['Id'].max()
df['minimum_id'] = df['Id'].min()
df['source_table'] = table_dict.get(table)
log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy()
df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1)
df_list.append(df)
log_list.append(log)
log = pd.concat(log_list)
log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last')
result = pd.concat(df_list)
result.drop_duplicates('Id', inplace=True)
cols = [i.strip() for i in (create_columns(select_database(db)[0][table]))]
result = result[cols]
print('Creating string columns for {}'.format(table_name))
for col in modify_str_cols(select_database(db)[0][table]):
create_string(result, col)
print('Creating integer columns for {}'.format(table_name))
for col in modify_int_cols(select_database(db)[0][table]):
create_int(result, col)
log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes)
print('Inserting {} data into PostgreSQL'.format(table_name))
result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table]))
如何插入COPY TO和COPY FROM進入此模式以加快速度?我應該只寫.csv文件,然後循環這些或我可以從內存複製到我的postgres?
爲了闡明,我正在整合的數據庫來自MS SQL Server。不知道這是否有所作爲。如果我可以避免寫一個文件,那麼這將是最好的。 COPY命令是否可以接受熊貓DataFrame?我正在向數據添加一列,並指定數據類型/填充空白。或者我應該先寫一個臨時文件? – trench
是的,你需要提供一個「類文件對象」,但它可以提供內存中的數據。 –
所以我可以保持一切直到這一行: result.to_sql(create_table(select_database(db)[0] [table]),cms_dtypes.pg_engine,index = False,if_exists ='append',chunksize = 50000 ,dtype = create_dtypes(select_database(db)[0] [table])) 這是我在內存中的最終數據,可以保存到數據庫中。你能提供一個示例代碼來將它寫入我的表嗎?它是copy_to?我如何確保我的標題不包含在內? engine.copy_to('table_name',result) – trench