2015-04-02 12 views
0

我的__init__函數嘗試調用調度程序類verify_func()的函數。多處理池apply_async函數應用於Object拋出不能pickle <type'thread.lock'>對象

def__init__: 
def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
    return func.__get__(obj, cls) 


if __name__=='__main__': 
    while True: 
    #Create a scheduler object 
    scheduler_obj=Scheduler() 
    try: 
     #Connect to the database get all the new requests to be verified 
     db = Database(scheduler_obj.username_testschema, scheduler_obj.password_testschema, scheduler_obj.mother_host_testschema, 
        scheduler_obj.mother_port_testschema, scheduler_obj.mother_sid_testschema, 0) 
     result = db.query("SELECT JOB_ID FROM %s.table_1 WHERE JOB_STATUS='%s'" % (scheduler_obj.username_testschema, 'initiated')) 
     copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) 
     pool = mp.Pool(len(result)) 
     for row in result: 
     verify_id = row[0] 
     print mp.current_process() 
     try: 
      pool.apply_async(scheduler_obj.verify_func, (verify_id)) 
     except Exception as e: 
        scheduler_obj.logger.exception("MP exception : %s", e) 

     pool.close() 
     pool.join() 

    except Exception as e: 
     scheduler_obj.logger.exception(e) 
    finally: 
     time.sleep(10) 

verify_func嘗試使用cx_oracle在數據庫上運行一些查詢。

def verify_func(self,verify_id): 
    print "Verifying with Current Id : ", verify_id 
    try: 
     db = Database(self.username_testschema, self.password_testschema, self.mother_host_testschema, 
        self.mother_port_testschema, self.mother_sid_testschema, 0) 
     result = db.query("select * from %s.table_2 where job_id=%d" % (self.username_testschema,verify_id)) 
     column_name = [d[0] for d in db.cursor.description] 
     final_dictionary = [dict(zip(column_name, row)) for row in result] 

     #On getting the new job_ids whose status is .. try to run checks for each one. 
     for row in final_dictionary: 
     print row 

所以它是連接並運行在數據庫上的查詢,由於它並沒有說明它究竟在哪裏破壞,所以引發這個錯誤。

class Database(object): 
def __init__: 
    self.connection = cx_Oracle.connect(self.connect_string, mode=self.mode) 

讓我知道是否需要更多解釋。

+0

「計劃程序」實例內有一個包含「threading.Lock」的對象,該對象不能被醃製。您可能需要將'__getstate__' /'__setstate__'方法添加到'Scheduler'類中,以便在酸洗之前從實例中刪除違規對象。 – dano 2015-04-02 19:43:05

+0

好的,但是哪個對象,沒有辦法跟蹤那個?讓我嘗試一下。謝謝@dano。 – TommyT 2015-04-02 20:32:38

+0

我通常最終會通過源代碼挖掘出來。你也許可以在'pickle'模塊中添加一些跟蹤來幫助確定哪個對象,但是我不能告訴你哪個函數將其添加到offhand。 – dano 2015-04-02 20:39:10

回答

0

雅我做了scheduler_object。 字典在那裏我看到我有一個記錄器實例變量,這是不可pickleable。謝謝@Dano。

所以調試的最好的辦法是找出可以是類變量/方法的堂妹,如果您收到此錯誤。 此外,您還可以覆蓋記錄class` ,有getstatesetstate這方法,使其與pickle如下所示。 Can't pickle loggers?

+1

嘗試將'import dill'添加到代碼的頂部。 'dill'知道如何醃製實例方法和鎖以及各種其他的東西,而無需使用'_unpickle_method'業務。如果這不起作用,你正在使用'multiprocessing'的b/c,你可以使用'pathos.multiprocessing',而不是'multiprocessing'的分支,用'pickle'和'dill'替代'pickle'。 – 2015-04-02 21:58:55

+0

我會試一試那種先生。 – TommyT 2015-04-02 22:13:32

+0

https://github.com/uqfoundation/pathos – 2015-04-02 22:19:03

相關問題