2012-12-11 46 views
1

。我試圖用pycassa加速插入Cassandra。 我聽說使用多線程和打開多個連接加速了很多。我以json格式插入一大堆推文。 我的代碼在這裏工作了一下,然後線程開始拋出異常並停止,看起來線程越多,我停止工作的速度越快......我猜測問題是與cassandra的連接,與連接池有關。有任何想法嗎?Pycassa,threadpool,「線程線程3中的異常(最有可能在解釋器關閉期間引發):」

編輯:所有線程拋出「螺紋加工-3的異常(解釋關閉過程中最有可能提出的):」

import time 
import pycassa 
from pycassa.pool import ConnectionPool 
from pycassa.columnfamily import ColumnFamily 
from datetime import datetime 
import json 
import threadpool 
pool = threadpool.ThreadPool(4) 
kspool = ConnectionPool('TweetsKS',use_threadlocal = True) 

def process_tasks(lines): 

    #let threadpool format your requests into a list 
    requests = threadpool.makeRequests(insert_into_cfs, lines) 

    #insert the requests into the threadpool 
    for req in requests: 
     pool.putRequest(req) 


def read(file): 
    bench = open("bench.txt", "w") 
    bench.write(str(datetime.now())+"\n") 
    """read data from json and insert into keyspace""" 
    json_data=open(file) 
    lines = [] 
    for line in json_data: 
     lines.append(line) 
    process_tasks(lines) 


def insert_into_cfs(line): 

    user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet') 
    user_name_cf = pycassa.ColumnFamily(kspool, 'UserName') 
    tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet') 
    user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower') 

    tweet_data = json.loads(line) 
    """Format the tweet time as an epoch seconds int value""" 
    tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000") 
    tweet_time = int(time.mktime(tweet_time)) 

    new_user_tweet(user_tweet_cf,tweet_data['from_user_id'],tweet_time,tweet_data['id']) 
    new_user_name(user_name_cf,tweet_data['from_user_id'],tweet_data['from_user_name']) 
    new_tweet(tweet_cf,tweet_data['id'],tweet_data['text'],tweet_data['to_user_id']) 

    if tweet_data['to_user_id'] != 0: 
     new_user_follower(user_follower_cf,tweet_data['from_user_id'],tweet_data['to_user_id']) 


"""4 functions below carry out the inserts into specific column families"""  
def new_user_tweet(user_tweet_cf,from_user_id,tweet_time,id): 
    user_tweet_cf.insert(from_user_id,{(tweet_time): id}) 

def new_user_name(user_name_cf,from_user_id,user_name): 
    user_name_cf.insert(from_user_id,{'username': user_name}) 

def new_tweet(tweet_cf,id,text,to_user_id): 
    tweet_cf.insert(id,{ 
    'text': text 
    ,'to_user_id': to_user_id 
    }) 

def new_user_follower(user_follower_cf,from_user_id,to_user_id): 
    user_follower_cf.insert(from_user_id,{to_user_id: 0}) 

if __name__ == '__main__': 
    read('tweets.json') 
+1

你看到了什麼樣的例外?請注意,每次創建ColumnFamily對象都會產生一個查詢,因此您可能希望先創建這些查詢並跨線程共享它們。 –

+0

我剛剛得到「線程線程3中的異常(最有可能在解釋器關閉期間引發):」對於所有線程,它工作正常一段時間,我可以看到服務器上的活動,然後所有線程拋出此異常幾乎在同一時間......這也是我第一次嘗試使用線程池,所以我還需要弄清楚如何獲得真正的異常消息! –

回答

1

好吧,這裏的問題是我使用線程池。 我需要pool.outit pool.putRequest(req)後(循環外) 我的主要線程是在他們休息之前完成的,他們不是守護進程。

有了2個線程,我的卡桑德拉插入速度大約快了一倍......但猜猜怎麼樣?它仍然比MySQL慢! 6線程是相同的...需要更多修補我猜!

+0

很好聽。我對threadpool不熟悉,所以我不知道你在那裏做錯了什麼。順便說一句,你可以接受堆棧溢出自己的答案。 –

相關問題