2017-08-26 59 views
0

我想線程有一些局部變量,與thread.Thread它可以像這樣優雅來完成:如何在ThreadPoolExecutor中使用threadlocal變量?

class TTT(threading.Thread): 
    def __init__(self, lines, ip, port): 
     threading.Thread.__init__(self) 
     self._lines = lines; 
     self._sock = initsock(ip, port) 
     self._sts = 0 
     self._cts = 0 

    def run(self): 
     for line in self._lines: 
      query = genquery(line) 
      length = len(query) 
      head = "0xFFFFFFFE" 
      q = struct.pack('II%ds'%len(query), head, length, query) 
      sock.send(q) 
      sock.recv(4) 
      length, = struct.unpack('I', sock.recv(4)) 
      result = '' 
      remain = length 
      while remain: 
       t = sock.recv(remain) 
       result+=t 
       remain-=len(t) 
      print(result) 

正如你可以看到,_lines_sock_sts_cts這些變量將獨立於每個線程。

但與concurrent.future.ThreadPoolExecutor,它似乎並不那麼容易。隨着ThreadPoolExecutor,我怎樣才能把事情優雅?(沒有更多的全局變量)


新編

class Processor(object): 
    def __init__(self, host, port): 
     self._sock = self._init_sock(host, port) 

    def __call__(self, address, adcode): 
     self._send_data(address, adcode) 
     result = self._recv_data() 
     return json.loads(result) 

def main(): 
    args = parse_args() 
    adcode = {"shenzhen": 440300}[args.city] 

    if args.output: 
     fo = open(args.output, "w", encoding="utf-8") 
    else: 
     fo = sys.stdout 
    with open(args.file, encoding=args.encoding) as fi, fo,\ 
     ThreadPoolExecutor(max_workers=args.processes) as executor: 
     reader = csv.DictReader(fi) 
     writer = csv.DictWriter(fo, reader.fieldnames + ["crfterm"]) 
     test_set = AddressIter(args.file, args.field, args.encoding) 
     func = Processor(args.host, args.port) 
     futures = map(lambda x: executor.submit(func, x, adcode), test_set) 
     for row, future in zip(reader, as_completed(futures)): 
      result = future.result() 
      row["crfterm"] = join_segs_tags(result["segs"], result["tags"]) 
      writer.writerow(row) 
+1

你的函數實際上可以是可調用的對象。 –

+0

顯示你現在正在使用線程池,我會告訴你如何解決它。 –

+0

@MadPhysicist等一下 – roger

回答

1

使用非常相似,你現在有什麼是最簡單的事情的佈局。取而代之的是Thread的,有一個正常的對象,而不是run,實現邏輯在__call__

class TTT: 
    def __init__(self, lines, ip, port): 
     self._lines = lines; 
     self._sock = initsock(ip, port) 
     self._sts = 0 
     self._cts = 0 

    def __call__(self): 
     ... 
     # do stuff to self 

添加__call__方法一類能夠調用的實例就好像它們是普通的功能。實際上,正常的功能是具有這種方法的對象。您現在可以將一堆TTT實例傳遞給mapsubmit

或者,可以吸收進入初始化你的任務功能:

def ttt(lines, ip, port): 
    sock = initsock(ip, port) 
    sts = cts = 0 
    ... 

現在,您可以撥打submit用正確的參數列表或map與值的每個參數的迭代。

我更喜歡這個例子的前一種方法,因爲它會在執行程序之外打開端口。執行程序任務中的錯誤報告有時可能會非常棘手,而且我傾向於將容易出錯的操作打開爲儘可能透明的端口。

編輯

根據您相關的問題,我相信,你問真正的問題是關於函數的局部變量(這是自動線程本地以及),不被共享之間的函數調用同一個線程。但是,您始終可以在函數調用之間傳遞引用。

+0

我用'__call__'和'submit'添加我的新代碼,但是如何使用這個類? – roger

+0

在我的代碼中,'Processor'只是一次init,如果我執行'executor.submit(Processor(args.host,args.port),x,adcode)',它會每次啓動。 – roger

+0

@roger#1這實際上意味着你的所有線程都共享狀態,而你並沒有問你以爲你是什麼。 #2,你總是可以做'proc = Processor(...);遺囑執行人。提交(proc,...)' –

相關問題