2016-04-22 18 views
3

我想創建一個生產者多個消費者模型與Java8中的流。我正在閱讀和處理來自數據庫資源的數據,並且我想以流式處理方式處理它們(無法將整個資源讀入內存)。如何互連非並行流與並行流(一個生產者多個消費者)

源的讀取必須是單線程的(遊標不是線程安全的)並且讀取速度快,而每個數據塊的處理可以並行運行。

我還沒有發現如何連接(互連)並行流與並行流處理。有沒有辦法用Java8流API做到這一點?代碼

例子:

這個迭代器必須在單個線程中運行,因爲遊標不是線程安全的。

class SimpleIterator<Data> implements Iterator<Data>{ 

    private volatile Cursor cursor; 

    public SimpleIterator(Cursor cursor){ 
     this.cursor = cursor; 
    } 

    @Override 
    public boolean hasNext() { 
     return cursor.hasNext(); 
    }  

    @Override 
    public Data next() { 
    return cursor.next(); 

    } 
} 

//創建非流相同常

SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor); 
Iterable<Data> iterable =() -> iterator; 
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false 

每個數據//處理數據應並行

resultStream.parallel().forEach(data->processData(data)); 
public processData(Data data){ 
//heavy operation 
} 

運行,但如果我打電話的forEach之前設置流作爲並行比整個流是並行的,而且迭代器在多線程中調用。 是否有任何方式如何在Java8中互連這兩個流,或者我必須創建一些隊列,以提供單線程生產者流到並行流的數據。

+2

我相信你在這裏的假設是Iterator將被多線程並行調用。這不完全正確。它會被多個線程調用,是的 - 但不是並行的。你的代碼做的是創建一個包裝你的迭代器的未知大小的IteratorSpliterator。 Spliterator和迭代器都不需要是線程安全的(fork/join會處理這個問題)。像這樣想:是一個ArrayList線程安全的?不。你可以對它進行並行流處理嗎?是的你可以。 –

+0

你說得對,我的假設和你描述的一模一樣。謝謝你澄清這一點。我仍然認爲如果多個線程調用我的迭代器,即使它們不是並行運行,也可能存在一些線程問題。例如,如果遊標不會被設置爲volatile,則其他線程可以看到某個緩存對象不是原始緩存對象(例如)。 – HPCS

+2

然後確保您的光標安全地發佈。 IteratorSpliterator將以增長的批次(每個1024的倍數,例如:3072,7168,11264,...)在Iterator上運行,並將Iterator中的元素複製到具有當前批處理長度的數組中。從該陣列中創建一個ArraySpliterator(可以進一步拆分)。當一個批次被複制時,您只會看到Iterator中的線程切換。 –

回答

0

我正在處理一個問題,我需要在兩個流上進行完全外連接。問題似乎相似。我所做的是插入兩個阻塞隊列來緩衝我的輸入。我認爲你可以用一個阻塞隊列做類似的事情,將一個流分成多個流而不需要並行化源流。

我提出的解決方案可以在下面找到。我還沒有測試過我的解決方案加入兩個流,所以我不確定這是否奏效。 AbstractSpliterator類具有trySplit的實現;關於trySplit的評論是信息性的。該類的最後一個方法從分割器實現中構造一個可並行化的流。

import java.util.Spliterators; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.function.Consumer; 
import java.util.stream.Stream; 

public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> { 
    final T EOS = null; // Just a stub -- can't put a null in BlockingQueue 

    private final BlockingQueue<T> queue; 
    private final Thread thread; 

    // An implementation of Runnable that fills a queue from a stream 
    private class Filler implements Runnable { 
     private final Stream<T> stream; 
     private final BlockingQueue<T> queue; 

     private Filler(Stream<T> stream, BlockingQueue<T> queue) { 
      this.stream = stream; 
      this.queue = queue; 
     } 

     @Override 
     public void run() { 
      stream.forEach(x -> { 
       try { 
        // Blocks if the queue is full 
        queue.put(x); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      }); 
      // Stream is drained put end of stream marker. 
      try { 
       queue.put(EOS); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) { 
     super(estSize, characteristics); 
     queue = new ArrayBlockingQueue<T>(1024); 
     // Fill the queue from a separate thread (may want to externalize this). 
     thread = new Thread(new Filler(srcStream, queue)); 
     thread.start(); 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super T> action) { 
     try { 
      T value = queue.take(); // waits (blocks) for entries in queue 

      // If end of stream marker is found, return false signifying 
      // that the stream is finished. 
      if (value == EOS) { 
       return false; 
      } 
      // Accept the next value. 
      action.accept(value); 
     } catch (InterruptedException e) { 
      return false; 
     } 
     return true; 
    } 

    public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) { 
     Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream); 
     return StreamSupport.stream(spliterator, true); 
    } 
} 
相關問題