我試圖找到使用cassandra python客戶端在Cassandra中插入時間序列數據的最快方法。我在本地主機上運行。使用cassandra和python的快速時間系列批量插入
籌備代碼如下所示:
session.execute("CREATE KEYSPACE testt WITH replication = {'class' : 'SimpleStrategy', 'replication_factor': '1'}")
session.set_keyspace('testt')
session.execute('CREATE TABLE t (id text, time timestamp, t float, PRIMARY KEY (id, time))')
prepared = session.prepare("""
INSERT INTO t (id, time, t)
VALUES (?, ?, ?)
""")
index = pd.date_range("2003-01-01", "2004-01-01", freq="1min")
t = np.random.rand(len(index))*2-1
df = pd.DataFrame({'time': index, 't': t}).set_index('time')
print(len(df))
# 525601
然後,我已經試過兩種策略。第一是使用BatchStatements:
before = datetime.datetime.now()
x = 0
delta = 65535
while x < len(df):
batch = BatchStatement(cassandra.query.BatchType.UNLOGGED)
for i, r in df.iloc[x : x + delta].iterrows():
batch.add(prepared, ("ID1", i, r['t']))
session.execute(batch)
x += delta
print("Elapsed: {}".format(datetime.datetime.now() - before))
# Elapsed: 0:01:01.341848
注意,65535個跳躍是由於在BatchStatement的大小的上限。
..和第二個是使用execute_async:
before = datetime.datetime.now()
for i, r in df.iterrows():
session.execute_async(prepared.bind(("ID1", i, r['t'])))
print("Elapsed: {}".format(datetime.datetime.now() - before))
# Elapsed: 0:03:51.169409
我也曾嘗試相同的情況下與InfluxDB(使用其數據幀客戶端),其中,相同的插入大約需要5秒鐘。所以我的問題是,如果受到某種反模式的損害(我讀了一些博客文章,但看起來好像做得很好),或者如果使用Cassandra這種插入類型的速度只是x12慢一些。
有擴展到更多服務器節點的空間 - 或者在客戶端的python多處理等 - 但我想首先理解單節點性能。
任何指針將不勝感激!