我想創建一個生產者多個消費者模型與Java8中的流。我正在閱讀和處理來自數據庫資源的數據,並且我想以流式處理方式處理它們(無法將整個資源讀入內存)。如何互連非並行流與並行流(一個生產者多個消費者)
源的讀取必須是單線程的(遊標不是線程安全的)並且讀取速度快,而每個數據塊的處理可以並行運行。
我還沒有發現如何連接(互連)並行流與並行流處理。有沒有辦法用Java8流API做到這一點?代碼
例子:
這個迭代器必須在單個線程中運行,因爲遊標不是線程安全的。
class SimpleIterator<Data> implements Iterator<Data>{
private volatile Cursor cursor;
public SimpleIterator(Cursor cursor){
this.cursor = cursor;
}
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Data next() {
return cursor.next();
}
}
//創建非流相同常
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable =() -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false
每個數據//處理數據應並行
resultStream.parallel().forEach(data->processData(data));
public processData(Data data){
//heavy operation
}
運行,但如果我打電話的forEach之前設置流作爲並行比整個流是並行的,而且迭代器在多線程中調用。 是否有任何方式如何在Java8中互連這兩個流,或者我必須創建一些隊列,以提供單線程生產者流到並行流的數據。
我相信你在這裏的假設是Iterator將被多線程並行調用。這不完全正確。它會被多個線程調用,是的 - 但不是並行的。你的代碼做的是創建一個包裝你的迭代器的未知大小的IteratorSpliterator。 Spliterator和迭代器都不需要是線程安全的(fork/join會處理這個問題)。像這樣想:是一個ArrayList線程安全的?不。你可以對它進行並行流處理嗎?是的你可以。 –
你說得對,我的假設和你描述的一模一樣。謝謝你澄清這一點。我仍然認爲如果多個線程調用我的迭代器,即使它們不是並行運行,也可能存在一些線程問題。例如,如果遊標不會被設置爲volatile,則其他線程可以看到某個緩存對象不是原始緩存對象(例如)。 – HPCS
然後確保您的光標安全地發佈。 IteratorSpliterator將以增長的批次(每個1024的倍數,例如:3072,7168,11264,...)在Iterator上運行,並將Iterator中的元素複製到具有當前批處理長度的數組中。從該陣列中創建一個ArraySpliterator(可以進一步拆分)。當一個批次被複制時,您只會看到Iterator中的線程切換。 –