2013-03-29 227 views
0

我是python的新手。我想試試Hbase thrift client using thrift。我在網上獲得了一些代碼,我只是修改了thrift的最新版本,但是當我運行代碼時,它只是退出,沒有線程啓動。python線程立即退出

這是代碼。

import json, traceback, sys, datetime, time, logging, threading, random 
import logging.handlers 

import thrift 
sys.path.append('gen-py') 


from thrift.transport.TSocket import TSocket 
from thrift.transport.TTransport import TBufferedTransport 
from thrift.protocol import TBinaryProtocol 
from hbase import THBaseService 



gWritenItems = 0 
gStartT = 0 
gEndT = 0 

recordsPerBatch = 300 #reports per client per day 
columns = 3 

#config 
concurrent = 10 
records = 60000#6000000 #6 million 
bytesPerRecord = 1024 



mylock = threading.RLock() 
class writeThread(threading.Thread): 
    def __init__(self, threadname, RecordsThreadwillwrite): 
     threading.Thread.__init__(self, name = threadname) 
     bytesPerColumn = int(bytesPerRecord/columns) - 11 #suppose 3 columns 

     self.columnvalue = "value_" + "x"*bytesPerColumn + "_endv" 
     self.tbwBatch = int (RecordsThreadwillwrite/recordsPerBatch) 

     self.transport = TBufferedTransport(TSocket('pnq-adongrevm1', 5151), 40960) 
     self.transport.open() 
     protocol = TBinaryProtocol.TBinaryProtocol(self.transport) 

     self.client = THBaseService.Client(protocol) 
     self.table = "example" 

    def run(self): 
     print "+%s start" % (self.getName()) 
     global gEndT 
     global gWritenItems   

     threadWritenItem = 0 
     for loopidx in xrange(0, self.tbwBatch):    
      self.write_hbase() #write           
      threadWritenItem += recordsPerBatch 

     mylock.acquire() 
     gEndT = time.time() 
     gWritenItems += threadWritenItem 
     print "%s done, %s seconds past, %d reocrds saved" % (self.getName(), gEndT-gStartT, gWritenItems) 
     mylock.release() 
     self.transport.close()     

    def write_hbase(self): #write 50 rowkyes, and 3 column families in each rowkey 
     print self.getName(), "Start write" 
     batchmutations = [] 
     for i in xrange(0, recordsPerBatch): # write to db, 300 items together 
      mutations = [] 
      rowkey = "RK_%s_%s" % (random.random(), time.time())  
      for ii in xrange(0, columns): 
       mutations.append(THBaseService.TPut(row=rowkey, columnValues=[TColumnValue(family="f1", qualifier="%s"%ii, value=self.columnvalue)])) 
     self.client.putMultiple(self.table,mutations)   



itemsPerThread = int(records/concurrent) 
for threadid in xrange(0, concurrent):  
    gStartT = time.time() 
    t = writeThread("Thread_%s" % threadid, itemsPerThread) 
    t.start(); 
print "%d thread created, each thread will write %d records" % (concurrent, itemsPerThread) 

我剛剛得到一個消息10 thread created, each thread will write 6000 records

+0

將'for'和'join()'內的線程移到它外面的線程 – slezica

回答

2

是的,這是因爲你不等待線程完成自己的工作,所以主線程只是退出。試試這個:

itemsPerThread = int(records/concurrent) 
threads = [] 
for threadid in xrange(0, concurrent):  
    gStartT = time.time() 
    t = writeThread("Thread_%s" % threadid, itemsPerThread) 
    t.start(); 
    threads.append(t) 

# wait until all finish the job 
for t in threads: 
    t.join() 

編輯哈,我不認爲我在這裏,因爲你沒有標記您的線程作爲守護進程。它應該工作,即使沒有加入。但是看看這個代碼:

class CustomThread(threading.Thread): 
    def run(self): 
     print "test" 

for x in xrange(0, 10): 
    t = CustomThread() 
    t.start() 

它將始終達到print "test"行不管。所以在你的代碼中,無論如何它總是應該達到print "+%s start" % (self.getName())。你確定它不起作用嗎? :)

如果沒有,那麼只有兩種可能:

  1. 。在你__init__方法阻塞操作和/或異常。但是它不會達到最終的印刷效果;
  2. concurrent變量是0出於某種原因(這與最終印刷不一致)。
+0

是的,嘗試過但不起作用。 – Avinash

+0

+1,這裏的主要是確定程序壽命的主線程。你必須保持活力。 – slezica

+0

@Avinash是這樣嗎?我看不出其他問題是誠實的。 – freakish