2015-08-18 48 views
2

我有一個MyObject流,我想批量持久化到數據庫(不是一個一個,但讓我們說一次1000)。所以我想做一個轉換,就像Java 8流批處理

Stream<MyObject> ---> Stream<List<MyObject>> 

其中每個List有一些固定大小的batchSize。有沒有辦法用標準的Java 8 Stream API來做到這一點?

+0

如果你的來源是'List'快速隨機訪問,你也可以使用[此解決方案](http://stackoverflow.com/a/30072617/4856258)來獲取流(注意它也很好地並行化)。 –

+0

謝謝Tagir。我在解析文件時得到了Stream。我不想將整個文件加載到List中。所以我的來源是流 –

回答

1

編輯:下面的原始解決方案不起作用,因爲java流不允許在同一個流上調用skip或limit一次以上。我結束了簡單的處理像

final AtomicInteger counter = new AtomicInteger(); 

    List<T> entityBatch = new ArrayList<>(); 

    entityStream.forEach(entity -> { 
     if (counter.intValue() = batchSize) { 
      processBatch(entityBatch); 

      entityBatch.clear(); 
      counter.set(0); 
     } 

     entityBatch.add(entity); 
     counter.incrementAndGet(); 
    }); 

    if (!entityBatch.isEmpty()) { 
     processBatch(entityBatch); 
    } 

原液: 它看起來像我發現做到這一點的方式:

<T> Stream<List<T>> batchStream(Stream<T> stream, int batchSize) { 
    return Stream.iterate(stream, s -> s.skip(batchSize)).map(s -> s.limit(batchSize).collect(toList())); 
} 
+1

如果這有效,那麼通過純粹的事故。它包含一個錯誤的假設,即skip和limit會保證修改流而不是返回一個新的實例,而且當它有多於batchSize的元素時,它會嘗試多次使用這個流。 – Holger