2009-11-13 78 views
0

我雖然會很有趣,看看線程和隊列,所以我已經寫了2個腳本,一個將打破文件並加密在一個線程每個塊,其他的會做連續。我對python還很陌生,並且不知道爲什麼treading腳本需要這麼長時間。線程和隊列VS系列性能

螺紋腳本:

#!/usr/bin/env python 

from Crypto.Cipher import AES 
from optparse import OptionParser 
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue 


BLOCK_SIZE = 32 #32 = 256-bit | 16 = 128-bit 
TFILE = 'mytestfile.bin' 
CHUNK_SIZE = 2048 * 2048 
KEY = os.urandom(32) 

class DataSplit(): 
    def __init__(self,fileObj, chunkSize): 

     self.fileObj = fileObj 
     self.chunkSize = chunkSize 

    def split(self): 
     while True: 
      data = self.fileObj.read(self.chunkSize) 
      if not data: 
       break 
      yield data 

class encThread(threading.Thread): 
    def __init__(self, seg_queue,result_queue, cipher): 
     threading.Thread.__init__(self) 
     self.seg_queue = seg_queue 
     self.result_queue = result_queue 
     self.cipher = cipher 

    def run(self): 
     while True: 
      #Grab a data segment from the queue 
      data = self.seg_queue.get() 
      encSegment = []   
      for lines in data: 
      encSegment.append(self.cipher.encrypt(lines)) 
      self.result_queue.put(encSegment) 
      print "Segment Encrypted" 
      self.seg_queue.task_done() 

start = time.time() 
def main(): 
    seg_queue = Queue.Queue() 
    result_queue = Queue.Queue() 
    estSegCount = (os.path.getsize(TFILE)/CHUNK_SIZE)+1 
    cipher = AES.new(KEY, AES.MODE_CFB) 
    #Spawn threads (one for each segment at the moment) 
    for i in range(estSegCount): 
     eT = encThread(seg_queue, result_queue, cipher) 
     eT.setDaemon(True) 
     eT.start() 
     print ("thread spawned") 

    fileObj = open(TFILE, "rb") 
    splitter = DataSplit(fileObj, CHUNK_SIZE) 
    for data in splitter.split(): 
     seg_queue.put(data) 
     print ("Data sent to thread") 

    seg_queue.join() 
    #result_queue.join() 
    print ("Seg Q: {0}".format(seg_queue.qsize())) 
    print ("Res Q: {0}".format(result_queue.qsize())) 



main() 
print ("Elapsed Time: {0}".format(time.time()-start)) 

串行腳本:使用readlines方法(),而不是讀

#!/usr/bin/env python 

from Crypto.Cipher import AES 
from optparse import OptionParser 
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue 

TFILE = 'mytestfile.bin' 
CHUNK_SIZE = 2048 * 2048 

class EncSeries(): 
    def __init(self): 
     pass 

    def loadFile(self,path): 
     openFile = open(path, "rb") 
     #fileData = openFile.readlines() 
     fileData = openFile.read(CHUNK_SIZE) 
     openFile.close() 
     return fileData 

    def encryptData(self,key, data): 
     cipher = AES.new(key, AES.MODE_CFB) 
     newData = [] 
     for lines in data: 
      newData.append(cipher.encrypt(lines)) 
     return newData 


start = time.time() 
def main(): 
    print ("Start") 
    key = os.urandom(32) 
    run = EncSeries() 
    fileData = run.loadFile(TFILE) 

    encFileData=run.encryptData(key, fileData) 
    print("Finish") 

main() 
print ("Elapsed Time: {0}".format(time.time()-start)) 

似乎對串行版本大大加快東西太多,但它已經是多快比線程版本。

回答

1
  1. 看起來你的第二個版本只能讀取一個塊,而第一個版本讀取整個文件 - 這將解釋大加速。 編輯:另一個問題:我只是注意到你無緣無故地運行了for lines in data--這實際上是對單個字符進行加密,這會慢得多。相反,只需將數據直接傳遞給encrypt即可。

  2. 有正在啓動更多的CPU重線程比你的處理器內核是沒有意義的。

  3. 的線程可以並行,如果它們調用的解鎖GIL同時運行的擴展模塊才能正常工作。我不認爲PyCrypto這樣做,所以你不會在這裏完成任何並行的工作。

  4. 如果瓶頸是磁盤性能,那麼無論如何你都不會看到很大的改進 - 在這種情況下,最好有一個線程做磁盤I/O,另一個線程做加密。 GIL不會成爲問題,因爲它在做磁盤I/O時被釋放。

+0

很多thx的信息傢伙。我將修改線程版本爲一個I/O線程和一個加密線程。對於小文件,我是否可以通過創建兩個具有唯一密碼對象的進程並行加密來獲得性能提升? – zyrus001 2009-11-13 12:06:04

+0

如果您看到100%的CPU核心,那麼它可以幫助添加另一個進程。如果沒有,那麼磁盤I/O可能是瓶頸,而另一個進程無濟於事。 – interjay 2009-11-13 12:36:36

1

線程不加快程序的神奇的方式 - 拆分工作到線程通常會慢下來,除非該程序會耗費其時間等待I/O一顯著部分。每個新線程都會在代碼分割工作中增加更多的開銷,並且在線程間切換時操作系統開銷更大。

從理論上講,如果你是一個多處理器CPU則線程可能在不同的處理器上運行上運行,以便工作並行進行,但即使再有就是具有比處理器的多個線程是沒有意義的。

實際上它是完全不同的,至少對於C版本的Python。 GIL在多處理器上運行得並不好。 David Beazley看到這個presentation的原因是爲什麼。 IronPython和Jython沒有這個問題。

如果你真的想要並行化工作,那麼最好是產生多個進程並把它們分配給它們,但是有可能傳輸大塊數據的進程間通信開銷會否定並行性的好處。

0

主題有幾個不同的用途:

  1. 他們只提供加速,如果他們讓你得到多個硬件在同一時間對你的問題的工作,即硬件是否是CPU內核或磁盤頭。

  2. 它們允許你跟蹤I/O事件的多個序列,這將是複雜得多,沒有他們,比如與多個用戶同時通話的。

後者不是爲了性能,而是爲了清晰的代碼。

1

我看了戴夫柯比鏈接到演示文稿,並試圖例子計數器,它需要兩個多兩倍的時間在兩個線程運行:

import time 
from threading import Thread 

countmax=100000000 

def count(n): 
    while n>0: 
     n-=1 

def main1(): 
    count(countmax) 
    count(countmax) 

def main2(): 
    t1=Thread(target=count,args=(countmax,)) 
    t2=Thread(target=count,args=(countmax,)) 
    t1.start() 
    t2.start() 
    t1.join() 
    t2.join() 

def timeit(func): 
    start = time.time() 
    func() 
    end=time.time()-start 
    print ("Elapsed Time: {0}".format(end)) 

if __name__ == '__main__': 
    timeit(main1) 
    timeit(main2) 

輸出:

Elapsed Time: 21.5470001698 
Elapsed Time: 55.3279998302 

但是,如果我換了Thread for Process:

from multiprocessing import Process 

and

t1=Process(target .... 

等我得到這樣的輸出:

Elapsed Time: 20.5 
Elapsed Time: 10.4059998989 

現在它好像我的奔騰CPU有兩個內核,我敢打賭,它的超線程。任何人都可以在他們的兩個或四個核心機器上嘗試這種方式並運行2或4個線程?

multiprocessing

0

只是一個快速注蟒蛇2.6.4文檔更新這個帖子:蟒蛇3.2有一個新的實現這減輕了不少與多線程相關的開銷的GIL的,但並沒有消除鎖定。 (即它不允許你使用多個核心,但它允許你有效地在該核心上使用多個線程)。