2015-08-19

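I've gotten enormous value out of node.js and love its stream-processing model. I mostly use it for stream processing in data-enrichment and ETL-style jobs. My question is about handling dependencies between streams in node.js with highland.js.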

For enrichment, I might start with a record like this:

{ "ip":"123.45.789.01", "productId": 12345 } 

and I want to enrich it with product details such as the description and price:

{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 } 

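The description data and the price data each come from a different stream. What is the best way to solve this kind of dependency problem in Highland?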
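To make the target transformation concrete, here is a minimal sketch of the per-record merge, assuming the description and price data have already been loaded into `Map`s keyed by `productId`. The names `descriptionsById`, `pricesById`, and `enrich` are hypothetical, and returning `null` for unknown products is a design choice, not something the question specifies.

```javascript
// Hypothetical lookup tables keyed by productId; in the real job these
// would be built from the description and price streams.
const descriptionsById = new Map([[12345, "Coca-Cola 12Pk"]]);
const pricesById = new Map([[12345, 4.0]]);

// Returns a new object; the input record is left unchanged.
// Unknown productIds get null so missing reference data stays visible.
function enrich(record) {
  const id = record.productId;
  return {
    ...record,
    description: descriptionsById.has(id) ? descriptionsById.get(id) : null,
    price: pricesById.has(id) ? pricesById.get(id) : null,
  };
}

const input = { ip: "123.45.789.01", productId: 12345 };
console.log(JSON.stringify(enrich(input)));
// → {"ip":"123.45.789.01","productId":12345,"description":"Coca-Cola 12Pk","price":4}
```

Note that `JSON.stringify` prints the price `4.00` as `4`.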

fs = require('fs')
H = require('highland')

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
    .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
    .flatMap(JSON.parse)

# the file is a 10 GB file with one JSON record on each line
activityStream = H(fs.createReadStream('8-11-all.json', {flags: 'r', encoding: 'utf8'}))
    .splitBy("\n")
    .take(100000) # just take 100k for testing
    .filter((line) -> line.trim().length > 0) # to prevent barfing on empty lines
    .doto((v) ->
        # here I want to add the description from descriptionStream
        # and the price from priceStream.
        # to do that, the execution of this stream has to depend on
        # the completion of the first two and the availability of
        # their data. this is easy with declarative programming but
        # less intuitive with functional programming
    )
    .toArray((results) ->
        # dump my results here
    )

Any ideas?
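One way to express "finish loading the lookups, then enrich the activity stream" is sketched below without Highland, using async iteration and `Promise.all` instead (a swapped-in technique, not Highland's API). The in-memory arrays stand in for the real description/price sources and the 10 GB file; `buildIndex`, `parseLines`, and `enrichActivity` are hypothetical names, and the sketch assumes both lookup tables fit in memory.

```javascript
// Consume a whole source and index one field by productId.
async function buildIndex(source, field) {
  const index = new Map();
  for await (const rec of source) {
    index.set(rec.productId, rec[field]);
  }
  return index;
}

// Turn raw JSON lines into records, skipping blank lines
// (mirrors the splitBy/filter steps in the question).
async function* parseLines(lines) {
  for await (const line of lines) {
    if (line.trim().length > 0) yield JSON.parse(line);
  }
}

async function enrichActivity(descriptions, prices, activityLines) {
  // Both indexes must be complete before any activity record is processed.
  const [descIdx, priceIdx] = await Promise.all([
    buildIndex(descriptions, "description"),
    buildIndex(prices, "price"),
  ]);
  const results = [];
  for await (const rec of parseLines(activityLines)) {
    results.push({
      ...rec,
      description: descIdx.get(rec.productId),
      price: priceIdx.get(rec.productId),
    });
  }
  return results;
}

// In-memory stand-ins for the real sources.
const descriptions = [
  { productId: 1, description: "Coca-Cola 12Pk" },
  { productId: 2, description: "Coca-Cola 20oz Bottle" },
];
const prices = [
  { productId: 1, price: 4.0 },
  { productId: 2, price: 1.25 },
];
const activityLines = ['{"productId":1}', "", '{"productId":2}'];

enrichActivity(descriptions, prices, activityLines).then((results) => {
  for (const r of results) console.log(JSON.stringify(r));
  // → {"productId":1,"description":"Coca-Cola 12Pk","price":4}
  // → {"productId":2,"description":"Coca-Cola 20oz Bottle","price":1.25}
});
```

For the real file, an interface from `readline.createInterface({ input: fs.createReadStream(...) })` could be passed as `activityLines`, since readline interfaces are async-iterable in recent Node.js versions.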

Answers

If you're using highland.js, you can use .map and supply a function that modifies each item.

For example:

var _ = require('highland');

var stream = _([{ "ip":"123.45.789.01", "productId": 12345 }]).map(function (x) {
    // mutates each record in place and passes it downstream
    x.productName = 'Coca-Cola 12 Pack';
    return x;
});

After rereading my question, I realized it was vague and needed to be rewritten with a code example. Sorry about that, Yuri.


Here's a stab at it. Is this the right approach?

H = require('highland')

# these values would come from some api/file
descriptionStream = H([{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}])
    .reduce({}, (memo, v) ->
        memo[v.productId] = v
        return memo
    )

# these values would come from some api/file
priceStream = H([{"productId":1,"price":4.00},{"productId":2,"price":1.25}])
    .reduce({}, (memo, v) ->
        memo[v.productId] = v
        return memo
    )

# series() reads the two reduced streams in order, so toArray only
# fires once both indexes have been built
H([descriptionStream, priceStream])
    .series()
    .toArray((dependencies) ->
        [descriptionIndex, priceIndex] = dependencies

        # these values would come from an api/file
        H([{productId:1},{productId:2}])
            .doto((v) -> v.description = descriptionIndex[v.productId].description)
            .doto((v) -> v.price = priceIndex[v.productId].price)
            .each((v) ->
                console.log(JSON.stringify(v))
            )
    )

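This gives me the correct result (shown below), but I'm not sure whether it's an elegant way to handle stream dependencies. I also assume that if you need the price or description stream more than once, you can fork them.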

{"productId":1,"description":"Coca-Cola 12Pk","price":4} 
{"productId":2,"description":"Coca-Cola 20oz Bottle","price":1.25}
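On reusing the price or description data more than once: one alternative to forking the raw streams, sketched here under the same assumption the reduce-to-index approach already makes (the index fits in memory), is to build each index once and share the resulting promise among consumers. `loadOnce`, `getPriceIndex`, and the inline rows are hypothetical stand-ins for a real API or file loader.

```javascript
// Wrap an async loader so it runs at most once; later callers get the same promise.
function loadOnce(loader) {
  let pending = null;
  return () => {
    if (pending === null) pending = loader();
    return pending;
  };
}

// Hypothetical loader; in practice this would read the price API or file.
let loadCount = 0;
const getPriceIndex = loadOnce(async () => {
  loadCount += 1;
  const rows = [
    { productId: 1, price: 4.0 },
    { productId: 2, price: 1.25 },
  ];
  return new Map(rows.map((r) => [r.productId, r.price]));
});

// Two independent jobs both need prices; the loader runs only once.
async function main() {
  const [a, b] = await Promise.all([getPriceIndex(), getPriceIndex()]);
  console.log(a === b, a.get(2), loadCount);
  // → true 1.25 1
}
main();
```

Sharing the finished index avoids re-reading the source, whereas forking the raw stream means each branch repeats the reduce step itself.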