2015-10-13 80 views
3

我是新來的Python中的多線程,我目前正在編寫附加到csv文件的腳本。如果我有多個線程提交到一個concurrent.futures.ThreadPoolExecutor,將行添加到csv文件。如果追加是這些線程唯一的文件相關操作,我該怎麼做才能保證線程安全?多個線程在Python中寫入相同的CSV

簡化我的代碼版本:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    for count,ad_id in enumerate(advertisers): 

     downloadFutures.append(executor.submit(downloadThread, arguments.....)) 
     time.sleep(random.randint(1,3)) 

而我的線程類的存在:

def downloadThread(arguments......): 

       #Some code..... 

       writer.writerow(re.split(',', line.decode())) 

我應該設立一個單獨的單線程執行來處理文字還是woth擔心如果我只是追加?

編輯:我要闡述的是,當寫操作發生時可以當文件被下一個追加到分鐘之間有很大不同,我只是擔心,沒有發生這種情況下測試我的劇本的時候,我寧願被覆蓋爲了那個原因。

+0

你也許可以做一個線程'csvwriter'使用的一個技巧在[這個答案](http://stackoverflow.com/a/13618333/355230)中提到的相關問題。 – martineau

回答

8

我不確定csvwriter是否線程安全。該documentation沒有明確規定,因此是安全的,如果多個線程使用相同的對象,您應該保護與使用量的threading.Lock

# create the lock 
import threading 
csv_writer_lock = threading.Lock() 

def downloadThread(arguments......): 
    # pass csv_writer_lock somehow 
    # Note: use csv_writer_lock on *any* access 
    # Some code..... 
    with csv_writer_lock: 
     writer.writerow(re.split(',', line.decode())) 

話雖這麼說,它可能確實是爲downloadThread更優雅將寫入任務提交給執行程序,而不是顯式使用像這樣的鎖。

+0

我會使用一個鎖來訪問共享'writer'(或者爲它自動創建一個包裝類/對象)。 – martineau

+0

@martineau:好點!我已經更新了我的答案以反映這一點。 – Claudiu

+0

可能是我對我的問題最直接的回答,非常感謝。 – GreenGodot

1

這裏是一些代碼,它也處理頭痛引起的unicode的問題:

def ensure_bytes(s): 
    return s.encode('utf-8') if isinstance(s, unicode) else s 

class ThreadSafeWriter(object): 
''' 
>>> from StringIO import StringIO 
>>> f = StringIO() 
>>> wtr = ThreadSafeWriter(f) 
>>> wtr.writerow(['a', 'b']) 
>>> f.getvalue() == "a,b\\r\\n" 
True 
''' 

def __init__(self, *args, **kwargs): 
    self._writer = csv.writer(*args, **kwargs) 
    self._lock = threading.Lock() 

def _encode(self, row): 
    return [ensure_bytes(cell) for cell in row] 

def writerow(self, row): 
    row = self._encode(row) 
    with self._lock: 
     return self._writer.writerow(row) 

def writerows(self, rows): 
    rows = (self._encode(row) for row in rows) 
    with self._lock: 
     return self._writer.writerows(rows) 

# example: 
with open('some.csv', 'w') as f: 
    writer = ThreadSafeWriter(f) 
    writer.write([u'中文', 'bar']) 

更詳細的解決方案是here

相關問題