2016-11-07 166 views
0

我試圖找到使用cassandra python客戶端在Cassandra中插入時間序列數據的最快方法。我在本地主機上運行。使用cassandra和python的快速時間系列批量插入

籌備代碼如下所示:

session.execute("CREATE KEYSPACE testt WITH replication = {'class' : 'SimpleStrategy', 'replication_factor': '1'}") 
session.set_keyspace('testt') 
session.execute('CREATE TABLE t (id text, time timestamp, t float, PRIMARY KEY (id, time))') 

prepared = session.prepare(""" 
    INSERT INTO t (id, time, t) 
    VALUES (?, ?, ?) 
    """) 

index = pd.date_range("2003-01-01", "2004-01-01", freq="1min") 
t = np.random.rand(len(index))*2-1 
df = pd.DataFrame({'time': index, 't': t}).set_index('time') 

print(len(df)) 
# 525601 

然後,我已經試過兩種策略。第一是使用BatchStatements:

before = datetime.datetime.now() 
x = 0 
delta = 65535 
while x < len(df): 
    batch = BatchStatement(cassandra.query.BatchType.UNLOGGED) 
    for i, r in df.iloc[x : x + delta].iterrows(): 
     batch.add(prepared, ("ID1", i, r['t'])) 
    session.execute(batch)  
    x += delta 
print("Elapsed: {}".format(datetime.datetime.now() - before)) 
# Elapsed: 0:01:01.341848 

注意,65535個跳躍是由於在BatchStatement的大小的上限。

..和第二個是使用execute_async:

before = datetime.datetime.now() 
for i, r in df.iterrows(): 
    session.execute_async(prepared.bind(("ID1", i, r['t']))) 
print("Elapsed: {}".format(datetime.datetime.now() - before)) 
# Elapsed: 0:03:51.169409 

我也曾嘗試相同的情況下與InfluxDB(使用其數據幀客戶端),其中,相同的插入大約需要5秒鐘。所以我的問題是,如果受到某種反模式的損害(我讀了一些博客文章,但看起來好像做得很好),或者如果使用Cassandra這種插入類型的速度只是x12慢一些。

有擴展到更多服務器節點的空間 - 或者在客戶端的python多處理等 - 但我想首先理解單節點性能。

任何指針將不勝感激!

回答

0

批處理(除了特殊用例)總是會變慢,然後插入它們。 未記錄的主要情況性能改進的批次是如果它們都屬於同一個分區,而您的分區不是。

cqlsh的COPY FROM可能會像使用python驅動程序一樣快。您可以看到經過大量優化的代碼https://github.com/apache/cassandra/blob/trunk/pylib/cqlshlib/copyutil.py