簡單地說,在Python3.5和3.6中,出於理智的原因,max_workers
參數不允許爲0
。所以我的解決方案是創建一個更「模擬」的ThreadPoolExecutor版本,以記錄ThreadPool的活動,以防將某些內容添加到隊列中,然後對此進行斷言。我會在這裏分享代碼,以防有人想要將其用於他們的目的。
import threading
from concurrent import futures
class RecordingThreadPool(futures.Executor):
"""A thread pool that records if used."""
def __init__(self, max_workers):
self._tp_executor = futures.ThreadPoolExecutor(max_workers=max_workers)
self._lock = threading.Lock()
self._was_used = False
def submit(self, fn, *args, **kwargs):
with self._lock:
self._was_used = True
self._tp_executor.submit(fn, *args, **kwargs)
def was_used(self):
with self._lock:
return self._was_used