Thanks for providing working code. I have modified it to get some insight, and later also created a modified version using multiprocessing.
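Modified threading version
All the modifications are only there to get more information out; there are no conceptual changes. Everything goes into one file, mthread.py,
and is explained part by part below.
Imports are as usual: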
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging
write_samples got some logging:
def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')
    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")
begin_io got a maxduration parameter; exceeding that time results in a WARNING log entry:
def begin_io(maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)
            counter = 0
            while True:
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")
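The timing pattern in begin_io (read `timer()` before and after the write, convert to milliseconds, warn above `maxduration`) can be factored out into a small context manager. This is only a sketch, not part of the measured code: `warn_if_slow` is a hypothetical helper name, and it assumes the same millisecond threshold semantics as begin_io.

```python
import logging
import time
from contextlib import contextmanager
from timeit import default_timer as timer


@contextmanager
def warn_if_slow(logger, maxduration=500):
    """Time the enclosed block; log DEBUG always and WARNING when it took
    longer than maxduration milliseconds (same rule as in begin_io)."""
    start_time = timer()
    try:
        yield
    finally:
        duration = (timer() - start_time) * 1000
        logger.debug("block took %.2f ms", duration)
        if duration > maxduration:
            logger.warning("Long duration %s", duration)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    log = logging.getLogger("begin_io")
    with warn_if_slow(log, maxduration=50):
        time.sleep(0.1)  # simulated slow write: exceeds 50 ms, so a WARNING is logged
```

Inside begin_io, the timing lines and the `if` would then collapse to `with warn_if_slow(iolog, maxduration): write_samples(store, data, counter == 0)`.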
dummy_thread was modified to stop properly and also emits a WARNING if an iteration takes too long:
def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")
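dummy_thread relies on `Event.wait(timeout)` serving as both the sleep and the stop check: it returns `False` when the timeout expires and `True` as soon as the event is set, so the loop ends within roughly one interval of `pill2kill.set()`. A stripped-down sketch of that pattern, assuming a hypothetical `heartbeat` function that collects the gaps in a list instead of logging them:

```python
import threading
import time
from timeit import default_timer as timer


def heartbeat(pill2kill, gaps, interval=0.01):
    """Append the gap (in ms) between wake-ups to `gaps` until pill2kill is set."""
    previous = timer()
    # wait() returns False on timeout (keep looping), True once the event is set
    while not pill2kill.wait(interval):
        now = timer()
        gaps.append((now - previous) * 1000)
        previous = now


if __name__ == "__main__":
    pill2kill = threading.Event()
    gaps = []
    t = threading.Thread(target=heartbeat, args=(pill2kill, gaps))
    t.start()
    try:
        time.sleep(0.2)  # stands in for the real work, e.g. begin_io(500)
    finally:
        pill2kill.set()  # ask the thread to stop...
        t.join()         # ...and wait until it has actually finished
    print("max gap: %.1f ms" % max(gaps, default=0.0))
```

If the main thread's work holds the interpreter for a long time, the largest recorded gap grows accordingly, which is exactly what the "Long duration" warnings from dummy_thread measure.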
Finally, we call it all. Feel free to modify the log level: WARNING
shows only the excessive durations, while INFO
and DEBUG
tell much more.
if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)
    pill2kill = threading.Event()
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
    t.start()
    try:
        begin_io(500)  # loops until interrupted, e.g. with Ctrl-C
    finally:
        pill2kill.set()
        t.join()
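Because every function above uses its own named logger, the level can also be raised for one logger only, instead of globally. A minimal sketch, assuming Python 3.8+ for `force=True`; the `io.StringIO` stream is used only so the output can be inspected, and in the script you would keep the default stderr:

```python
import io
import logging

logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'

# Root stays at WARNING as in the script, so by default only the
# "Long duration" warnings get through. force=True replaces any handlers
# configured earlier.
buf = io.StringIO()
logging.basicConfig(format=logformat, level=logging.WARNING,
                    stream=buf, force=True)

# Opt in to DEBUG output for begin_io only; dummy_thread stays at WARNING.
logging.getLogger("begin_io").setLevel(logging.DEBUG)

logging.getLogger("begin_io").debug("IO Done : (%.2f ms, %d)", 12.5, 0)
logging.getLogger("dummy_thread").info("Dummy Thread : (%d ms)", 10)    # filtered out
logging.getLogger("dummy_thread").warning("Long duration %s", 612.0)

print(buf.getvalue())
```

The DEBUG record from begin_io still reaches the root handler, because propagation checks handler levels rather than the root logger's level.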
Running the code, I get results as you described:
2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133
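From the values it is clear that while begin_io
is very busy and delayed (probably while the data is being written to disk), dummy_thread
is delayed by almost the same amount of time.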
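Version with multiprocessing (works well)
I have modified the code to run in multiple processes, and since then it really does not block dummy_thread
any more. Only begin_io still reports long durations: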
2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744
The code for the multiprocessing version is here:
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging


def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')
    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")


def begin_io(pill2kill, maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)
            counter = 0
            while not pill2kill.wait(0):
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")


def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")


if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)
    pill2kill = multiprocessing.Event()
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500))
    # create both processes up front so the finally block can always join them
    p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500))
    dp.start()
    p.start()
    try:
        time.sleep(100)  # let both processes run for 100 s
    finally:
        pill2kill.set()
        dp.join()
        p.join()
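Conclusions
Writing data to an HDF5 file really does block other threads, so the multiprocessing version is required.
If you expect dummy_thread
to do some real work (like collecting data to store) and want to send data from there to the HDF5 serializer, you will need some sort of messaging: either multiprocessing.Queue
, Pipe
or possibly ZeroMQ (e.g. a PUSH-PULL socket pair). With ZeroMQ you could even do the saving of data on another computer.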
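A minimal sketch of the multiprocessing.Queue variant: a single writer process owns the output file, the producer only puts chunks on the queue, and `None` acts as a sentinel telling the writer to finish. The names `writer` and `run` are hypothetical, and to keep the sketch dependency-free the writer appends floats to a text file; in the real script that loop body would call write_samples() on an open `pd.HDFStore`.

```python
import multiprocessing
import os
import tempfile


def writer(queue, fname):
    """Consumer process: the only place that touches the output file.

    Assumption for this sketch: chunks are lists of floats written as text
    lines; the real code would call write_samples(store, chunk, ...) here.
    """
    with open(fname, "w") as out:
        while True:
            chunk = queue.get()   # blocks until the producer sends something
            if chunk is None:     # sentinel: the producer is done
                break
            out.writelines(f"{x!r}\n" for x in chunk)


def run(chunks, fname):
    """Producer side: hand every chunk to the writer, then stop it cleanly."""
    # maxsize=10 is arbitrary; put() blocks only when 10 chunks are waiting
    queue = multiprocessing.Queue(maxsize=10)
    wp = multiprocessing.Process(target=writer, args=(queue, fname))
    wp.start()
    try:
        for chunk in chunks:
            queue.put(chunk)
    finally:
        queue.put(None)  # let the writer drain the queue and close the file
        wp.join()
    return wp.exitcode


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "samples.txt")
    run([[0.5, 1.5], [2.5]], path)
    with open(path) as fh:
        print(fh.read())
```

With ZeroMQ the structure stays the same: a PUSH socket takes the place of `queue.put()` and a PULL socket the place of `queue.get()`.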
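EDIT/WARNING: The provided code can sometimes fail to save the data; I wrote it to measure performance and did not make it watertight. When I press Ctrl-C during processing, I sometimes get a corrupted file. I consider this problem out of scope for this question (it should be resolved by carefully stopping the running process).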
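One possible way to do that careful stopping, sketched under assumptions: on POSIX, Ctrl-C in a terminal sends SIGINT to the whole foreground process group, so the writer process is interrupted too, possibly in the middle of a write. If the child ignores SIGINT, only the parent sees `KeyboardInterrupt`, its `finally` sets `pill2kill`, and the child stops after a complete iteration (it can then only be stopped through `pill2kill` or a harder signal such as SIGTERM). `worker` and `main` are hypothetical names, and `time.sleep(0.01)` stands in for one write_samples() call.

```python
import multiprocessing
import signal
import time


def worker(pill2kill, done):
    # Ignore Ctrl-C in the child, so only the parent reacts to it and the
    # child is never interrupted halfway through a write.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    count = 0
    while not pill2kill.wait(0):
        time.sleep(0.01)  # stands in for one write_samples() call
        count += 1
    # Reached only after a complete iteration; the real code would now leave
    # the `with pd.HDFStore(...)` block, which closes the file properly.
    done.value = count


def main(runtime=100):
    pill2kill = multiprocessing.Event()
    done = multiprocessing.Value("i", -1)  # stays -1 unless the loop ends cleanly
    p = multiprocessing.Process(target=worker, args=(pill2kill, done))
    p.start()
    try:
        time.sleep(runtime)
    except KeyboardInterrupt:
        print("Ctrl-C received, stopping the writer...")
    finally:
        pill2kill.set()
        p.join()
    return p.exitcode, done.value


if __name__ == "__main__":
    print(main(runtime=1))
```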
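Any code? How about reducing the problem to a short, simple script that writes some dummy data of the expected volume? You would see whether it still suffers from the same problem or works well. –
@JanVlcinsky I have replicated it in a script that just keeps appending to an HDFStore. I will simplify it and post it here. – user3870920
@JanVlinsky added a code example – user3870920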