Here is the binary equivalent of COPY FROM for Python 3:
from io import BytesIO
from struct import pack
import psycopg2
# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]
# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      (4, 4, 2, 4, 8)))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database
# File trailer
cpy.write(pack('!h', -1))
# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)
# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()
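To check a buffer like this before sending it to the server, it can help to decode it again. The following is a minimal sketch, not part of the original answer: `read_pgcopy` is a hypothetical helper that walks the same layout written above (19-byte header, then for each row a 16-bit field count followed by length-prefixed values, and a final `-1` trailer). It assumes the caller knows the struct format character of each column.

```python
from io import BytesIO
from struct import pack, unpack

SIGNATURE = b'PGCOPY\n\377\r\n\0'

def read_pgcopy(buf, formats):
    """Decode a binary COPY stream into a list of tuples.

    `formats` holds one struct format character per column, e.g. 'iihfd'.
    (Hypothetical helper for illustration only.)
    """
    sig, flags, ext_len = unpack('!11sii', buf.read(19))
    if sig != SIGNATURE:
        raise ValueError('not a PostgreSQL binary COPY stream')
    buf.read(ext_len)  # skip the header extension area, if any
    rows = []
    while True:
        (nfields,) = unpack('!h', buf.read(2))
        if nfields == -1:  # file trailer
            return rows
        row = []
        for fmt in formats[:nfields]:
            (size,) = unpack('!i', buf.read(4))
            if size == -1:  # NULL: no value bytes follow
                row.append(None)
            else:
                row.append(unpack('!' + fmt, buf.read(size))[0])
        rows.append(tuple(row))

# Rebuild a one-row stream the same way as above and decode it again
cpy = BytesIO()
cpy.write(pack('!11sii', SIGNATURE, 0, 0))
cpy.write(pack('!h', 5))
for fmt, size, value in zip('iihfd', (4, 4, 2, 4, 8),
                            (1, 23253, 342, -15.336734, 2494627.949375)):
    cpy.write(pack('!i' + fmt, size, value))
cpy.write(pack('!h', -1))
cpy.seek(0)
decoded = read_pgcopy(cpy, 'iihfd')
```

The `real` column comes back slightly changed (it is stored as a 32-bit float), while the integer and `double precision` values are returned exactly.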
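Update

I rewrote the above approach to writing the file for COPY. My data in Python is in NumPy arrays, so it makes sense to use these. Here is some example data with 1M rows and 7 columns: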
import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))
dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt
On my database, I have two tables that look like this:
CREATE TABLE num_data_binary
(
    id integer PRIMARY KEY,
    node integer NOT NULL,
    ts smallint NOT NULL,
    s0 real,
    s1 real,
    s2 real,
    s3 real,
    s4 real,
    s5 real,
    s6 real
) WITH (OIDS=FALSE);
and another similar table named num_data_text.
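Here are some simple helper functions that prepare the data for COPY (in both text and binary formats) using the information in the NumPy record array: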
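def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        # str() rather than repr(): NumPy 2 scalar reprs look like 'np.int32(1)'
        line = '\t'.join([str(x) for x in row]) + '\n'
        cpy.write(line.encode('ascii'))  # BytesIO needs bytes in Python 3
    return cpy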
def prepare_binary(dat):
    # Each row: int16 field count, then an int32 byte length before every value
    pgcopy_dtype = [('num_fields', '>i2')]
    for field, dtype in dat.dtype.descr:
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]  # big-endian
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        # itemsize is the value's byte length (alignment only happens to match)
        pgcopy[field + '_length'] = dat.dtype[i].itemsize
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    cpy.write(pgcopy.tobytes())  # all rows; tostring() was removed in NumPy 2
    cpy.write(pack('!h', -1))  # file trailer
    return cpy
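For cross-checking, the per-row layout that the NumPy dtype in `prepare_binary` is meant to reproduce can also be written with the standard `struct` module. This is only a sketch for the 10-column table above; `ROW` and `pack_row` are names made up for illustration, and each NumPy record should come out byte-for-byte the same as one packed row.

```python
from struct import Struct

# Field count (int16), then a (length, value) pair per column:
# id int32, node int32, ts int16, and s0..s6 as float32, all big-endian
ROW = Struct('!h' + 'ii' + 'ii' + 'ih' + 'if' * 7)

def pack_row(id_, node, ts, samples):
    """Pack one num_data_binary row; `samples` holds the 7 real values.

    (Hypothetical helper for illustration only.)
    """
    fields = [4, id_, 4, node, 2, ts]
    for s in samples:
        fields += [4, s]
    return ROW.pack(10, *fields)

row_bytes = pack_row(1, 1, 1, [0.5] * 7)
```

Packing a million rows this way in a Python loop would be far slower than the single NumPy conversion, which is the reason for building the layout as a dtype.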
This is how I use the helper functions to benchmark the two COPY formats:
def time_pgcopy(dat, table, binary):
    print('Processing copy object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transfering to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return
time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)
Here is the output from the last two time_pgcopy commands:
Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
Total time: 0:00:24.622095
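So both the NumPy → file and file → database steps are much faster with the binary approach. The obvious difference is in how Python prepares the COPY file, which is really slow for text. Generally speaking, the binary format loads into the database in about 2/3 of the time taken by the text format for this schema.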
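The size reported for the binary copy object can be reproduced by hand from the file layout, which is a quick sanity check on the format. The sketch below assumes the 10-column table and the 2000 × 500 rows used above; `pgcopy_size` is a name made up for this example.

```python
HEADER = 11 + 4 + 4      # signature, flags, header extension length
TRAILER = 2              # int16 value -1
LENGTH_WORD = 4          # int32 byte count before every field value

def pgcopy_size(nrows, field_sizes):
    """Total bytes of a binary COPY stream without NULLs or extensions."""
    per_row = 2 + sum(LENGTH_WORD + size for size in field_sizes)
    return HEADER + nrows * per_row + TRAILER

# id int32, node int32, ts int16, then seven float32 columns
sizes = [4, 4, 2] + [4] * 7
total = pgcopy_size(2000 * 500, sizes)
print(total)  # 80000021
```

Each row takes 80 bytes, of which 42 are field-count and length words, so the binary stream ends up only slightly smaller than the text one even though it is much cheaper to produce.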
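Lastly, I compared the values in both tables within the database to see whether the numbers were different. About 1.46% of the rows have different values for column s0, and this fraction increases to 6.17% for s6 (probably related to the random method that I used). The non-zero absolute differences between all 70M 32-bit float values range between 9.3132257e-010 and 7.6293945e-006. These small differences between the text and binary loading methods are due to the loss of precision from the float → text → float conversions required by the text format method.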
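The round-trip effect can be illustrated without a database. The sketch below uses only the standard library and rounds values to 32-bit floats with `struct`. The number of digits written in the benchmark above is not stated, so the choice of 8 significant digits here is purely an assumption for illustration. What is certain is that 9 significant digits are enough to round-trip any 32-bit float.

```python
import random
from struct import pack, unpack

def to_float32(x):
    """Round a Python float to the nearest 32-bit float."""
    return unpack('!f', pack('!f', x))[0]

def count_changed(values, digits):
    """Count values altered by a float32 -> text -> float32 round trip."""
    changed = 0
    for v in values:
        text = '%.*g' % (digits, v)  # decimal text with `digits` sig. digits
        if to_float32(float(text)) != v:
            changed += 1
    return changed

rng = random.Random(0)
values = [to_float32(rng.random() * 100) for _ in range(100000)]
changed_8 = count_changed(values, 8)   # assumed digit count, too few
changed_9 = count_changed(values, 9)   # enough for every float32
print(changed_8, changed_9)
```

With 8 digits a small fraction of the values comes back as a neighbouring float, while with 9 digits none do. The binary format skips the conversion entirely.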
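Well, you can [import binary files with COPY](http://www.postgresql.org/docs/9.1/interactive/sql-copy.html), but for that the whole file has to be in a specific binary format, not just one value.
@Erwin, yes, I read about the binary mode of COPY, but I'm not sure whether it is supported by psycopg2, or whether I should use a different approach.
The only application I have had for the binary file format was importing files that were exported *from PostgreSQL*. I don't know of any other program that can write that specific format. That doesn't mean there couldn't be one out there, though. If this is for a repeated operation, you could COPY to Postgres in text form once, write out a binary file, and use `COPY FROM .. FORMAT BINARY` the next time.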