2017-07-25 36 views
0

這段代碼解析三元組(不是那麼重要),並且應該一次返回一個停車位。問題在於調用observer.next()會中斷循環,因爲它只運行一次。 console.log也不會被調用。我可能錯過了一些東西,但有可能observer.next()打破循環?有沒有錯誤,或者它是一個功能,或者我只是明顯錯誤?Observable.next()休息循環

return Observable.create(observer => { 
    this.fetch.get(datasetUrl).then(response => { 
    // Get all subjects that are parkings 
    const parkingTriples = [], 
     parkings = [], 
     totalspacesParking = [], 
     labels = []; 
    for (let index = 0; index < response.triples.length; index++) { 
     if (response.triples[index].object === 'http://vocab.datex.org/terms#UrbanParkingSite') { 
     parkingTriples.push(response.triples[index]); 
     } 
     if (response.triples[index].predicate === 'http://vocab.datex.org/terms#parkingNumberOfSpaces') { 
     totalspacesParking.push(response.triples[index]); 
     } 
     if (response.triples[index].predicate === 'http://www.w3.org/2000/01/rdf-schema#label') { 
     labels.push(response.triples[index]); 
     } 
    } 
    const _parkings = []; 
    for (let index = 0; index < parkingTriples.length; index++) { 
     const totalspacesresult = find(totalspacesParking, (o) => { 
     return o.subject === parkingTriples[index].subject 
     }); 
     const totalspaces = parseInt(n3.Util.getLiteralValue(totalspacesresult.object), 10); 
     const labelresult = find(labels, (o) => { 
     return o.subject === parkingTriples[index].subject 
     }); 
     const rdfslabel = n3.Util.getLiteralValue(labelresult.object); 
     const id = rdfslabel.replace(' ', '-').toLowerCase(); 
     observer.next(new Parking(rdfslabel, parkingTriples[index].subject, id, totalspaces, datasetUrl)); 
     console.log(observer); 
    } 

    }) 
}) 
+0

你可以簡化你的問題/提供一個測試用例嗎?通常'observer.next()'不應該破壞你的代碼 –

+0

@MarkvanStraten for(let index = 0; index

+0

'observer'的簽名是什麼?當使用一個普通的'Rx.Subject'時,你的簡化測試用例將起作用 –

回答

1

鑑於您的更新代碼,我會建議調試它。該錯誤很可能不在Rx位中。我已經重構你的代碼的Rx邏輯和響應解析之間的分裂:

function getParkings(datasetUrl){ 
    return Rx.Observable.defer(() => this.fetch.get(datasetUrl)) 
    .mergeMap(response => parseParkingResponse(response)); 
} 

function parseParkingResponse(response) { 
    const parkingTriples = [], 
    parkings = [], 
    totalspacesParking = [], 
    labels = []; 
    for (let index = 0; index < response.triples.length; index++) { 
    if (response.triples[index].object === 'http://vocab.datex.org/terms#UrbanParkingSite') { 
     parkingTriples.push(response.triples[index]); 
    } 
    if (response.triples[index].predicate === 'http://vocab.datex.org/terms#parkingNumberOfSpaces') { 
     totalspacesParking.push(response.triples[index]); 
    } 
    if (response.triples[index].predicate === 'http://www.w3.org/2000/01/rdf-schema#label') { 
     labels.push(response.triples[index]); 
    } 
    } 

    const _parkings = []; 
    for (let index = 0; index < parkingTriples.length; index++) { 
    const totalspacesresult = find(totalspacesParking, (o) => { 
     return o.subject === parkingTriples[index].subject 
    }); 
    const totalspaces = parseInt(n3.Util.getLiteralValue(totalspacesresult.object), 10); 
    const labelresult = find(labels, (o) => { 
     return o.subject === parkingTriples[index].subject 
    }); 
    const rdfslabel = n3.Util.getLiteralValue(labelresult.object); 
    const id = rdfslabel.replace(' ', '-').toLowerCase(); 
    _parkings.push(new Parking(rdfslabel, parkingTriples[index].subject, id, totalspaces, datasetUrl)); 
    } 

    return _parkings; 
} 

這將使它更容易調試是怎麼回事。

+0

這段代碼有效!但它正在流式傳輸陣列。我想一個接一個地串流每個停車場,並且在循環本身中調用oberserver.next(新的Parking()) –

+0

,除非您正在與Parking項目執行異步操作,'parseParkingResponse'將會同步,因此不會有任何流不同 –

1

一般而言,Subject不會破壞你的代碼。鑑於以下幾點:

const observer = new Rx.Subject(); 
 
const parkingTriples = new Array(10); 
 

 
observer.subscribe(val => console.log('value emitted: ' + val)); 
 

 
for (let index = 0; index < parkingTriples.length; index++) { 
 
    observer.next(index); 
 
    console.log('I still get hit through'); 
 
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

這將發出每次I still get hit through。最有可能的observer未在您的代碼中定義,因此調用observer.next()休息以及從未看到以下console.log