長時間潛伏者,第一次海報。請讓我知道如何改善我的帖子!在pygmo中使用隊列進行功能評估
我試圖通過使用安裝Anaconda3優化pygmo 2.5圖書館現有的一些代碼,它們通過做彈道優化(這是POST2如果有人有興趣)可執行包裝異步和參數向量的分佈評價。爲了促進這一點,我使用多處理.SyncManager和multiprocessing.Queues通過網絡傳遞輸入並接收輸出和日誌消息。因此,在這種情況下,pygmo將選擇向量來嘗試,支持代碼會將其傳遞到一個分佈式工作人員將抓取的輸入隊列中,通過可執行文件評估並將結果傳遞回去,最終將返回給任何pygmo.algorithm用於評估
我的問題是,當pygmo初始化一個問題時,它會提供所提供的類的深層副本,在我的情況和下面提供的示例代碼中,它包含多個隊列。在執行深度拷貝時,我得到錯誤
File "pygmo_testing.py", line 121, in <module>
main()
File "pygmo_testing.py", line 108, in main
prob = pg.problem(my_prob)
File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
y = copier(x, memo)
File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
rv = reductor(4)
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
有沒有辦法解決這個問題?我需要保持執行風格異步,並分發給代碼使用的其他方法。我也嘗試過queue.Queue和multiprocessing.Manager.Queue(兩者都不能與其他現有的代碼一起工作)以獲得完整性,但它總是歸結爲深層次的複製。
謝謝大家!
"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue
"""
****************************** Utility Functions ******************************
"""
def sphere_fitness(x):
return sum(x*x)
def worker(inp_q, out_q):
while True:
x = inp_q.get()
print("got {}".format(x))
if x == False:
break
else:
fit = sphere_fitness(x)
print("x: {} f: {}".format(x, fit))
out_q.put_nowait(fit)
print("submitted {}".format(x))
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
""" Class for pygmo Problem"""
def __init__(self, dim, inp_q, out_q):
self.dim = dim
self._inp_q = inp_q
self._out_q = out_q
def _submit(self, inp_q, x):
self._inp_q.put_nowait(x)
print("x delivered")
def _receive(self, out_q):
return self._out_q.get()
def fitness(self, x):
self._submit(x)
print("put in {}".format(x))
fit = self._receive()
print("got {}".format(fit))
return [fit]
def get_bounds(self):
return ([-1]*self.dim, [1]*self.dim)
def get_name(self):
return "Sphere Function"
def get_extra_info(self):
return "\tDimensions: {}".format(self.dim)
"""
******************************* Main Function ********************************
"""
def main():
# Queues from multiprocessing
_inp_q = Queue()
_out_q = Queue()
_workers = Pool(initializer=worker,
initargs=(_inp_q, _out_q))
_workers.close()
my_prob = distributed_submit(3, _inp_q, _out_q)
prob = pg.problem(my_prob)
algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))
pop = pg.population(prob, 10)
print(pop)
pop = algo.evolve(pop)
print(pop.champion_f)
if __name__ == "__main__":
main()
我在讀的所有東西都說Queue是跨線程或進程(分別爲collections.Queue和multiprocessing.Queue)的同步,並且它們不能被複制。像發電機一樣,複製它將會耗盡它。如果pygmo必須複製它給出的內容,那麼您需要找到一種方法來處理我認爲可複製的數據結構。我將不得不再看看它... – flutefreak7