您需要首先創建一個包裝類型,然後再拆分並選擇您的流。對於包裝,只有一個成員是非零;
class TypeWrapper {
// keeping this short for brevity
public TypeA firstType;
public TypeB secondType;
}
斯普利特和選擇:
DataStream<TypeWrapper> stream1 = ...
DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.firstType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeA>() {
@Override
public TypeA map(TypeWrapper value) throws Exception {
return value.firstType;
}
});
DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.secondType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeB>() {
@Override
public TypeB map(TypeWrapper value) throws Exception {
return value.secondType;
}
});
因爲filter()
和map()
將被鏈接到stream1
兩者是相同的線程上執行,操作便宜。
對不起,我忘記說了,我正在尋找一個Java解決方案。 – CPA
@Jamie你能幫我解決這個問題嗎 https://stackoverflow.com/questions/46282692/match-based-on-some-property-between-two-data-streams-and-collect-all-based-上-m的 – Kumar