我正在尋找替換一個自制的日誌處理庫,看起來非常接近ReactiveStreams與io.projectreactor
。我們的目標是減少我們維護的代碼,並利用社區添加的任何新功能(關注操作員融合)。反應流 - 批處理超時
作爲開始,我需要使用stdio並將多行日誌條目合併到流水線下方的文本blob中。該用例在Filebeat文檔的multiline log entries一章中詳細解釋(除了我們希望它在進程中)。
到目前爲止我的代碼是:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
這需要關心的多線檢測到新的日誌標題時合併,但在現有的庫,我們還提供超時後的累計行沖洗(即如果在5秒內沒有收到文本,請刷新記錄)。
在Reactor中建模的正確方法是什麼?我是否需要編寫自己的運營商,還是可以自定義任何現有的?
爲了在Project Reactor或RxJava中實現此用例,相關示例和文檔的任何指針都將非常感謝。
你見過'buffer(long timepan,TimeUnit unit)'operator(rxjava)嗎? – zella
緩衝區看起來非常接近,但沒有一個重載符合我所需要的 - 我需要「bufferClosingSelector」和「時間跨度」關閉策略的組合 - 以先發生者爲準。 – ddimitrov