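I am new to multiprocessing in Python. I am extracting some features from 70,000 URLs, which come from 2 different files. After the feature extraction step I pass the results to a list and then to a CSV file. The error is BrokenPipeError: [WinError 109] The pipe has been ended, raised during data extraction.

The code runs, but then stops with an error. I tried to catch the error, but that produced another one.

Python version = 3.5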

from feature_extractor import Feature_extraction
import pandas as pd
from pandas.core.frame import DataFrame
import sys
from multiprocessing.dummy import Pool as ThreadPool
import threading as thread
from multiprocessing import Process,Manager,Array
import time

class main():

    lst = None

    def __init__(self):
        # The manager process owns the shared list; every worker talks to it over a pipe
        manager = Manager()
        self.lst = manager.list()
        self.dostuff()
        self.read_lst()

    def feature_extraction(self,url):
        if self.lst is None:
            self.lst = []

        features = Feature_extraction(url)
        self.lst.append(features.get_features())
        print(len(self.lst))

    def Pool(self,url):
        # Runs inside a child process: 8 threads append to the shared list
        pool = ThreadPool(8)
        results = pool.map(self.feature_extraction, url)

    def dostuff(self):
        df = pd.read_csv('verified_online.csv',encoding='latin-1')
        df['label'] = df['phish_id'] * 0
        mal_urls = df['url']

        df2 = pd.read_csv('new.csv')
        df2['label'] = df['phish_id']/df['phish_id']
        ben_urls = df2['urls']
        t = Process(target=self.Pool,args=(mal_urls,))
        t2 = Process(target=self.Pool,args=(ben_urls,))
        t.start()
        t2.start()
        t.join()
        # As posted: no parentheses, so join is never called and t2 is not waited for
        t2.join

    def read_lst(self):
        nw_df = DataFrame(list(self.lst))

        nw_df.columns = ['Redirect count','ssl_classification','url_length','hostname_length','subdomain_count','at_sign_in_url','exe_extension_in_request_url','exe_extension_in_landing_url',
            'ip_as_domain_name','no_of_slashes_in requst_url','no_of_slashes_in_landing_url','no_of_dots_in_request_url','no_of_dots_in_landing_url','tld_value','age_of_domain',
            'age_of_last_modified','content_length','same_landing_and_request_ip','same_landing_and_request_url']
        # As posted: df and df2 are local to dostuff() and are not defined here
        frames = [df['label'],df2['label']]
        new_df = pd.concat(frames)
        new_df = new_df.reset_index()
        nw_df['label'] = new_df['label']
        nw_df.to_csv('dataset.csv', sep=',', encoding='latin-1')

if __name__ == '__main__':

    start_time = time.clock()
    try:
        main()
    except BrokenPipeError:
        print("broken pipe....")
        pass

    print (time.clock() - start_time, "seconds")

Error traceback

Process Process-3: 
Traceback (most recent call last): 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\connection.py", line 312, in _recv_bytes 
    nread, err = ov.GetOverlappedResult(True) 
BrokenPipeError: [WinError 109] The pipe has been ended 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap 
    self.run() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "H:\Projects\newoproject\src\main.py", line 33, in Pool 
    results = pool.map(self.feature_extraction, url) 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 260, in map 
    return self._map_async(func, iterable, mapstar, chunksize).get() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 608, in get 
    raise self._value 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 119, in worker 
    result = (True, func(*args, **kwds)) 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\pool.py", line 44, in mapstar 
    return list(map(*args)) 
    File "H:\Projects\newoproject\src\main.py", line 26, in feature_extraction 
    self.lst.append(features.get_features()) 
    File "<string>", line 2, in append 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\managers.py", line 717, in _callmethod 
    kind, result = conn.recv() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv 
    buf = self._recv_bytes() 
    File "F:\Continuum\Anaconda3\lib\multiprocessing\connection.py", line 321, in _recv_bytes 
    raise EOFError 
EOFError 
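
The traceback above ends inside `self.lst.append(...)`, which is a call on the `Manager` list proxy, so the connection that broke is the one to the manager process rather than anything in the feature extraction itself. One possible way to avoid that connection is to let `pool.map` hand the results back directly. The sketch below is an illustration under assumptions, not the code from the post: `extract_features`, `extract_all` and `build_rows` are hypothetical names, and `extract_features` is a toy stand-in for `Feature_extraction(url).get_features()`.

```python
from multiprocessing.dummy import Pool as ThreadPool


def extract_features(url):
    """Hypothetical stand-in for Feature_extraction(url).get_features()."""
    return [len(url), url.count("/"), url.count(".")]


def extract_all(urls, workers=8):
    """Return one feature row per URL, in the same order as `urls`."""
    with ThreadPool(workers) as pool:
        # pool.map collects the return values itself, so no shared list is needed
        return pool.map(extract_features, urls)


def build_rows(mal_urls, ben_urls):
    """Append label 0 to malicious rows and label 1 to benign rows."""
    mal_urls, ben_urls = list(mal_urls), list(ben_urls)
    rows = extract_all(mal_urls + ben_urls)
    labels = [0] * len(mal_urls) + [1] * len(ben_urls)
    return [row + [label] for row, label in zip(rows, labels)]


if __name__ == "__main__":
    for row in build_rows(["http://a.example/x.exe"], ["https://b.example/"]):
        print(row)
```

Because `pool.map` returns results in input order, each row stays aligned with its label, which is not guaranteed when several threads append to one shared list.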

I have trimmed the unacceptable begging material from your post; [please read this](http://meta.stackoverflow.com/q/326569/472495) - all of it, thanks! – halfer

Thank you very much @halfer –

Answer

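My answer is late and does not directly address the problem as posted, but I hope it gives a clue to others who run into similar errors.

Errors I encountered: BrokenPipeError with WinError 109 (The pipe has been ended) and WinError 232 (The pipe is being closed), on Windows 7.

Observed with Python 3.6 when: (1) the same async function was submitted several times, each time with a different instance of a multiprocessing data store, in my case a queue (multiprocessing.Manager().Queue()), and (2) the reference to the queue was held in a local variable of a short-lived enclosing function.

The error occurred even though the queue shared with the async function, which had been spawned successfully and was executing, still held items and was still active (put() and get() were being called when the exception was raised).

The error occurred consistently when the same async_func was called a second time with a second queue instance. Immediately after the function's apply_async(), the connection to the first queue, the one supplied to async_func on the first call, would be broken.

The problem was resolved when the references to the queues were kept in non-overlapping variables (such as a list of queues) that also outlive the enclosing function (such as a variable returned to a function higher up the call stack).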
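
A minimal sketch of that arrangement, under stated assumptions: `produce`, `submit_jobs`, `drain` and `main` are hypothetical names, a thread pool stands in for whatever pool was actually used, and creating a single manager in the top-level function (instead of calling `multiprocessing.Manager()` once per queue) is my own simplification. Each job gets its own queue, the queues are collected in a list, and that list is returned to the caller so the references outlive the submitting function.

```python
import multiprocessing as mp
from multiprocessing.pool import ThreadPool


def produce(queue, n):
    """Worker: put the integers 0..n-1 on the queue it was given."""
    for i in range(n):
        queue.put(i)
    return n


def submit_jobs(manager, pool, sizes):
    """Submit one job per size; hand queues and results back to the caller."""
    queues, results = [], []
    for n in sizes:
        q = manager.Queue()  # a separate queue per job
        queues.append(q)     # kept in a list, not in a reused local name
        results.append(pool.apply_async(produce, (q, n)))
    return queues, results


def drain(queue):
    """Read everything currently on the queue into a plain list."""
    items = []
    while not queue.empty():
        items.append(queue.get())
    return items


def main():
    # The manager and the queue references stay alive until every job is read
    with mp.Manager() as manager, ThreadPool(2) as pool:
        queues, results = submit_jobs(manager, pool, [3, 5])
        for r in results:
            r.get()  # wait for the job and re-raise any worker exception
        return [drain(q) for q in queues]


if __name__ == "__main__":
    print(main())
```

The `if __name__ == "__main__":` guard matters on Windows, where starting the manager process re-imports the main module.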