0

我想下面的對象追加到每一個對象在流流的NodeJS改造elasticsearch散裝

{"index":{"_index":"tvseries","_type":"internindex"}} 

我流看起來像這樣

[ 
    {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"}, 
    {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"}, 
    {"showname":"The X Files","episode":"01","content":"What?","season":"1"} 
] 

我流應該是什麼樣子!

> -> POST http://localhost:9200/_bulk {"index":{"_index":"tvseries","_type":"internindex"}} 
> {"showname":"The X Files","episode":"04","content":"Before 
> what?","season":"1"} 
> {"index":{"_index":"tvseries","_type":"internindex"}} 
> {"showname":"The X 
> Files","episode":"04","content":"Great.","season":"1"} 
> {"index":{"_index":"tvseries","_type":"internindex"}} 
> {"showname":"The X 
> Files","episode":"01","content":"What?","season":"1"} 

我怎麼能在我現有的代碼庫以下

var stream = new ElasticsearchWritableStream(client, { 
    highWaterMark: 256, 
    flushTimeout: 500 
}); 

pg.connect(connectionString,function(err, client, done) { 
    if(err) throw err; 
    var query = new QueryStream('SELECT * FROM srt limit 2') 
    var streams = client.query(query) 

    //release the client when the stream is finished 
    streams.on('end', done) 
    streams.pipe(JSONStream.stringify()).pipe(stream) 
}) 

故宮包目前我使用

對於批量插入elasticsearch實現這一使用jsonstream!

elasticsearch-writable-stream

用於獲取來自Postgres的數據轉換成流!

pg-query-stream

缺少的部分是轉換的Postgres流成彈性可寫流! 任何建議,指針,如何實現這個建議!

回答

0

所以基本上,沒有太多代碼更改的唯一可行的選擇是將批量插入到postgres自身而不是node.js對象的彈性搜索中所需的構建格式!

"SELECT 'tvseries' as index,'internindex' as type, json_build_object('showname', showname, 'epsiode', ep,'content',content,'season',season) AS body" 
+" FROM srt where shownameid=4"