2017-08-30 69 views
0

我有以下情況處理在弗林克CEP多個圖案並聯

enter image description here

有其發送流卡夫卡正在被由CEP引擎接收其中警告時產生的特定條件2的虛擬機對個人Stream滿意。

目前,CEP是檢查兩個流上相同條件下(當心髒率> 65和呼吸率> 68)患者和如下圖所示

// detecting pattern 
     Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start") 
       .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() { 
        @Override 
        public boolean filter(joinEvent joinEvent) throws Exception { 
         return joinEvent.getHeartRate() > 65 ; 
        } 
       }) 
       .subtype(joinEvent.class) 
       .where(new FilterFunction<joinEvent>() { 
        @Override 
        public boolean filter(joinEvent joinEvent) throws Exception { 
         return joinEvent.getRespirationRate() > 68; 
        } 
       }).within(Time.milliseconds(100)); 

但我想用提高並行報警Streams的不同條件。例如,我想報警如果

For patient A : if heart rate > 65 and Respiration Rate > 68 
For patient B : if heart rate > 75 and Respiration Rate > 78 

我該如何做到這一點?是否需要在同一環境中創建多個流環境或多個模式?

+0

嘿,我想知道你是否找到了你的問題的解決方案? –

+0

是的,不同的病人寫了不同的主題,flink有許多並行工作的工作人員,每個人都在聽一個話題並執行cep –

+0

感謝您的回覆,我認爲不同的病人寫了同一個源/ DataStream,並且您想要應用不同的根據不同事件/患者TT的CEP模式 –

回答

1

根據您的要求,您可以創建2種不同的圖案,以便在需要時清晰分離。

如果你想用相同的模式執行此操作,那麼它也是可能的。要做到這一點,閱讀一個卡夫卡源所有的卡夫卡主題:

FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
     Arrays.asList("topic1", "topic2"), 
     new StringSerializerToEvent(), 
     props); 

在這裏,我假設從兩個主題的活動的結構都是一樣的,你有患者姓名以及部分事件被傳送。

一旦你做到了,它變得容易,因爲你只需要創建一個模式與「或」,類似如下:

Pattern.<JoinEvent>begin("first") 
     .where(new SimpleCondition<JoinEvent>() { 

      @Override 
      public boolean filter(JoinEvent event) throws Exception { 
      return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68; 
      } 
     }) 
     .or(new SimpleCondition<JoinEvent>() { 

      @Override 
      public boolean filter(JoinEvent event) throws Exception { 
      return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78; 
      } 
     }); 

這將產生一個匹配,只要你的條件相匹配。雖然,我不確定在你的例子中「.within(Time.milliseconds(100))」是什麼。