2017-07-10 47 views
0

我具有低於此弗林克程序:如何在Flink的兩個不同的Kafka流中應用相同的模式?

object WindowedWordCount { 
    val configFactory = ConfigFactory.load() 

    def main(args: Array[String]) = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    val kafkaStream1 = env.addSource(new FlinkKafkaConsumer010[String](topic1, new SimpleStringSchema(), props)) 
     .assignTimestampsAndWatermarks(new TimestampExtractor) 

    val kafkaStream2 = env.addSource(new FlinkKafkaConsumer010[String](topic2, new SimpleStringSchema(), props)) 
     .assignTimestampsAndWatermarks(new TimestampExtractor) 

    val partitionedStream1 = kafkaStream1.keyBy(jsonString => { 
     extractUserId(jsonString) 
    }) 

    val partitionedStream2 = kafkaStream2.keyBy(jsonString => { 
     extractUserId(jsonString) 
    }) 

    //Is there a way to match the userId from partitionedStream1 and partitionedStream2 in this same pattern? 
    val patternForMatchingUserId = Pattern.begin[String]("start") 
     .where(stream1.getUserId() == stream2.getUserId()) //I want to do something like this 

    //Is there a way to pass in partitionedStream1 and partitionedStream2 to this CEP.pattern function? 
    val patternStream = CEP.pattern(partitionedStream1, patternForMatchingUserId) 

    env.execute() 
    } 
} 

在上面的程序弗林克,我有一個名爲partitionedStream1兩個流和partitionedStream2keyedBy的用戶ID。

我想以某種方式比較patternForMatchingUserId模式中兩個數據流的數據(類似於上面顯示的)。有沒有辦法將兩個流傳遞給CEP.Pattern函數?

事情是這樣的:

val patternStream = CEP.pattern(partitionedStream1, partitionedStream2, patternForMatchingUserId)

回答

2

沒有辦法,你可以通過兩個流至CEP,但你可以通過組合流。

如果兩個流具有相同的類型/架構。你可以把它們結合起來。我相信這個解決方案符合你的情況。

partitionedStream1.union(partitionedStream2).keyBy(...) 

如果他們有不同的模式。您可以使用一些自定義邏輯將它們轉換爲一個流,例如coFlatMap

+0

所以我的分區流1是從卡夫卡進來的,每3秒鐘一次,我的分區流2每8秒從卡夫卡進來。出於某種原因,當我做'val unionedStream = partitionedStream1.union(partitionedStream2)'然後'unionedStream.print()'時,它將從partitionedStream1中打印兩次JSON數據,而不是從partitionedStream2中打印出任何內容。這是預期的行爲? – CapturedTree

+1

不,不是。你確定你已經爲每個partitionedStream使用了正確的主題名稱嗎? –

+0

這是問題所在。我發誓我檢查了,我錯過了兩次。謝謝! – CapturedTree

相關問題