2016-12-02

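What is an efficient way to trigger a function once the length of a list has changed by a certain amount? How to trigger a function when a condition is met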

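I have a nested list to which I add data 100 times per second, and I want to trigger a function whenever the length of the list has increased by some amount. I tried to do this with an if statement inside a while loop (see my_loop() below). This works, but this seemingly simple operation uses 100% of one of my CPU cores. It looks to me as if constantly querying the list size is the limiting factor of the script (adding data to the list in the while loop is not resource-intensive).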

Here is what I have tried so far:

from threading import Event, Thread
import time

def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        # No sleep in this loop: it re-checks len(list_) as fast as it can.
        if len(list_) - b0 > buffer_len:
            b0 = len(list_)
            print("Len of list_ is {}".format(b0))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

Example output:

Len of list_ is 202 
Len of list_ is 403 
Len of list_ is 604 
Len of list_ is 805 
Len of list_ is 1006 

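Python's multithreading doesn't switch the active thread until I/O takes place or 'time.sleep()' is called by the one currently running. This means your code spends most of its time executing one thread or the other. – martineau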


Thanks. That works, even when I use a sleep duration of 1 ms ('time.sleep(0.001)'). – Jakub
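For reference, CPython does not switch threads only on I/O or sleep(): since CPython 3.2, a thread holding the GIL is also asked to release it after a fixed switch interval. That is why the busy loop in the question still lets the data thread run, while keeping a core busy. The interval can be inspected like this (and changed with sys.setswitchinterval()); this snippet is an illustration, not part of the thread:

```python
import sys

# In CPython 3.2+, a thread that holds the GIL is asked to release it once
# this many seconds have passed, even if it never sleeps or does I/O.
# CPython's default is 0.005 s.
interval = sys.getswitchinterval()
print("switch interval: {} s".format(interval))
```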

Answers


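It's not very safe to access a list from two threads, so I'd recommend a safer way of communicating between threads. In CPython, your code won't corrupt the contents of the list, but you might not get exactly 200 items each time you process a batch. If you start removing items from the list in my_loop(), you could run into trouble. If you use another Python implementation that doesn't have the GIL, you could run into even more trouble.

Before that, though, here is the smallest change I can think of that solves the problem you asked about: CPU usage. I just added a sleep to my_loop() and cleaned up the buffer calculation, so it now reports a mostly steady 201, 401, 601. Occasionally, I saw 1002.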

from threading import Event, Thread
import time

def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        time.sleep(0.01)  # Yield the CPU between checks instead of spinning.
        if len(list_) - b0 >= buffer_len:
            b0 += buffer_len  # Advance by exactly one buffer so reports don't drift.
            print("Len of list_ is {}".format(len(list_)))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

time.sleep(30)
stop_all()

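Now, to do this safely, I suggest you use a queue. A queue can be read from and written to by many threads, and it handles the communication between them. If a thread tries to read from an empty queue, it blocks until some other thread adds an item to the queue.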
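A small illustration of that blocking behaviour, using only the standard-library queue module (the producer function and the 0.1 s delay are made up for the demo): get() waits without spinning until an item arrives, and get(timeout=...) raises queue.Empty instead of waiting forever.

```python
from queue import Queue, Empty
from threading import Thread
import time

q = Queue()

def producer():
    time.sleep(0.1)  # simulate data arriving a little later
    q.put("sample")

t = Thread(target=producer)
t.start()

# get() blocks, without busy-waiting, until the producer has put an item.
item = q.get()
print(item)  # sample
t.join()

# With a timeout, get() raises queue.Empty instead of blocking forever.
timed_out = False
try:
    q.get(timeout=0.05)
except Empty:
    timed_out = True
print(timed_out)  # True
```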

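I wasn't sure what you want to do with the items, so I just put them in a list and left them there. However, now that the list is only accessed by one thread, it's safe to clear it after each batch of 200 items is processed.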
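One way the "clear it after each batch" idea might look, as a sketch: consume() and process_batch are hypothetical names, and process_batch stands in for whatever should happen to a full batch. Because only the consuming thread touches received_items, no lock is needed.

```python
from queue import Queue


def consume(request_queue, batch_size, process_batch):
    """Collect items into batches of batch_size and hand each batch off.

    Stops at the None sentinel and returns any leftover partial batch.
    """
    received_items = []  # owned by this thread only, so no lock needed
    while True:
        item = request_queue.get()
        if item is None:
            break
        received_items.append(item)
        if len(received_items) >= batch_size:
            process_batch(received_items)
            # Rebind rather than .clear(): the callback may keep a
            # reference to the batch it was given.
            received_items = []
    return received_items


# Usage: 7 items, then the sentinel, with batches of 3.
q = Queue()
for i in range(7):
    q.put(i)
q.put(None)

batches = []
leftover = consume(q, 3, batches.append)
print(batches)   # [[0, 1, 2], [3, 4, 5]]
print(leftover)  # [6]
```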

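Because my_loop() now blocks, it won't necessarily notice when you set the kill signal. Instead, I used None as a sentinel value in the request queue to tell it to shut down. If that doesn't work for you, you can use a timeout when getting items from the queue, check the kill signal, and then try to get an item again.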

from threading import Event, Thread
from queue import Queue
import time

def add_indefinitely(request_queue, kill_signal):
    """
    request_queue : queue.Queue
        Queue to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        request_queue.put([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.
    request_queue.put(None)  # Sentinel: tells my_loop() to shut down.


def my_loop(request_queue, buffer_len):
    """
    request_queue : queue.Queue
        Queue from which data is read.
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    """
    received_items = []  # replaces list_
    buffer_len *= 100
    while True:
        item = request_queue.get()  # blocks until an item is available
        if item is None:
            break
        received_items.append(item)
        if len(received_items) % buffer_len == 0:
            print("Len of received_items is {}".format(len(received_items)))


request_queue = Queue()
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(request_queue, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(request_queue, buffer_len))
loop_thread.start()


def stop_all():
    """Stop adding to and reading from the queue.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()  # data_thread puts the None sentinel on its way out
    loop_thread.join()

time.sleep(30)
stop_all()
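The timeout alternative mentioned above, for cases where a None sentinel is not an option, could look roughly like this. It is a sketch, not the answer's code: my_loop_with_timeout and its timeout parameter are hypothetical, and 0.1 s is an arbitrary value that bounds how long shutdown can lag behind kill_signal.set().

```python
from queue import Queue, Empty
from threading import Event, Thread
import time


def my_loop_with_timeout(request_queue, buffer_len, kill_signal, timeout=0.1):
    """Hypothetical variant of my_loop() that re-checks kill_signal.

    get() blocks for at most `timeout` seconds, so the loop notices
    kill_signal.set() within roughly that delay; no sentinel is needed.
    """
    received_items = []
    n_samples = max(1, round(buffer_len * 100))  # seconds -> samples at 100 Hz
    while not kill_signal.is_set():
        try:
            item = request_queue.get(timeout=timeout)
        except Empty:
            continue  # nothing arrived in time; loop back and re-check the signal
        received_items.append(item)
        if len(received_items) % n_samples == 0:
            print("Len of received_items is {}".format(len(received_items)))
    return received_items


# Usage: feed a few items, then stop via the kill signal alone.
request_queue = Queue()
kill_signal = Event()
result = []
worker = Thread(
    target=lambda: result.extend(
        my_loop_with_timeout(request_queue, 0.02, kill_signal)))
worker.start()
for _ in range(5):
    request_queue.put([1] * 32)
time.sleep(0.3)    # give the worker time to drain the queue
kill_signal.set()  # noticed within `timeout` seconds
worker.join()
print(len(result))
```

The trade-off compared with the sentinel: the consumer wakes up every timeout seconds even when idle, which is cheap but not free.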

I'm not sure I fully understand what you want to do, but here is my idea:

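add_indefinitely() is implemented in the MyThread class, which takes an instance of the Manager class as its parent argument. Each time the data has grown by increment items, the MyThread instance calls the parent's on_increment() method to do something with it.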

from threading import Event, Thread
import time


class Manager(object):

    def on_increment(self, thread):
        print(thread.get_data_len())


class MyThread(Thread):

    def __init__(self, parent, increment, kill_signal):
        super(MyThread, self).__init__()
        self.parent = parent
        self.data = []
        self.increment = increment
        self.kill_signal = kill_signal

    def run(self):
        while not self.kill_signal.is_set():
            self.data.append([1] * 32)
            time.sleep(0.01)
            # Check the length right after appending: no separate polling thread.
            if len(self.data) % self.increment == 0:
                self.parent.on_increment(self)

    def get_data_len(self):
        return len(self.data)


kill_signal = Event()
manager = Manager()
thread = MyThread(manager, 200, kill_signal)
thread.daemon = True  # don't keep the process alive just for this thread


try:
    thread.start()
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    kill_signal.set()
    thread.join()
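The core of this idea, testing the length right where data is appended so that nothing has to poll, can be checked without any threads. A minimal sketch; CallbackList is a hypothetical helper, not part of the answer above:

```python
class CallbackList(list):
    """A list that calls on_increment(self) every `increment` appends.

    Hypothetical helper: the length check happens in the producer, right
    after each append, so no other thread has to poll len().
    Only append() is hooked; extend(), insert() and += are not.
    """

    def __init__(self, increment, on_increment):
        super().__init__()
        self.increment = increment
        self.on_increment = on_increment

    def append(self, item):
        super().append(item)
        if len(self) % self.increment == 0:
            self.on_increment(self)


seen_lengths = []
data = CallbackList(200, lambda lst: seen_lengths.append(len(lst)))
for _ in range(650):
    data.append([1] * 32)
print(seen_lengths)  # [200, 400, 600]
```

Note that the callback runs in whichever thread calls append(), so slow work inside it delays the producer.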

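It would be much more efficient not to have a separate thread looping and checking the list's length, but instead to have one that simply blocks, waiting for an event to happen. Below is an example showing how to do something like that, with the addition of threading.Lock objects to control concurrent access to global variables and resources (such as printing to stdout):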

from threading import Event, Lock, Thread
import time

THRESHOLD = 100  # minimum number of items in list_ before event triggered
list_ = []
list_lock = Lock()  # to control access to global list_
length_signal = Event()
print_lock = Lock()  # to control concurrent printing

def add_indefinitely():
    while True:
        with list_lock:
            list_.append([1] * 32)
            reached = len(list_) >= THRESHOLD
        if reached and not length_signal.is_set():
            with print_lock:
                print('setting length_signal')
            length_signal.set()
        time.sleep(.01)  # give other threads a chance to run

def length_reached():
    """
    Waits until list_ has reached a certain length, and then prints a message.
    """
    with print_lock:
        print('waiting for list_ to reach threshold length')
    length_signal.wait()  # blocks until event is set
    # Take the locks one at a time (never nested) so that the two threads
    # cannot deadlock by acquiring them in opposite orders.
    with list_lock:
        n = len(list_)
    with print_lock:
        print('list_ now contains at least {} items'.format(n))

# first start thread that will wait for the length to be reached
length_reached_thread = Thread(target=length_reached)
length_reached_thread.start()

data_thread = Thread(target=add_indefinitely)
data_thread.daemon = True
data_thread.start()

length_reached_thread.join()
with print_lock:
    print('finished')
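An Event fires once. If the function should run every time the list grows by another N items, as the question asks, threading.Condition with wait_for() is one standard-library option. This is a swapped-in technique rather than part of the answer above; STEP, add_items and on_growth are illustrative names, and the fixed 350-item run only exists so the sketch terminates.

```python
from threading import Condition, Thread
import time

STEP = 100          # assumed: react every time 100 more items have arrived
list_ = []
cond = Condition()  # wraps a Lock; protects list_ and signals changes
done = False


def add_items(n):
    global done
    for _ in range(n):
        with cond:
            list_.append([1] * 32)
            cond.notify_all()  # wake waiters so they re-check their predicate
        time.sleep(0.001)
    with cond:
        done = True
        cond.notify_all()


def on_growth(reports):
    target = STEP
    while True:
        with cond:
            # wait_for() releases the lock while sleeping and re-checks the
            # predicate each time it is notified: no busy loop.
            cond.wait_for(lambda: len(list_) >= target or done)
            if len(list_) < target:  # woke because the producer finished
                return
            reports.append(len(list_))
        target += STEP


reports = []
consumer = Thread(target=on_growth, args=(reports,))
consumer.start()
producer = Thread(target=add_items, args=(350,))
producer.start()
producer.join()
consumer.join()
print(reports)
```

Each entry in reports is the list length when the consumer woke up, so it is at least the target (100, 200, 300) but may be slightly larger, much like the 201, 401, 601 seen earlier.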