2016-11-29 26 views
0

Create a new data stream inside another one

I have two data types:

type1 and type2 

And I have a data stream of type1:

DataStream<type1> stream1 =... 

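Inside stream1 I want to create type2 objects, and I want to collect both the type1 and the type2 objects.

Is it possible to have one input type and two output types with a single data stream? Or is it possible to create a new data stream (DataStream<type2> stream2) inside stream1?

Or is there another way to collect data of two different types that are derived from one type?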

Answers

1

You first need to create a wrapper type, and then split and select your stream. In the wrapper, only one member is non-null:

class TypeWrapper { 
    // keeping this short for brevity 
    public TypeA firstType; 
    public TypeB secondType; 
} 
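One way to keep the "only one member is non-null" rule from being broken by accident is to build wrappers through factory methods. The following is a minimal sketch, not part of the original answer: TypeA and TypeB are placeholder classes with made-up fields (id, score), and the names ofFirst, ofSecond, isFirst and isSecond are hypothetical helpers.

```java
import java.util.Objects;

// Hypothetical payload types standing in for the asker's type1 and type2.
final class TypeA {
    final String id;
    TypeA(String id) { this.id = id; }
}

final class TypeB {
    final int score;
    TypeB(int score) { this.score = score; }
}

// Wrapper in which exactly one of the two members is expected to be non-null.
final class TypeWrapper {
    public TypeA firstType;
    public TypeB secondType;

    // No-argument constructor; the factories below are the intended entry points.
    public TypeWrapper() {}

    // Wraps a TypeA and leaves secondType null.
    static TypeWrapper ofFirst(TypeA a) {
        TypeWrapper w = new TypeWrapper();
        w.firstType = Objects.requireNonNull(a, "a");
        return w;
    }

    // Wraps a TypeB and leaves firstType null.
    static TypeWrapper ofSecond(TypeB b) {
        TypeWrapper w = new TypeWrapper();
        w.secondType = Objects.requireNonNull(b, "b");
        return w;
    }

    boolean isFirst() { return firstType != null; }
    boolean isSecond() { return secondType != null; }
}
```

With such helpers, a filter predicate can call isFirst() or isSecond() instead of repeating the null check.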

Split and select:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<TypeWrapper> stream1 = ...

// Keep only the wrappers that carry a TypeA, then unwrap them.
DataStream<TypeA> streamA = stream1
    .filter(new FilterFunction<TypeWrapper>() {
        @Override
        public boolean filter(TypeWrapper value) throws Exception {
            return value.firstType != null;
        }
    })
    .map(new MapFunction<TypeWrapper, TypeA>() {
        @Override
        public TypeA map(TypeWrapper value) throws Exception {
            return value.firstType;
        }
    });

// Keep only the wrappers that carry a TypeB, then unwrap them.
DataStream<TypeB> streamB = stream1
    .filter(new FilterFunction<TypeWrapper>() {
        @Override
        public boolean filter(TypeWrapper value) throws Exception {
            return value.secondType != null;
        }
    })
    .map(new MapFunction<TypeWrapper, TypeB>() {
        @Override
        public TypeB map(TypeWrapper value) throws Exception {
            return value.secondType;
        }
    });

Because filter() and map() are chained to stream1, both execute on the same thread, so the operation is cheap.
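The wrap, filter and unwrap pattern can be tried out without a Flink cluster. The sketch below uses java.util.stream as a stand-in and is not Flink code: String and Integer play the roles of the two types, the class and method names (SplitSketch, Wrapped, wrapBoth, firsts, seconds) are hypothetical, and deriving the second value from the string length is only an assumption for illustration. In a Flink job the wrapBoth step would presumably be a flatMap on stream1 that emits one wrapper for the original element and one for the derived element.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SplitSketch {
    // Hypothetical wrapper: exactly one of the two fields is non-null.
    static final class Wrapped {
        final String first;
        final Integer second;
        Wrapped(String first, Integer second) {
            this.first = first;
            this.second = second;
        }
    }

    // For every input, emit the original value and a derived value, each in its own wrapper.
    static List<Wrapped> wrapBoth(List<String> inputs) {
        return inputs.stream()
                .flatMap(s -> Stream.of(new Wrapped(s, null), new Wrapped(null, s.length())))
                .collect(Collectors.toList());
    }

    // Keep the wrappers that carry the first type, then unwrap them.
    static List<String> firsts(List<Wrapped> mixed) {
        return mixed.stream()
                .filter(w -> w.first != null)
                .map(w -> w.first)
                .collect(Collectors.toList());
    }

    // Keep the wrappers that carry the second type, then unwrap them.
    static List<Integer> seconds(List<Wrapped> mixed) {
        return mixed.stream()
                .filter(w -> w.second != null)
                .map(w -> w.second)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Wrapped> mixed = wrapBoth(Arrays.asList("a", "bcd"));
        System.out.println(firsts(mixed));  // prints [a, bcd]
        System.out.println(seconds(mixed)); // prints [1, 3]
    }
}
```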

1
import org.apache.flink.streaming.api.functions.source.SourceFunction 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import org.apache.flink.api.scala._ 

case class Type1()
case class Type2()

object MultipleOutputJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stream of Type1: emits one element per second, forever
    val stream1 = env.addSource((sc: SourceFunction.SourceContext[Type1]) => {
      while (true) {
        Thread.sleep(1000)
        sc.collect(Type1())
      }
    })

    // Mapping from Type1 to Type2
    val stream2 = stream1.map(_ => Type2())

    // Collect both the original and the derived data
    stream1.print()
    stream2.print()

    env.execute()
  }
}
+0

Sorry, I forgot to mention that I am looking for a Java solution. – CPA

+0

@Jamie Could you help me with this question? https://stackoverflow.com/questions/46282692/match-based-on-some-property-between-two-data-streams-and-collect-all-based-上-m的 – Kumar
