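Dependency management in node.js with highland.js
I am getting tremendous value out of node.js and love the stream processing model. I mostly use it for stream processing in data enrichment and ETL-like jobs.
For enrichment, I may have a record like this: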
{ "ip":"123.45.789.01", "productId": 12345 }
I would like to enrich it with, for example, the product description and price:
{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }
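The description and price data used for the enrichment each come from a separate stream. What is the best way to resolve this kind of dependency in highland?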
H = require('highland')
fs = require('fs') # needed for createReadStream below

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)

# the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json', {flags: 'r', encoding: 'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k for testing
  .filter((line) -> line.trim().length > 0) # to prevent barfing on empty lines
  .doto((v) ->
    # here i want to add the description from the descriptionStream
    # and i want to add the price from the priceStream.
    # in order to do that, i need to make the execution of this
    # stream dependent on the completion of the first two and
    # availability of that data. this is easy with declarative
    # programming but less intuitive with functional programming
  )
  .toArray((results) ->
    # dump my results here
  )
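
To make the dependency concrete, here is a minimal sketch of the ordering I am after. It is written in plain JavaScript with Node's built-in stream module rather than highland, so it only illustrates the shape and is not highland's API. It assumes both reference streams are small enough to hold in memory. The names `toLookup`, `enrich`, and `demo` are hypothetical, and the sample activity records are made up for illustration.

```javascript
const { Readable } = require('stream');

// Drain a stream of { productId, ... } records into a Map keyed by productId.
async function toLookup(stream) {
  const lookup = new Map();
  for await (const record of stream) {
    lookup.set(record.productId, record);
  }
  return lookup;
}

// Yield enriched activity records. No activity record is read until both
// reference streams have been fully consumed; that is the dependency.
async function* enrich(activityStream, descriptionStream, priceStream) {
  const [descriptions, prices] = await Promise.all([
    toLookup(descriptionStream),
    toLookup(priceStream),
  ]);
  for await (const record of activityStream) {
    const d = descriptions.get(record.productId);
    const p = prices.get(record.productId);
    yield {
      ...record,
      // Assumption: unknown products get null rather than being dropped.
      description: d ? d.description : null,
      price: p ? p.price : null,
    };
  }
}

// Build fresh sample streams on every call and collect the enriched output.
async function demo() {
  const descriptionStream = Readable.from([
    { productId: 1, description: 'Coca-Cola 12Pk' },
    { productId: 2, description: 'Coca-Cola 20oz Bottle' },
  ]);
  const priceStream = Readable.from([
    { productId: 1, price: 4.0 },
    { productId: 2, price: 1.25 },
  ]);
  // Made-up activity records; the real input would be the large JSON file.
  const activityStream = Readable.from([
    { ip: '123.45.789.01', productId: 1 },
    { ip: '123.45.789.02', productId: 3 },
  ]);
  const results = [];
  for await (const r of enrich(activityStream, descriptionStream, priceStream)) {
    results.push(r);
  }
  return results;
}

demo().then((results) => console.log(results));
```

The large activity stream is still processed one record at a time; only the two small lookup tables are held in memory.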
Any ideas?
After rereading my question, I realized it was vague and needed to be rewritten with a code example. Sorry about that, Yuri.