2016-04-07 76 views
3

我有獲取250,000樣本每秒,在存儲緩衝器他們並最終附加到由pandas提供的HDFStore採樣應用限定的線程。一般來說,這很好。但是,我有一個運行並不斷清空數據採集設備的線程(DAQ),它需要定期運行。大約一秒的偏差往往會破壞事情。以下是觀察到的時間的極端情況。 Start表示DAQ開始讀取,Finish表示完成,IO表示HDF寫入(DAQIO都出現在單獨的線程中)。GIL爲IO在C擴展(HDF5)

Start  : 2016-04-07 12:28:22.241303 
IO (1)  : 2016-04-07 12:28:22.241303 
Finish  : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms) 
IO Done (1) : 2016-04-07 12:28:46.573440 (24332.39 ms) 

正如您所見,執行此寫操作需要24秒(典型寫入時間約爲40 ms)。我正在寫入的硬盤沒有負載,所以這個延遲不應該由爭用引起(它在運行時大約有7%的利用率)。我已經禁止在我的HDFStore寫入索引。我的應用程序運行許多其他線程,所有這些線程都打印狀態字符串,因此看起來IO任務阻塞了所有其他線程。我花了相當多的時間來瀏覽代碼,找出事情正在放緩的地方,並且它總是在C擴展提供的方法中,並且這導致我的問題。

  1. Python(我使用3.5)在C擴展中搶佔執行? Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock?似乎表明它不會,除非擴展特別收益。
  2. 熊貓的HDF5 C代碼是否對I/O執行任何屈服?如果是這樣,這是否意味着延遲是由於CPU有限的任務?我已禁用索引。
  3. 有關我如何獲得一定程度一致的計時的任何建議?我正在考慮將HDF5代碼移入另一個進程。儘管如此,這隻能在一定程度上有所幫助,因爲無論如何,我無法真正容忍~20秒的寫入,尤其是當它們無法預測時。

這裏有一個例子可以運行看問題:

import pandas as pd 
import numpy as np 
from timeit import default_timer as timer 
import datetime 
import random 
import threading 
import time 

def write_samples(store, samples, overwrite): 
    frame = pd.DataFrame(samples, dtype='float64') 

    if not overwrite: 
     store.append("df", frame, format='table', index=False) 
    else: 
     store.put("df", frame, format='table', index=False) 

def begin_io(): 
    store = pd.HDFStore("D:\\slow\\test" + str(random.randint(0,100)) + ".h5", mode='w', complevel=0) 

    counter = 0 
    while True: 
     data = np.random.rand(50000, 1) 
     start_time = timer() 
     write_samples(store, data, counter == 0) 
     end_time = timer() 

     print("IO Done  : %s (%.2f ms, %d)" % (datetime.datetime.now(), (end_time - start_time) * 1000, counter)) 

     counter += 1 

    store.close() 

def dummy_thread(): 
    previous = timer() 
    while True: 
     now = timer() 
     print("Dummy Thread : %s (%d ms)" % (datetime.datetime.now(), (now - previous) * 1000)) 
     previous = now 
     time.sleep(0.01) 


if __name__ == '__main__': 
    threading.Thread(target=dummy_thread).start() 
    begin_io() 

你會得到輸出類似於:

IO Done  : 2016-04-08 10:51:14.100479 (3.63 ms, 470) 
Dummy Thread : 2016-04-08 10:51:14.101484 (12 ms) 
IO Done  : 2016-04-08 10:51:14.104475 (3.01 ms, 471) 
Dummy Thread : 2016-04-08 10:51:14.576640 (475 ms) 
IO Done  : 2016-04-08 10:51:14.576640 (472.00 ms, 472) 
Dummy Thread : 2016-04-08 10:51:14.897756 (321 ms) 
IO Done  : 2016-04-08 10:51:14.898782 (320.79 ms, 473) 
IO Done  : 2016-04-08 10:51:14.901772 (3.29 ms, 474) 
IO Done  : 2016-04-08 10:51:14.905773 (2.84 ms, 475) 
IO Done  : 2016-04-08 10:51:14.908775 (2.96 ms, 476) 
Dummy Thread : 2016-04-08 10:51:14.909777 (11 ms) 
+0

任何代碼?如何縮小這個問題來縮短簡單的腳本,這會編寫一些虛擬數據的預期數量。你會看到,如果它仍然遭受同樣的問題,或運作良好。 –

+0

@JanVlcinsky我已經複製它在一個腳本,只是不斷附加到一個HDFStore。我會簡化它,然後在這裏發佈。 – user3870920

+0

@JanVlinsky添加了一個代碼示例 – user3870920

回答

2

答案是否定的,這些作家不釋放GIL 。請參閱文檔here。我知道你實際上並沒有試圖用多個線程來編寫,但是這應該能讓你知道。當寫入發生時確實存在強鎖以防止多次寫入。 PyTablesh5py都將其作爲HDF5標準的一部分。

你可以看看SWMR,雖然不是熊貓直接支持。 PyTables docs herehere指向解決方案。這些通常涉及到一個單獨的進程,將數據從隊列中提取出來並寫入隊列。

在任何情況下,這通常都是一個更具可擴展性的模式。

+0

是的,將它拉入單獨的進程是我能想到的唯一方法,以避免阻塞其他線程。但是,我仍然不確定,爲什麼每隔一段時間都會有很大的延遲。我猜它以某種方式增長文件,但我不確定具體做什麼。我在創建時嘗試過爲'expectedrows'設置一個大數字,但這沒有幫助。任何想法在哪裏看? – user3870920

+0

''PyTables''塊寫入(相對於'expectedrows''計算),但我認爲實際寫入大小(flush)與實現相關(例如,您不知道)。如果你對時間敏感,那麼可以下注到另一個進程,或者使用像''msgpack''這樣的東西直接轉儲到磁盤(並且是可追加的)。通常實時捕捉就是這樣。稍後進行後期處理。 – Jeff

+0

我認爲HDF5是一個很好的選擇,因爲它是一個實時數據捕獲,但我們讓用戶回滾其歷史記錄,所以我們也需要能夠讀取塊(會話可以長達數小時,因此每秒增加250k /秒相當快)。我猜如果我能找到一種方法讓它更頻繁地刷新並將其移到不同的進程,那麼事情應該沒問題。我認爲GIL沒有發佈IO,這有點令人失望,但是可能有一些邏輯背後的原因這個決定。 – user3870920

1

感謝您提供工作代碼。我已經修改了這些,以獲得一些見解,並隨後使用多處理創建了 修改版本。

修改線程版本

所有的修改都只是爲了獲得更多的信息了,沒有概念上的變化。全部進入一個 文件mthread.py並且是部分註釋。

進口通常是:

import pandas as pd 
import numpy as np 
from timeit import default_timer as timer 
import datetime 
import random 
import threading 
import logging 

write_samples得到了一些記錄:

def write_samples(store, samples, overwrite): 
    wslog = logging.getLogger("write_samples") 
    wslog.info("starting") 
    frame = pd.DataFrame(samples, dtype='float64') 

    if overwrite: 
     store.put("df", frame, format='table', index=False) 
    else: 
     store.append("df", frame, format='table', index=False) 
    wslog.info("finished") 

begin_io得到最大duaration,超過了警告日誌條目時的結果:

def begin_io(maxduration=500): 
    iolog = logging.getLogger("begin_io") 
    iolog.info("starting") 
    try: 
     fname = "data/tab" + str(random.randint(0, 100)) + ".h5" 
     iolog.debug("opening store %s", fname) 
     with pd.HDFStore(fname, mode='w', complevel=0) as store: 
      iolog.debug("store %s open", fname) 

      counter = 0 
      while True: 
       data = np.random.rand(50000, 1) 
       start_time = timer() 
       write_samples(store, data, counter == 0) 
       end_time = timer() 
       duration = (end_time - start_time) * 1000 
       iolog.debug("IO Done  : %s (%.2f ms, %d)", 
          datetime.datetime.now(), 
          duration, 
          counter) 
       if duration > maxduration: 
        iolog.warning("Long duration %s", duration) 
       counter += 1 
    except Exception: 
     iolog.exception("oops") 
    finally: 
     iolog.info("finished") 

dummy_thread了修改以正確停止d也會發出警告,如果時間太長:

def dummy_thread(pill2kill, maxduration=500): 
    dtlog = logging.getLogger("dummy_thread") 
    dtlog.info("starting") 
    try: 
     previous = timer() 
     while not pill2kill.wait(0.01): 
      now = timer() 
      duration = (now - previous) * 1000 
      dtlog.info("Dummy Thread : %s (%d ms)", 
         datetime.datetime.now(), 
         duration) 
      if duration > maxduration: 
       dtlog.warning("Long duration %s", duration) 
      previous = now 
     dtlog.debug("stopped looping.") 
    except Exception: 
     dtlog.exception("oops") 
    finally: 
     dtlog.info("finished") 

最後我們將其稱之爲全部。隨意修改日誌等級,WARNING只顯示過多的時間, INFODEBUG告訴更多。

if __name__ == '__main__': 
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s' 
    logging.basicConfig(format=logformat, 
         level=logging.WARNING) 

    pill2kill = threading.Event() 
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500)) 
    t.start() 
    try: 
     begin_io(500) 
    finally: 
     pill2kill.set() 
     t.join() 

運行代碼,你描述的,我得到的結果:

2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156 
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177 
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512 
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988 
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122 
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203 
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274 
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516 
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424 
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133 

從很明顯的價值,雖然begin_io是非常繁忙和delayd(數據 過程中可能被寫入到磁盤) ,dummy_thread也延遲了幾乎相同的時間。

版本與多 - 行之有效

我已經修改了代碼,以在多個進程中運行,從那時起,它確實沒有阻止 dummy_thread

2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631 
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161 
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851 
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462 
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381 
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971 
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996 
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959 
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711 
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604 
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939 
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508 
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396 
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483 
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542 
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676 
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744 

多處理的代碼是在這裏:

import pandas as pd 
import numpy as np 
from timeit import default_timer as timer 
import datetime 
import random 
import multiprocessing 
import time 
import logging 


def write_samples(store, samples, overwrite): 
    wslog = logging.getLogger("write_samples") 
    wslog.info("starting") 
    frame = pd.DataFrame(samples, dtype='float64') 

    if overwrite: 
     store.put("df", frame, format='table', index=False) 
    else: 
     store.append("df", frame, format='table', index=False) 
    wslog.info("finished") 


def begin_io(pill2kill, maxduration=500): 
    iolog = logging.getLogger("begin_io") 
    iolog.info("starting") 
    try: 
     fname = "data/tab" + str(random.randint(0, 100)) + ".h5" 
     iolog.debug("opening store %s", fname) 
     with pd.HDFStore(fname, mode='w', complevel=0) as store: 
      iolog.debug("store %s open", fname) 

      counter = 0 
      while not pill2kill.wait(0): 
       data = np.random.rand(50000, 1) 
       start_time = timer() 
       write_samples(store, data, counter == 0) 
       end_time = timer() 
       duration = (end_time - start_time) * 1000 
       iolog.debug("IO Done  : %s (%.2f ms, %d)", 
          datetime.datetime.now(), 
          duration, 
          counter) 
       if duration > maxduration: 
        iolog.warning("Long duration %s", duration) 
       counter += 1 
    except Exception: 
     iolog.exception("oops") 
    finally: 
     iolog.info("finished") 


def dummy_thread(pill2kill, maxduration=500): 
    dtlog = logging.getLogger("dummy_thread") 
    dtlog.info("starting") 
    try: 
     previous = timer() 
     while not pill2kill.wait(0.01): 
      now = timer() 
      duration = (now - previous) * 1000 
      dtlog.info("Dummy Thread : %s (%d ms)", 
         datetime.datetime.now(), 
         duration) 
      if duration > maxduration: 
       dtlog.warning("Long duration %s", duration) 
      previous = now 
     dtlog.debug("stopped looping.") 
    except Exception: 
     dtlog.exception("oops") 
    finally: 
     dtlog.info("finished") 


if __name__ == '__main__': 
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s' 
    logging.basicConfig(format=logformat, 
         level=logging.WARNING) 
    pill2kill = multiprocessing.Event() 
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,)) 
    dp.start() 
    try: 
     p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,)) 
     p.start() 
     time.sleep(100) 
    finally: 
     pill2kill.set() 
     dp.join() 
     p.join() 

結論

將數據寫入HDF5文件真正阻止其他線程和多版本是必需的。

如果您期望dummy_thread做一些實實在在的工作(如採集數據存儲),並要 從這裏到HDF5串行發送數據時,你將有某種消息的 - 無論是使用 multiprocessing.QueuePipe或者可能使用ZeroMQ(例如PUSH - PULL插座 對)。藉助ZeroMQ,即使在另一臺計算機上也可以保存數據。

編輯/警告:提供的代碼可能無法保存數據有時,我做了它來衡量性能,並沒有使其防水。在處理期間按Ctrl-C時,有時會出現損壞的文件。我認爲這個問題超出了這個問題的範圍(問題應該通過仔細停止運行過程來解決)。

+0

感謝您添加更多有用的日誌記錄,您確實需要通過我的代碼尋找輸出:)想想我們得出了相同的結論 - 需要使用多處理。 – user3870920

+0

@ user3870920你的問題幫助我更好地理解GIL。感謝那。請注意,我的日誌記錄只能打印超過500毫秒的時間,因此看不到短文件(通常大約10毫秒)。 –