2013-06-03 42 views
0

我在我的代碼中使用了parallel_pipeline函數。有時當我的條件滿足時,它會停止流水線,有時它不會。當流控制調用停止後,它不會終止,而是調用其下一部分並在控制檯上打印,然後控制檯輸出變得像它一直停留在無限循環中,什麼都不做。parallel_pipeline不終止

代碼是:

#include <iostream> 
#include <sstream> 
#include <fstream> 
#include <vector> 
#include <algorithm> 
#include <tbb/pipeline.h> 
#include <tbb/atomic.h> 
#include <tbb/concurrent_queue.h> 
#include <tbb/compat/thread> 
#include <tbb/tbbmalloc_proxy.h>  
#include <tbb/tick_count.h> 
using namespace std; 
using namespace tbb; 

#define pi 3.141593 
#define FILTER_LEN 265 

double coeffs[ FILTER_LEN ] = 
{ 
    0.0033473431384214393,0.000032074683390218124,0.0033131082058404943,0.0024777666109278788, 
    -0.0008968429179843104,-0.0031973449396977684,-0.003430943381749411,-0.0029796565504781646, 
    -0.002770673157048994,-0.0022783059845596586,-0.0008531818129514857,0.001115432556294998, 
    0.0026079871108133294,0.003012423848769931,0.002461420635709332,0.0014154004589753215, 
    0.00025190669718400967,-0.0007608257014963959,-0.0013703600874774068,-0.0014133823230551277, 
    -0.0009759556503342884,-0.00039687498737139273,-0.00007527524701314324,-0.00024181463305012626, 
    -0.0008521761947454302,-0.00162618205097997,-0.002170446498273018,-0.002129903305507943, 
    -0.001333859049002249,0.00010700092934983156,0.0018039564602637683,0.7930896349583, 
    0.0038325849735515363,0.003416201274366522,0.9,0.00017954815260431595, 
    -0.0016358832300944531,-0.0028402136847527387,-0.0031256650498727384,-0.0025374271571154713, 
    -0.001438370315670195,-0.00035115295209013755,0.0002606730012030533,0.0001969569787142967, 
    -0.00039635535951198597,-0.0010886127490608972,-0.0013530057243606405,-0.0008123200399262436, 
    0.0005730271959526784,0.0024419465938120906,0.004133717273258681,0.0049402122577746265, 
    0.0043879285604252714,0.002449549610687005,-0.00040283102645093463,-0.003337730734820209, 
    -0.0054508346511294775,-0.006093057767824609,-0.005117609782189977,-0.0029293645861970417, 
    -0.0003251033117661085,0.0018074390555649442,0.0028351284091668164,0.002623563404428517, 
    0.0015692864792199496,0.0004127664681096788,-0.00009249878881824428,0.0004690173244168184, 
    0.001964334172374759,0.0037256715492873485,0.004809640399145206,0.004395274594482053, 
    0.0021650921193604,-0.0014888595443799124,-0.005534807968511709,-0.008642334104607624, 
    -0.009668950651149259,-0.008104732391434574,-0.004299972815463919,0.0006184612821881392, 
    0.005136551428636121,0.007907786753766152,0.008241212326068366,0.00634786595941524, 
    0.003235610213062744,0.00028882736660937287,-0.001320994685952108,-0.0011237433853145615, 
    0.00044213409507615003,0.0022057106517524255,0.00277593527678719,0.0011909915058737617, 
    -0.0025807757230413447,-0.007497632882437637,-0.011739520895818884,-0.013377018279057393, 
    -0.011166543231844196,-0.005133056165990026,0.0032948631959114935,0.011673660427968408, 
    0.017376415708412904,0.018548938130314566,0.014811760899506572,0.007450782505155853, 
    -0.001019540069785369,-0.007805775815783898,-0.010898333714715424,-0.00985364043415772, 
    -0.005988406030111452,-0.001818560524968024,0.000028552677472614846,-0.0019938756495376363, 
    -0.007477684025727061,-0.013989430449615033,-0.017870518868849213,-0.015639422062597726, 
    -0.005624959109456065,0.010993528170353541,0.03001263681283932,0.04527492462846608, 
    0.050581340787164114,0.041949186532860346,0.019360612460662185,-0.012644336735920483, 
    -0.0458782599058412,-0.07073838953156347,-0.0791205623455818,-0.06709535677423759, 
    -0.03644544574795176,0.005505370370858695,0.04780486657828151,0.07898800597378192, 
    0.0904453420042807,0.07898800597378192,0.04780486657828151,0.005505370370858695, 
    -0.03644544574795176,-0.06709535677423759,-0.0791205623455818,-0.07073838953156347, 
    -0.0458782599058412,-0.012644336735920483,0.019360612460662185,0.041949186532860346, 
    0.050581340787164114,0.04527492462846608,0.03001263681283932,0.010993528170353541, 
    -0.005624959109456065,-0.015639422062597726,-0.017870518868849213,-0.013989430449615033, 
    -0.007477684025727061,-0.0019938756495376363,0.000028552677472614846,-0.001818560524968024, 
    -0.005988406030111452,-0.00985364043415772,-0.010898333714715424,-0.007805775815783898, 
    -0.001019540069785369,0.007450782505155853,0.014811760899506572,0.018548938130314566, 
    0.017376415708412904,0.011673660427968408,0.0032948631959114935,-0.005133056165990026, 
    -0.011166543231844196,-0.013377018279057393,-0.011739520895818884,-0.007497632882437637, 
    -0.0025807757230413447,0.0011909915058737617,0.00277593527678719,0.0022057106517524255, 
    0.00044213409507615003,-0.0011237433853145615,-0.001320994685952108,0.00028882736660937287, 
    0.003235610213062744,0.00634786595941524,0.008241212326068366,0.007907786753766152, 
    0.005136551428636121,0.0006184612821881392,-0.004299972815463919,-0.008104732391434574, 
    -0.009668950651149259,-0.008642334104607624,-0.005534807968511709,-0.0014888595443799124, 
    0.0021650921193604,0.004395274594482053,0.004809640399145206,0.0037256715492873485, 
    0.001964334172374759,0.0004690173244168184,-0.00009249878881824428,0.0004127664681096788, 
    0.0015692864792199496,0.002623563404428517,0.0028351284091668164,0.0018074390555649442, 
    -0.0003251033117661085,-0.0029293645861970417,-0.005117609782189977,-0.006093057767824609, 
    -0.0054508346511294775,-0.003337730734820209,-0.00040283102645093463,0.002449549610687005, 
    0.0043879285604252714,0.0049402122577746265,0.004133717273258681,0.0024419465938120906, 
    0.0005730271959526784,-0.0008123200399262436,-0.0013530057243606405,-0.0010886127490608972, 
    -0.00039635535951198597,0.0001969569787142967,0.0002606730012030533,-0.00035115295209013755, 
    -0.001438370315670195,-0.0025374271571154713,-0.0031256650498727384,-0.0028402136847527387, 
    -0.0016358832300944531,0.00017954815260431595,0.9,0.003416201274366522, 
    0.0038325849735515363,0.7930896349583,0.0018039564602637683,0.00010700092934983156, 
    -0.001333859049002249,-0.002129903305507943,-0.002170446498273018,-0.00162618205097997, 
    -0.0008521761947454302,-0.00024181463305012626,-0.00007527524701314324,-0.00039687498737139273, 
    -0.0009759556503342884,-0.0014133823230551277,-0.0013703600874774068,-0.0007608257014963959, 
    0.00025190669718400967,0.0014154004589753215,0.002461420635709332,0.003012423848769931, 
    0.0026079871108133294,0.001115432556294998,-0.0008531818129514857,-0.0022783059845596586, 
    -0.002770673157048994,-0.0029796565504781646,-0.003430943381749411,-0.0031973449396977684, 
    -0.0008968429179843104,0.0024777666109278788,0.0033131082058404943,0.000032074683390218124, 
    0.0033473431384214393 
}; 

class MyBuffer 
{ 
    public: 
    double *acc; 
    double *buffer; 
    int start,end; 
    static int j; 

    MyBuffer() 
    { 
     start=0; 
     end=0; 

     buffer=new double[150264]; 
     acc=new double[150000]; 
     fill_n(buffer,150264,0); 

    } 

    ~MyBuffer() 
    { 
     delete[] buffer; 
     delete[] acc; 
    } 

    int startnumber() 
    { 
     return start; 
    } 

    int endnumber() 
    { 
     return end; 
    } 

}; 

typedef concurrent_bounded_queue<MyBuffer> QueueMyBufferType; 
QueueMyBufferType chunk_queue; 

atomic<bool> stop_flag; 
atomic<bool> stop_filter; 

int MyBuffer::j=0; 
int queueloopcount=30; 

void input_function()        
{ 
    stop_flag = false;  

    cout<<"thread reached to call input function " <<endl; 
    ofstream o("testing sinewave.csv"); 

    int counter=0; 
    while(counter<150000)  
    { 
    // cout<<"value of counter is \t" <<counter << endl; 

     MyBuffer *b=new MyBuffer;              
     b->start=(FILTER_LEN-1+(counter)); 
     b->end=(5264+(counter)); 

    // cout<<"value of b.start is and b.end is "<<b->start<<"\t" <<b->end<<endl; 

     for(int i =b->startnumber(); i <b->endnumber(); i++) 
     { 
       b->buffer[i] = sin(700 * (2 * pi) * (i/5000.0)); 
       o<<b->buffer[i]<<endl; 
     } 
     chunk_queue.push(*b); 

     counter+=5000; 
     // cout<<"value of queueloopcount is "<< queueloopcount << endl; 
    } 

    cout<<"all data is perfectly generated" <<endl; 
} 

int main() 
{ 

    int ntokens = 8; 

    thread inputfunc(input_function); 
    tick_count t1,t2; 
    ofstream o("filter700Hz.csv"); 
    t1=tick_count::now(); 

    bool stop_pipeline = false;  
    stop_filter=false; 


    inputfunc.join(); 

    parallel_pipeline(ntokens,make_filter<void,MyBuffer*> 
    ( 
      filter::parallel,[&](flow_control& fc)->MyBuffer* 
     { 
      if(queueloopcount==0) 
      { 
       fc.stop(); 
       cout<<"pipeline stopped"<<endl; 
      } 
      else 
      { 
        MyBuffer *b=new MyBuffer; 
        chunk_queue.pop(*b); 
       { 
        cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl; 
        queueloopcount--; 
       } 
       return b; 
      } 
     } 
    )& 

    make_filter<MyBuffer*, void> 
    (
     filter::serial,[&](MyBuffer* b) 
     { 
      cout<<"value of second filter start is and end is \t "<< b->startnumber() << "\t" << b->endnumber() <<endl; 
     } 
    ) 
    ); 

     cout<<"now i am out" <<endl; 
    o.close(); 
    t2=tick_count::now(); 

    cout << "\n Time elapsed is \n\n" <<(t2-t1).seconds()<<endl;   

    return 0; 
} 

請幫忙找哪裏的代碼是錯誤的。

回答

0

這段代碼中的問題與過濾器即第一階段並行,這是造成問題的流量控制對象,它試圖彈出,它是一個阻塞調用,由於它阻止和解決方案,你應該有一個串行第一個過濾器只創建一個空的MyBuffer *並在沒有更多工作到期時停止管道。然後有一個並行的第二個過濾器,執行真正的工作,最後是一個串行(按順序)輸出階段。