2014-12-03 46 views
2

我處理後面的圖形數據元素的潛在的無限流:Java 8流 - 增量收集/部分縮減/間歇映射/ ......這甚至稱爲什麼?

E1 <start mark> E2 foo E3 bah ... En-1 bar En <end mark>

即,<字符串>秒的流,其必須在緩衝器中積累之前,我可以將它們映射對象模型。

目標:聚集一個Stream<String>成收集關於一個無限流的Stream<ObjectDefinedByStrings>而不開銷。

在英文中,代碼應該是這樣的:「一旦你看到一個開始標記,開始緩衝,直到你看到一個結束標記,然後準備好返回舊的緩衝區,並準備一個新的緩衝區。緩衝。」

我目前的執行情況有以下形式:

Data<String>.stream() 
      .map(functionReturningAnOptionalPresentOnlyIfObjectIsComplete) 
      .filter(Optional::isPresent) 

我有幾個問題:

  1. 這是什麼操作正確叫什麼? (也就是說我可以在Google上找到更多的例子嗎?我找到的關於.map()的每次討論都談到了1:1映射,每次討論.reduce)都會討論n:1的減少。 .collect()的每次討論都談到了作爲終端操作的累積...)

  2. 這在許多不同的方面看起來很糟糕。有沒有更好的方式來實現這一點? (候選人形式爲.collectUntilConditionThenApplyFinisher(Collector,Condition,Finisher) ...?)

謝謝!

+1

這似乎是一個很糟糕的主意。地圖操作應該是無副作用的。你幾乎可以肯定應該做的就是調用'Stream.iterator()',並通過移動迭代器直到你碰到''這樣的「舊派」方式。 – 2014-12-03 23:38:47

+0

或多或少,流並不打算以這種方式使用。迭代器是更合理的方法。 – 2014-12-03 23:44:36

+0

* facepalm * 這是完全正確的。 #編碼 – limbo 2014-12-04 03:22:58

回答

2

爲了避免你的kludge,你可以在映射之前進行過濾。

Data<String>.stream() 
    .filter(text -> canBeConvertedToObject(text)) 
    .map(text -> convertToObject(text)) 

這對無限流很好地工作,只構造需要構造的對象。它還避免了創建不必要的可選對象的開銷。

1

不幸的是,在Java 8 Stream API中沒有部分簡化操作。但是,這種操作在我的StreamEx庫中實現,該庫增強了標準的Java 8流。所以,你的任務就可以解決這樣的:

Stream<ObjectDefinedByStrings> result = 
    StreamEx.of(strings) 
      .groupRuns((a, b) -> !b.contains("<start mark>")) 
      .map(stringList -> constructObjectDefinedByStrings()); 

strings是普通的Java-8流或其他來源的類似陣列,CollectionSpliterator等正常工作與無限的或平行流。 groupRuns方法採用BiPredicate,將其應用於兩個相鄰的流元素,並在必須對這些元素進行分組時返回true。這裏我們說元素應該分組,除非第二個元素包含"<start mark>"(這是新元素的開始)。之後,您將獲得List<String>元素的流。

如果收集到中間名單不適合您,您可以使用collapse(BiPredicate, Collector)方法並指定自定義收集器執行部分縮減。例如,您可能希望所有的字符串連接在一起:

Stream<ObjectDefinedByStrings> result = 
    StreamEx.of(strings) 
      .collapse((a, b) -> !b.contains("<start mark>"), Collectors.joining()) 
      .map(joinedString -> constructObjectDefinedByStrings()); 
0

我提出這部分減少2個用例:

1.解析SQL和PL/SQL(甲骨文程序)語句

SQL語句的標準分隔符是分號(;)。它將正常的SQL語句彼此分開。但是,如果你有PL/SQL語句,那麼分號將語句內部的運算符彼此分開,而不僅僅是語句。

一個解析的腳本文件同時包含普通的SQL和PL/SQL語句的方法是先用分號,如果特定語句與特定的關鍵字(DECLAREBEGIN等)開始加入與未來這種說法分裂他們,然後語句遵循PL/SQL語法規則。

順便說一句,這不能通過使用StreamEx部分減少操作來完成,因爲它們只測試兩個相鄰元素。由於您需要了解從最初的PL/SQL關鍵字元素開始的前一個流元素,以確定是否將當前元素包含在部分縮減或部分縮減中。在這種情況下,可變部分縮小可用於收集器保存已收集元素的信息,某些Predicate只測試收集器本身(如果部分縮小應完成)或測試收集器和當前流元素。理論上講,我們講的是使用流管道思想來實現LR(0)或LR(1)解析器(請參閱https://en.wikipedia.org/wiki/LR_parser)。 LR解析器可以用來解析大多數編程語言的語法。

解析器是一個有限自動機的堆棧。在LR(0)自動機的情況下,其轉換僅取決於堆棧。在LR(1)自動機的情況下,它取決於來自流的棧和下一個元素(理論上可以有LR(2),LR(3)等自動機窺視2,3等下一個元素來確定轉換,但在實踐中,所有的編程語言都是語法LR(1)語言)。

要實現解析器,應該包含一個包含有限自動機堆棧和謂詞測試的Collector,以確定是否達到了此自動機的最終狀態(因此我們可以停止還原)。在LR(0)的情況下,它應該是Predicate測試Collector本身。在LR(1)的情況下,它應該是BiPredicate測試Collector和流中的下一個元素(因爲轉換取決於堆棧和下一個符號)。

所以實現LR(0)分析器,我們將需要像以下的(T是流元素類型,A是累加器保持兩個有限自動機堆棧和結果,R是每個語法分析器工作形成輸出流的結果):

<R,A> Stream<R> Stream<T>.parse(
    Collector<T,A,R> automataCollector, 
    Predicate<A> isFinalState) 

(我刪除了複雜像? super T而不是T緊湊性 - 結果API應該包含這些)

要實現LR(1)語法分析器,我們需要類似以下內容:

<R,A> Stream<R> Stream<T>.parse(
    BiPredicate<A, T> isFinalState 
    Collector<T,A,R> automataCollector) 

注意:在這種情況下BiPredicate應它將通過蓄電池消耗之前測試元件。記住LR(1)解析器正在窺視下一個元素以確定轉換。因此,如果空累加器拒絕接受下一個元素(BiPredicate返回true,表示部分減少已結束,在由Supplier創建的空累加器和下一個流元素上),則可能有異常。

2.根據流元件類型條件配料

當我們執行我們要相鄰數據修改(DML)語句合併成一個單一批次SQL statemens(見JDBC API),以提高整體性能。但我們不想批量查詢。所以我們需要條件批處理(而不是像Java 8 Stream with batch processing那樣的無條件批處理)。

對於這種特定情況StreamEx可以使用部分縮減操作,因爲如果BiPredicate測試的兩個相鄰元素都是DML語句,則它們應該包含在批處理中。所以我們不需要知道以前批量採集的歷史。

但是,我們可以增加任務的複雜性,並說批次應該受到大小的限制。假設一批中不超過100個DML語句。在這種情況下,我們不能忽略先前的批次收集歷史並使用BiPredicate來確定批次收集是應該繼續還是停止不足。

雖然我們可以在StreamEx部分減少之後添加flatMap以將長批次拆分成部分。但是這會延遲特定的100個元素的批處理執行,直到所有的DML語句都被收集到無限批量中。毋庸置疑,這與管道意識形態相反:我們希望最大限度地減少緩衝,以最大限度地提高輸入和輸出之間的速度。此外,如果DML語句的列表非常長而無任何查詢(例如,由於數據庫導出而導致的數百萬個INSERT),則無限批量收集可能導致OutOfMemoryError不可容忍。

因此,在這種複雜的有條件的批量收集的情況下,我們也需要像在前面的用例中描述的LR(0)解析器那樣強大的功能。