2015-08-24 24 views
8

我剛剛在NoFlo.js的啓發下學習了highland.js。我希望能夠遞歸地操作流。在這個人爲的例子中,我將提供一個乘以2的數字,我們過濾結果< = 512。一旦數字相乘,它就會反饋到系統中。我有代碼的工作,但如果我拿出管道中的多託功能它不處理任何數字。我懷疑我不正確地將數據發送回returnPipe。有沒有更好的方法將數據傳回系統?我錯過了什麼?highlandjs中的循環數據流

### 
    input>--m--->multiplyBy2>---+ 
      |     | 
      |     | 
      +---<returnPipe<----+ 
### 

H = require('highland') 

input = H([1]) 
returnPipe = H.pipeline(
    H.doto((v)->console.log(v)) 
) 
H.merge([input,returnPipe]) 
.map((v)-> return v * 2) 
.filter((v)-> return v <= 512) 
.pipe(returnPipe) 
+0

你怎麼知道它沒有處理任何數字? – RadleyMith

+0

只要doto呼叫在那裏就行。我只想管數據,而不必在其中放入諸如doto之類的不必要的功能。 –

回答

5

從文檔:doto旋轉一個流,而re-emitting the source stream。這意味着就流水線而言,還有一個函數仍然通過它傳遞流。如果將doto取出,則原始流不會在下一次迭代中通過返回流返回。

如果你要使用管道,你必須傳遞一個方法來獲取流併發射流。例如,你可以更換doto方法與呼叫像H.map((v)=>{console.log(v); return v;})H.pipeline並自該方法消耗流併發出流,它會繼續當數據流被傳遞迴它流在.pipe(returnPipe)

編輯:要回答你的問題,當你聲明let input = H([1])你實際上是在那裏創建一個流。您可以刪除該管道returnPipe任何參考,並用下面的代碼產生相同的輸出:

let input = H([1]); 

input.map((v)=> { 
    return v * 2; 
}) 
.filter((v)=> { 
    if (v <= 512) { 
    console.log(v); 
    } 
    return v <= 512; 
}) 
.pipe(input); 
+0

這很有道理。謝謝。它只是直覺地感覺我應該能夠在沒有任何功能的情況下管理數據。有沒有像管道那樣會消耗並重新發射的東西? –

+0

請參閱我所編輯的示例。由於流的整體思想是消耗和重新發射,並且由於'H(源)'實際上創建了流對象,所以不需要創建單獨的「流水線」。我相信流水線的想法只是爲您想要流傳遞的一系列函數提供一個方便的包裝。 – jaredkwright

+0

這將永遠不會發生在我身上。非常感謝。值得+50! –

1

我的原意是寫在highland.js遞歸文件閱讀器。我posted到highland.js github問題列表和維克托Vu幫助我把這一切與一個夢幻般的文章。

H = require('highland') 
fs = require('fs') 
fsPath = require('path') 

### 
    directory >---m----------> dirFilesStream >-------------f----> out 
       |           | 
       |           | 
       +-------------< returnPipe <--------------+ 

    legend: (m)erge (f)ork 

+ directory   has the initial file 
+ dirListStream  does a directory listing 
+ out    prints out the full path of the file 
+ directoryFilter runs stat and filters on directories 
+ returnPipe  the only way i can 

### 

directory = H(['someDirectory']) 
mergePoint = H() 
dirFilesStream = mergePoint.merge().flatMap((parentPath) -> 
    H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) -> 
    fsPath.join parentPath, path 
) 
out = dirFilesStream 
# Create the return pipe without using pipe! 
returnPipe = dirFilesStream.observe().flatFilter((path) -> 
    H.wrapCallback(fs.stat)(path).map (v) -> 
    v.isDirectory() 
) 
# Connect up the merge point now that we have all of our streams. 
mergePoint.write directory 
mergePoint.write returnPipe 
mergePoint.end() 
# Release backpressure. 
out.each H.log