1

長時間潛伏者,第一次海報。請讓我知道如何改善我的帖子!在pygmo中使用隊列進行功能評估

我試圖通過使用安裝Anaconda3優化pygmo 2.5圖書館現有的一些代碼,它們通過做彈道優化(這是POST2如果有人有興趣)可執行包裝異步和參數向量的分佈評價。爲了促進這一點,我使用多處理.SyncManager和multiprocessing.Queues通過網絡傳遞輸入並接收輸出和日誌消息。因此,在這種情況下,pygmo將選擇向量來嘗試,支持代碼會將其傳遞到一個分佈式工作人員將抓取的輸入隊列中,通過可執行文件評估並將結果傳遞回去,最終將返回給任何pygmo.algorithm用於評估

我的問題是,當pygmo初始化一個問題時,它會提供所提供的類的深層副本,在我的情況和下面提供的示例代碼中,它包含多個隊列。在執行深度拷貝時,我得到錯誤

File "pygmo_testing.py", line 121, in <module> 
    main() 
    File "pygmo_testing.py", line 108, in main 
    prob = pg.problem(my_prob) 
    File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy 
    y = _reconstruct(x, memo, *rv) 
    File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct 
    state = deepcopy(state, memo) 
    File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy 
    y = copier(x, memo) 
    File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict 
    y[deepcopy(key, memo)] = deepcopy(value, memo) 
    File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy 
    rv = reductor(4) 
    File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__ 
    context.assert_spawning(self) 
    File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning 
    ' through inheritance' % type(obj).__name__ 
RuntimeError: Queue objects should only be shared between processes through inheritance 

有沒有辦法解決這個問題?我需要保持執行風格異步,並分發給代碼使用的其他方法。我也嘗試過queue.Queue和multiprocessing.Manager.Queue(兩者都不能與其他現有的代碼一起工作)以獲得完整性,但它總是歸結爲深層次的複製。

謝謝大家!


""" 
****************************** Import Statements ****************************** 
""" 
import pygmo as pg 
from multiprocessing import Pool, Queue 

""" 
****************************** Utility Functions ****************************** 
""" 
def sphere_fitness(x): 
    return sum(x*x) 

def worker(inp_q, out_q): 

    while True: 

     x = inp_q.get() 
     print("got {}".format(x)) 
     if x == False: 
      break 
     else: 
      fit = sphere_fitness(x) 
      print("x: {} f: {}".format(x, fit)) 
      out_q.put_nowait(fit) 
      print("submitted {}".format(x))   
""" 
********************************** Class(es) ************************************ 
""" 
class distributed_submit(object): 
    """ Class for pygmo Problem""" 

    def __init__(self, dim, inp_q, out_q): 
     self.dim = dim 
     self._inp_q = inp_q 
     self._out_q = out_q 

    def _submit(self, inp_q, x): 
     self._inp_q.put_nowait(x) 
     print("x delivered") 

    def _receive(self, out_q): 
     return self._out_q.get() 

    def fitness(self, x): 
     self._submit(x) 
     print("put in {}".format(x)) 
     fit = self._receive() 
     print("got {}".format(fit)) 
     return [fit] 

    def get_bounds(self): 
     return ([-1]*self.dim, [1]*self.dim) 

    def get_name(self): 
     return "Sphere Function" 

    def get_extra_info(self): 
     return "\tDimensions: {}".format(self.dim) 


""" 
******************************* Main Function ******************************** 
""" 
def main(): 

    # Queues from multiprocessing 
    _inp_q = Queue() 
    _out_q = Queue() 

    _workers = Pool(initializer=worker, 
        initargs=(_inp_q, _out_q)) 
    _workers.close() 

    my_prob = distributed_submit(3, _inp_q, _out_q) 

    prob = pg.problem(my_prob) 

    algo = pg.algorithm(pg.bee_colony(gen=20, limit=20)) 

    pop = pg.population(prob, 10) 
    print(pop) 

    pop = algo.evolve(pop) 

    print(pop.champion_f) 


if __name__ == "__main__": 
    main() 
+0

我在讀的所有東西都說Queue是跨線程或進程(分別爲collections.Queue和multiprocessing.Queue)的同步,並且它們不能被複制。像發電機一樣,複製它將會耗盡它。如果pygmo必須複製它給出的內容,那麼您需要找到一種方法來處理我認爲可複製的數據結構。我將不得不再看看它... – flutefreak7

回答