1
爲了得到一個Cassandra插入更快我使用多線程,它的工作正常,但如果我添加更多的線程它沒有任何區別,我想我沒有生成更多的連接,我想也許我應該使用pool.execute(f,* args,** kwargs)但我不知道如何使用它,文檔很少。繼承人到目前爲止我的代碼..Cassandra Pycassa連接池,如何正確使用?
import connect_to_ks_bp
from connect_to_ks_bp import ks_refs
import time
import pycassa
from datetime import datetime
import json
import threadpool
pool = threadpool.ThreadPool(20)
count = 1
bench = open("benchCassp20_100000.txt", "w")
def process_tasks(lines):
#let threadpool format your requests into a list
requests = threadpool.makeRequests(insert_into_cfs, lines)
#insert the requests into the threadpool
for req in requests:
pool.putRequest(req)
pool.wait()
def read(file):
"""read data from json and insert into keyspace"""
json_data=open(file)
lines = []
for line in json_data:
lines.append(line)
print len(lines)
process_tasks(lines)
def insert_into_cfs(line):
global count
count +=1
if count > 5000:
bench.write(str(datetime.now())+"\n")
count = 1
#print count
#print kspool.checkedout()
"""
user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet')
user_name_cf = pycassa.ColumnFamily(kspool, 'UserName')
tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet')
user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower')
"""
tweet_data = json.loads(line)
"""Format the tweet time as an epoch seconds int value"""
tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000")
tweet_time = int(time.mktime(tweet_time))
new_user_tweet(tweet_data['from_user_id'],tweet_time,tweet_data['id'])
new_user_name(tweet_data['from_user_id'],tweet_data['from_user_name'])
new_tweet(tweet_data['id'],tweet_data['text'],tweet_data['to_user_id'])
if tweet_data['to_user_id'] != 0:
new_user_follower(tweet_data['from_user_id'],tweet_data['to_user_id'])
""""4 functions below carry out the inserts into specific column families"""
def new_user_tweet(from_user_id,tweet_time,id):
ks_refs.user_tweet_cf.insert(from_user_id,{(tweet_time): id})
def new_user_name(from_user_id,user_name):
ks_refs.user_name_cf.insert(from_user_id,{'username': user_name})
def new_tweet(id,text,to_user_id):
ks_refs.tweet_cf.insert(id,{
'text': text
,'to_user_id': to_user_id
})
def new_user_follower(from_user_id,to_user_id):
ks_refs.user_follower_cf.insert(from_user_id,{to_user_id: 0})
read('tweets.json')
if __name__ == '__main__':
這僅僅是另一個文件..
import pycassa
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
"""This is a static class I set up to hold the global database connection stuff,
I only want to connect once and then the various insert functions will use these fields a lot"""
class ks_refs():
pool = ConnectionPool('TweetsKS',use_threadlocal = True,max_overflow = -1)
@classmethod
def cf_connect(cls, column_family):
cf = pycassa.ColumnFamily(cls.pool, column_family)
return cf
ks_refs.user_name_cfo = ks_refs.cf_connect('UserName')
ks_refs.user_tweet_cfo = ks_refs.cf_connect('UserTweet')
ks_refs.tweet_cfo = ks_refs.cf_connect('Tweet')
ks_refs.user_follower_cfo = ks_refs.cf_connect('UserFollower')
#trying out a batch mutator whihc is supposed to increase performance
ks_refs.user_name_cf = ks_refs.user_name_cfo.batch(queue_size=10000)
ks_refs.user_tweet_cf = ks_refs.user_tweet_cfo.batch(queue_size=10000)
ks_refs.tweet_cf = ks_refs.tweet_cfo.batch(queue_size=10000)
ks_refs.user_follower_cf = ks_refs.user_follower_cfo.batch(queue_size=10000)