8

我正在尋找替換一個自制的日誌處理庫,看起來非常接近ReactiveStreams與io.projectreactor。我們的目標是減少我們維護的代碼,並利用社區添加的任何新功能(關注操作員融合)。反應流 - 批處理超時

作爲開始,我需要使用stdio並將多行日誌條目合併到流水線​​下方的文本blob中。該用例在Filebeat文檔的multiline log entries一章中詳細解釋(除了我們希望它在進程中)。

到目前爲止我的代碼是:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); 
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); })); 
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner()); 
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper()); 
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length())) 
      .subscribe();   

這需要關心的多線檢測到新的日誌標題時合併,但在現有的庫,我們還提供超時後的累計行沖洗(即如果在5秒內沒有收到文本,請刷新記錄)。

在Reactor中建模的正確方法是什麼?我是否需要編寫自己的運營商,還是可以自定義任何現有的?

爲了在Project Reactor或RxJava中實現此用例,相關示例和文檔的任何指針都將非常感謝。

+1

你見過'buffer(long timepan,TimeUnit unit)'operator(rxjava)嗎? – zella

+0

緩衝區看起來非常接近,但沒有一個重載符合我所需要的 - 我需要「bufferClosingSelector」和「時間跨度」關閉策略的組合 - 以先發生者爲準。 – ddimitrov

回答

3

這取決於你如何識別每個緩衝區的起始和結束,所以下面RxJava 2代碼旨在作爲一個提示有關使用的主要來源的值,以打開和關閉該緩衝器的柵極:

TestScheduler scheduler = new TestScheduler(); 
PublishProcessor<String> pp = PublishProcessor.create(); 

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
     o.buffer(o.filter(v -> v.contains("Start")), 
       v -> Flowable.merge(o.filter(w -> w.contains("End")), 
            Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f) 
.subscribe(System.out::println); 

pp.onNext("Start"); 
pp.onNext("A"); 
pp.onNext("B"); 
pp.onNext("End"); 

pp.onNext("Start"); 
pp.onNext("C"); 

scheduler.advanceTimeBy(5, TimeUnit.MINUTES); 

pp.onNext("Start"); 
pp.onNext("D"); 
pp.onNext("End"); 
pp.onComplete(); 

打印:

[Start, A, B, End] 
[Start, C] 
[Start, D, End] 

它的工作原理是通過publish共享源代碼,它允許重複使用來自上游的相同值,而無需同時運行多個源副本。開口由檢測線上的「開始」字符串控制。關閉由「結束」字符串的檢測或寬限期後的定時器觸發來控制。

編輯:

如果「開始」,也爲下一批的指標,可以取代「結束」和「開始」檢查並改變緩衝區的內容,因爲這將包括否則在以前的緩衝區中新的頭部:

pp.publish(f) 
.doOnNext(v -> { 
    int s = v.size(); 
    if (s > 1 && v.get(s - 1).contains("Start")) { 
     v.remove(s - 1); 
    } 
}) 
.subscribe(System.out::println); 
+0

如果沒有END,但是當我們看到下一個START或者tumeout到期時緩衝區是關閉的呢?我開始懷疑自己的溝通 - 在問題中有什麼不明確的地方? – ddimitrov

1

buffer運營商似乎最適合我和最簡單的解決方案。

它具有基於規模和時間的策略。 您有日誌,所以我認爲,您可以將行數解釋爲緩衝區大小。

這裏示例 - 如何發射由4秒或5秒的時間跨度分組的項目:

Observable<String> lineReader = Observable.<String>create(subscriber -> { 
     try { 
      BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 
      for (String line = br.readLine(); line != null; line = br.readLine()) { 
       subscriber.onNext(line); 
      } 
     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    }).subscribeOn(Schedulers.newThread()); 

    lineReader 
     .buffer(5, TimeUnit.SECONDS,4) 
     .filter(lines -> !lines.isEmpty()) 
     .subscribe(System.out::println); 
+0

我需要按照日誌標題分組。即如果我記錄了一條2行消息,然後是1行消息,然後是堆棧跟蹤,則另一條1行消息,然後沒有任何消息。我希望能夠立即得到3條消息,直到堆棧跟蹤,以及在收到後第4條消息。 – ddimitrov