0
我想下面的對象追加到每一個對象在流流的NodeJS改造elasticsearch散裝
{"index":{"_index":"tvseries","_type":"internindex"}}
我流看起來像這樣
[
{"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"01","content":"What?","season":"1"}
]
我流應該是什麼樣子!
> -> POST http://localhost:9200/_bulk {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X Files","episode":"04","content":"Before
> what?","season":"1"}
> {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X
> Files","episode":"04","content":"Great.","season":"1"}
> {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X
> Files","episode":"01","content":"What?","season":"1"}
我怎麼能在我現有的代碼庫以下
var stream = new ElasticsearchWritableStream(client, {
highWaterMark: 256,
flushTimeout: 500
});
pg.connect(connectionString,function(err, client, done) {
if(err) throw err;
var query = new QueryStream('SELECT * FROM srt limit 2')
var streams = client.query(query)
//release the client when the stream is finished
streams.on('end', done)
streams.pipe(JSONStream.stringify()).pipe(stream)
})
故宮包目前我使用
對於批量插入elasticsearch實現這一使用jsonstream!
用於獲取來自Postgres的數據轉換成流!
缺少的部分是轉換的Postgres流成彈性可寫流! 任何建議,指針,如何實現這個建議!