2016-04-10 42 views
2

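How to perform an outer join on two or more Streams

In my application I work with several Streams that supply elements of the form (ID, value). An element is defined by the following class: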

static final class Element<T> implements Comparable<Element<T>> {
    final long id;
    final T value;

    Element(long id, T value) {
        this.id = id;
        this.value = value;
    }

    @Override
    public int compareTo(Element<T> o) {
        return Long.compare(id, o.id);
    }
}

My goal is to join two or more streams by element ID (within each stream the IDs are sorted and strictly monotonic), for example:

Stream<Element> colour = Arrays.stream(new Element[]{new Element(1, "red"), new Element(2, "green"), new Element(4, "red"), new Element(6, "blue")});
Stream<Element> length = Arrays.stream(new Element[]{new Element(2, 28), new Element(3, 9), new Element(4, 17), new Element(6, 11)});
Stream<Element> mass = Arrays.stream(new Element[]{new Element(1, 87.9f), new Element(2, 21.0f), new Element(3, 107f)});

into a single stream whose elements have the form (ID, [T1, T2, T3]), by applying a method along these lines:

Stream<Element<Object[]>> allProps = joinStreams(colour, length, mass); 

public Stream<Element<Object[]>> joinStreams(Stream<Element>... streams) { 
    return ...; 
} 

The resulting stream should provide a FULL OUTER JOIN, i.e. for the example above:

1, "red", null, 87.9 
2, "green", 28, 21.0 
3, null, 9, 107 
4, "red", 17, null
6, "blue", 11, null 

My experience with Java's Stream API is fairly basic; so far I have usually used iterators for tasks like this.

Is there an idiomatic (and efficient) way to perform this kind of join on Streams? Is there a utility library I could use?

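Note: the example is simplified. The application receives its data from something like a column-oriented data store (not a real DBMS) that is several gigabytes in size and does not fit easily into memory. The store has no built-in support for this kind of join either.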

+0

'myElementsStream.collect(Collectors.groupingBy(e -> e.id))'? – fge

+0

I have three streams here - how would you define myElementsStream? – Matthias
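One possible reading of the groupingBy suggestion, as a rough sketch only: tag every element with the index of the stream it came from, concatenate the streams, and group by ID. All names here (GroupingJoinSketch, Tagged, join, demo) are hypothetical and not from the thread. Note that this collects everything into a map, so it only suits data that fits in memory, which the question says the real application's data does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; the names below are illustrative assumptions.
final class GroupingJoinSketch {

    /** A value together with its id and the index of its source stream. */
    static final class Tagged {
        final int source;
        final long id;
        final Object value;

        Tagged(int source, long id, Object value) {
            this.source = source;
            this.id = id;
            this.value = value;
        }
    }

    /** Groups all rows by id (in memory) and renders one line per id. */
    static List<String> join(int width, Stream<Tagged> rows) {
        // TreeMap keeps the ids in ascending order.
        Map<Long, List<Tagged>> byId =
                rows.collect(Collectors.groupingBy(t -> t.id, TreeMap::new, Collectors.toList()));
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, List<Tagged>> entry : byId.entrySet()) {
            Object[] slots = new Object[width]; // stays null where a source has no row
            for (Tagged t : entry.getValue()) {
                slots[t.source] = t.value;
            }
            result.add(entry.getKey() + ": " + Arrays.toString(slots));
        }
        return result;
    }

    static List<String> demo() {
        Stream<Tagged> colour = Stream.of(new Tagged(0, 1, "red"), new Tagged(0, 2, "green"),
                new Tagged(0, 4, "red"), new Tagged(0, 6, "blue"));
        Stream<Tagged> length = Stream.of(new Tagged(1, 2, 28), new Tagged(1, 3, 9),
                new Tagged(1, 4, 17), new Tagged(1, 6, 11));
        Stream<Tagged> mass = Stream.of(new Tagged(2, 1, 87.9f), new Tagged(2, 2, 21.0f),
                new Tagged(2, 3, 107f));
        // Concatenate the three streams into the single stream the comment refers to.
        return join(3, Stream.of(colour, length, mass).flatMap(s -> s));
    }
}
```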

Answers

1

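To build a full outer join over streams, I use two blocking queues. Each stream is associated with one queue, and a Filler class (a Runnable implementation) reads data from the stream and writes it to that queue. When a filler runs out of data, it writes an end-of-stream marker to the queue. I then build a spliterator on top of AbstractSpliterator. Its tryAdvance implementation takes one value from the left queue and one from the right queue and, depending on how they compare, either consumes or retains each value. I use a variant of the Element class. See the code below: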

import java.util.ArrayList; 
import java.util.Collection; 

public final class Element<T> implements Comparable<Element<T>> { 
    final long id; 
    final Collection<T> value; 

    public Element(long id, T value) {
     this.id = id; 
     // Order preserving 
     this.value = new ArrayList<T>(); 
     this.value.add(value); 
    } 

    Element(long id, Element<T> e1, Element<T> e2) { 
     this.id = id; 
     this.value = new ArrayList<T>(); 
     add(e1); 
     add(e2); 
    } 

    private void add(Element<T> e1) { 
     if(e1 == null) { 
      this.value.add(null);   
     } else { 
      this.value.addAll(e1.value); 
     } 
    } 

    /** 
    * Used as End-of-Stream marker 
    */ 
    Element() { 
     id = -1; 
     value = null; 
    } 

    @Override 
    public int compareTo(Element<T> o) { 
     return Long.compare(id, o.id); 
    } 
} 

The join implementation:

import java.util.Comparator; 
import java.util.Spliterator; 
import java.util.Spliterators; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.function.Consumer; 
import java.util.stream.Stream; 
import java.util.stream.StreamSupport; 

public class OuterJoinSpliterator<T> extends Spliterators.AbstractSpliterator<Element<T>> { 

    private final class Filler implements Runnable { 
     private final Stream<Element<T>> stream; 
     private final BlockingQueue<Element<T>> queue; 

     private Filler(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) { 
      this.stream = stream; 
      this.queue = queue; 
     } 

     @Override 
     public void run() { 
      stream.forEach(x -> { 
       try { 
        queue.put(x); 
       } catch (final InterruptedException e) { 
        e.printStackTrace(); 
       } 
      }); 
      try { 
       queue.put(EOS); 
      } catch (final InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public final Element<T> EOS = new Element<T>(); 
    private final int queueSize; 
    private final BlockingQueue<Element<T>> leftQueue; 
    private final BlockingQueue<Element<T>> rightQueue; 
    protected Element<T> leftValue; 
    protected Element<T> rightValue; 

    private OuterJoinSpliterator(long estSize, int characteristics, int queueSize, 
      Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) { 
     super(estSize, characteristics); 
     this.queueSize = queueSize; 
     leftQueue = createQueue(); 
     rightQueue = createQueue(); 
     createFillerThread(leftStream, leftQueue).start(); 
     createFillerThread(rightStream, rightQueue).start(); 
    } 

    private Element<T> acceptBoth(long id, Element<T> left, Element<T> right) { 
     return new Element<T>(id, left, right); 
    } 

    private final Element<T> acceptLeft(Element<T> left) { 
     return acceptBoth(left.id, left, null); 
    } 

    private final Element<T> acceptRight(Element<T> right) { 
     return acceptBoth(right.id, null, right); 
    } 

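    private final Thread createFillerThread(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) {
     final Thread thread = new Thread(new Filler(stream, queue));
     // Daemon, so a filler blocked on a full queue cannot keep the JVM alive
     // if the joined stream is abandoned before it is fully consumed.
     thread.setDaemon(true);
     return thread;
    }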

    private final ArrayBlockingQueue<Element<T>> createQueue() { 
     return new ArrayBlockingQueue<>(queueSize); 
    } 

    @Override 
    public Comparator<? super Element<T>> getComparator() { 
     return null; 
    } 

    private final boolean isFinished() { 
     return leftValue == EOS && rightValue == EOS; 
    } 

    @Override 
    public final boolean tryAdvance(Consumer<? super Element<T>> action) { 
     try { 
      updateLeft(); 

      updateRight(); 

      if (isFinished()) { 
       return false; 
      } 

      if (leftValue == EOS) { 
       action.accept(acceptRight(rightValue)); 
       rightValue = null; 
      } else if (rightValue == EOS) { 
       action.accept(acceptLeft(leftValue)); 
       leftValue = null; 
      } else { 
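       // compareTo only guarantees the sign of its result, so normalise it to -1, 0 or 1.
       switch (Integer.signum(leftValue.compareTo(rightValue))) {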
       case -1: 
        action.accept(acceptLeft(leftValue)); 
        leftValue = null; 
        break; 
       case 1: 
        action.accept(acceptRight(rightValue)); 
        rightValue = null; 
        break; 
       default: 
        action.accept(acceptBoth(leftValue.id, leftValue, rightValue)); 
        leftValue = null; 
        rightValue = null; 
       } 
      } 
     } catch (final InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      return false;
     }
     return true; 
    } 

    private final void updateLeft() throws InterruptedException { 
     if (leftValue == null) { 
      leftValue = leftQueue.take(); 
     } 
    } 

    private final void updateRight() throws InterruptedException { 
     if (rightValue == null) { 
      rightValue = rightQueue.take(); 
     } 
    } 

    public static <T> Stream<Element<T>> join(long estSize, int characteristics, int queueSize, boolean parallel, Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) { 
     Spliterator<Element<T>> spliterator = new OuterJoinSpliterator<>(estSize, characteristics, queueSize, leftStream, rightStream); 
     return StreamSupport.stream(spliterator, parallel); 
    } 
} 

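You can pass Long.MAX_VALUE as the estimated size. See the Spliterator interface for a description of the various stream characteristics, and the AbstractSpliterator documentation for further details.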
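To illustrate what those two constructor arguments mean, here is a small standalone sketch that is separate from the join code above. The class names (CharacteristicsSketch, Counter) are hypothetical. It builds an AbstractSpliterator with an estimated size of Long.MAX_VALUE and the ORDERED and NONNULL characteristics, which is one plausible choice for a source of unknown length; whether those flags suit a given join depends on the input streams.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical sketch; not part of the answer's implementation.
final class CharacteristicsSketch {

    /** Emits 0, 1, ..., limit - 1 without advertising how many elements there are. */
    static final class Counter extends Spliterators.AbstractSpliterator<Long> {
        private final long limit;
        private long next;

        Counter(long limit) {
            // Long.MAX_VALUE signals "size unknown"; SIZED is deliberately not set.
            super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
            this.limit = limit;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            if (next >= limit) {
                return false; // exhausted
            }
            action.accept(next++);
            return true;
        }
    }

    static List<Long> demo() {
        // false = sequential stream
        return StreamSupport.stream(new Counter(3), false).collect(Collectors.toList());
    }
}
```

Because SIZED is not reported, `getExactSizeIfKnown()` on such a spliterator returns -1, and stream operations cannot rely on a known element count.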

-1

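The simplest solution is to write an iterator and then use StreamSupport::stream to create a stream from it (wrapping the iterator in a spliterator first). If you intend to use parallel streams, however, you may run into performance problems.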
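A minimal sketch of that iterator approach, under the question's assumption that every input is sorted by strictly increasing ID. The names (IteratorJoinSketch, Row, JoinIterator, join, demo) are hypothetical, and Row stands in for the question's Element class. The iterator keeps one look-ahead row per input, emits the smallest pending ID, and advances only the inputs that carried it, so memory use stays proportional to the number of inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sketch; names are illustrative assumptions.
final class IteratorJoinSketch {

    /** Minimal (id, value) pair standing in for the question's Element. */
    static final class Row {
        final long id;
        final Object value;
        Row(long id, Object value) { this.id = id; this.value = value; }
    }

    /** Merges inputs that are each sorted by strictly increasing id. */
    static final class JoinIterator implements Iterator<Object[]> {
        private final List<Iterator<Row>> inputs;
        private final Row[] heads; // look-ahead row per input, null once exhausted

        JoinIterator(List<Iterator<Row>> inputs) {
            this.inputs = inputs;
            this.heads = new Row[inputs.size()];
            for (int i = 0; i < heads.length; i++) advance(i);
        }

        private void advance(int i) {
            heads[i] = inputs.get(i).hasNext() ? inputs.get(i).next() : null;
        }

        @Override
        public boolean hasNext() {
            for (Row head : heads) if (head != null) return true;
            return false;
        }

        @Override
        public Object[] next() {
            if (!hasNext()) throw new NoSuchElementException();
            long min = Long.MAX_VALUE;
            for (Row head : heads) if (head != null) min = Math.min(min, head.id);
            // Layout: [id, value from input 0, value from input 1, ...]
            Object[] joined = new Object[heads.length + 1];
            joined[0] = min;
            for (int i = 0; i < heads.length; i++) {
                if (heads[i] != null && heads[i].id == min) {
                    joined[i + 1] = heads[i].value;
                    advance(i);
                } // otherwise the slot stays null: no row for this id in input i
            }
            return joined;
        }
    }

    @SafeVarargs
    static Stream<Object[]> join(Stream<Row>... streams) {
        List<Iterator<Row>> iterators = new ArrayList<>();
        for (Stream<Row> s : streams) iterators.add(s.iterator());
        Spliterator<Object[]> spliterator = Spliterators.spliteratorUnknownSize(
                new JoinIterator(iterators), Spliterator.ORDERED | Spliterator.NONNULL);
        return StreamSupport.stream(spliterator, false); // sequential
    }

    static List<String> demo() {
        Stream<Row> colour = Stream.of(new Row(1, "red"), new Row(2, "green"),
                new Row(4, "red"), new Row(6, "blue"));
        Stream<Row> length = Stream.of(new Row(2, 28), new Row(3, 9), new Row(4, 17), new Row(6, 11));
        Stream<Row> mass = Stream.of(new Row(1, 87.9f), new Row(2, 21.0f), new Row(3, 107f));
        return join(colour, length, mass)
                .map(row -> Arrays.toString(row))
                .collect(Collectors.toList());
    }
}
```

Unlike the queue-based answer, this variant needs no extra threads, but a spliterator built from an iterator splits poorly, which is consistent with the caveat about parallel streams.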
