2017-08-07 65 views

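I have to collect 3 events that share the same correlationId, coming from 3 Kafka source streams, within a given time, and I must be able to collect all or some of these events if they arrive late. Why can the same events of a PatternStream be sent to both the PatternSelectFunction and the PatternTimeoutFunction?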

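I use a union of the 3 data streams and a CEP pattern. However, I noticed that events that match the pattern, and are therefore collected in the select function, are also sent to the timeout function.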

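I don't know whether I'm doing something wrong in my example or whether there is something I don't understand, but I expected that events that are matched would not also be timed out.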

I get the impression that the time snapshots are not stored disjointly.

I'm using Flink version 1.3.0.

Thanks for your help.

Console output, where we can see that 2 of the 3 correlated events are both selected and timed out:

Matching events:
Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948
Key---f969dd4d-47ff-445c-9182-0f95a569febb
Key---2ecbb89d-1463-4669-a657-555f73b6fb1d

Timed-out events:

First call to the timeout function:
Key---f969dd4d-47ff-445c-9182-0f95a569febb
Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948

Second call:
Key---f969dd4d-47ff-445c-9182-0f95a569febb

11:01:44,677 INFO com.bnpp.pe.cep.Main           - Matching events: 
11:01:44,678 INFO com.bnpp.pe.cep.Main           - SctRequestProcessStep2Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false) 
11:01:44,678 INFO com.bnpp.pe.cep.Main           - SctRequestProcessStep1Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---2ecbb89d-1463-4669-a657-555f73b6fb1d, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false) 
11:01:44,678 INFO com.bnpp.pe.cep.Main           - SctRequestProcessStep3Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---f969dd4d-47ff-445c-9182-0f95a569febb, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false) 
Right(SctRequestFinalEvent(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---2196fdb0-01e8-4cc6-af4b-04bcf9dc67a2, debtorIban=null, creditorIban=null, amount=null, communication=null), state=SUCCESS)) 
11:01:49,635 INFO com.bnpp.pe.cep.Main           - Timed out events: 
11:01:49,636 INFO com.bnpp.pe.cep.Main           - SctRequestProcessStep3Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---f969dd4d-47ff-445c-9182-0f95a569febb, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false) 
11:01:49,636 INFO com.bnpp.pe.cep.Main           - SctRequestProcessStep2Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false) 
11:01:49,636 INFO com.bnpp.pe.cep.Main           - Timed out events: 
11:01:49,636 INFO com.bnpp.pe.cep.Main           - SctRequestProcessStep3Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---f969dd4d-47ff-445c-9182-0f95a569febb, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false) 
Left(SctRequestFinalEvent(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---aa437bcf-ecaa-4561-9f4e-08a902f0e248, debtorIban=null, creditorIban=null, amount=null, communication=null), state=FAILED)) 
Left(SctRequestFinalEvent(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---5420eb41-2723-42ac-83fd-d203d6bf2526, debtorIban=null, creditorIban=null, amount=null, communication=null), state=FAILED)) 

My test code:

package com.bnpp.pe.cep; 

import com.bnpp.pe.event.Event; 
import com.bnpp.pe.event.SctRequestFinalEvent; 
import com.bnpp.pe.util.EventHelper; 
import lombok.extern.slf4j.Slf4j; 
import org.apache.flink.api.java.functions.KeySelector; 
import org.apache.flink.cep.CEP; 
import org.apache.flink.cep.PatternSelectFunction; 
import org.apache.flink.cep.PatternStream; 
import org.apache.flink.cep.PatternTimeoutFunction; 
import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.DeserializationSchema; 

import java.io.Serializable; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

/** 
* Created by Laurent Bauchau on 2/08/2017. 
*/ 
@Slf4j 
public class Main implements Serializable { 

    public static void main(String... args) { 
     new Main(); 
    } 

    public static final String step1Topic = "sctinst-step1"; 
    public static final String step2Topic = "sctinst-step2"; 
    public static final String step3Topic = "sctinst-step3"; 

    private static final String PATTERN_NAME = "the_3_correlated_events_pattern"; 

    private final FlinkKafkaConsumer010<Event> kafkaSource1; 
    private final DeserializationSchema<Event> deserializationSchema1; 

    private final FlinkKafkaConsumer010<Event> kafkaSource2; 
    private final DeserializationSchema<Event> deserializationSchema2; 

    private final FlinkKafkaConsumer010<Event> kafkaSource3; 
    private final DeserializationSchema<Event> deserializationSchema3; 

    private Main() { 

     // Kafka init 
     Properties kafkaProperties = new Properties(); 
     kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); 
     kafkaProperties.setProperty("zookeeper.connect", "localhost:2180"); 
     kafkaProperties.setProperty("group.id", "sct-validation-cgroup1"); 

     deserializationSchema1 = new SctRequestProcessStep1EventDeserializer(); 
     kafkaSource1 = new FlinkKafkaConsumer010<>(step1Topic, deserializationSchema1, kafkaProperties); 

     deserializationSchema2 = new SctRequestProcessStep2EventDeserializer(); 
     kafkaSource2 = new FlinkKafkaConsumer010<>(step2Topic, deserializationSchema2, kafkaProperties); 

     deserializationSchema3 = new SctRequestProcessStep3EventDeserializer(); 
     kafkaSource3 = new FlinkKafkaConsumer010<>(step3Topic, deserializationSchema3, kafkaProperties); 

     try { 
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

      DataStream<Event> s1 = env.addSource(kafkaSource1); 
      DataStream<Event> s2 = env.addSource(kafkaSource2); 
      DataStream<Event> s3 = env.addSource(kafkaSource3); 

      DataStream<Event> unionStream = s1.union(s2, s3); 

      Pattern<Event, Event> successPattern = Pattern.<Event>begin(PATTERN_NAME) 
        .times(3) 
        .within(Time.seconds(5)); 

      PatternStream<Event> matchingStream = CEP.pattern(
        unionStream.keyBy(new CIDKeySelector()), 
        successPattern); 

      matchingStream.select(new MyPatternTimeoutFunction(), new MyPatternSelectFunction()) 
        .print() 
        .setParallelism(1); 

      env.execute(); 

     } catch (Exception e) { 
      log.error(e.getMessage(), e); 
     } 
    } 

    private static class MyPatternTimeoutFunction implements PatternTimeoutFunction<Event, SctRequestFinalEvent> { 

     @Override 
     public SctRequestFinalEvent timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception { 

      List<Event> events = pattern.get(PATTERN_NAME); 
      log.info("Timed out events:"); 
      events.forEach(e -> log.info(e.toString())); 

      // Resulting event creation 
      SctRequestFinalEvent event = new SctRequestFinalEvent(); 
      EventHelper.correlate(events.get(0), event); 
      EventHelper.injectKey(event); 
      event.setState(SctRequestFinalEvent.State.FAILED); 

      return event; 
     } 
    } 

    private static class MyPatternSelectFunction 
      implements PatternSelectFunction<Event, SctRequestFinalEvent> { 

     @Override 
     public SctRequestFinalEvent select(Map<String, List<Event>> pattern) throws Exception { 

      List<Event> events = pattern.get(PATTERN_NAME); 
      log.info("Matching events:"); 
      events.forEach(e -> log.info(e.toString())); 

      // Resulting event creation 
      SctRequestFinalEvent event = new SctRequestFinalEvent(); 
      EventHelper.correlate(events.get(0), event); 
      EventHelper.injectKey(event); 
      event.setState(SctRequestFinalEvent.State.SUCCESS); 

      return event; 
     } 
    } 

    private static class CIDKeySelector implements KeySelector<Event, String> { 
     @Override 
     public String getKey(Event event) throws Exception { 
      return event.getCorrelationId(); 
     } 
    } 
} 

Answers


Let's analyze what your pattern actually says. You passed a pattern like this:

Pattern.<Event>begin(PATTERN_NAME) 
    .times(3) 
    .within(Time.seconds(5)); 

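It literally says: look for any sequence of three events that occur within 5 seconds. Flink starts looking for a new match at every subsequent event (there is ongoing work to introduce new matching behaviours, see FLINK-7169).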

A simple example shows this. If you have a sequence like A B C D E within 5 seconds, the CEP library will return these matches:

  • A B C
  • B C D
  • C D E

and two timed-out partial matches:

  • D E
  • E
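To make the sliding behaviour concrete, here is a minimal plain-Java model (not Flink code; the class NoSkipModel and its run method are hypothetical names) of what an unconditioned times(3) pattern does in Flink 1.3: every incoming event extends each partial match already in progress and also opens a new one. It assumes that all events fall inside one within(...) window and that the window then expires, so whatever is still incomplete is handed to the timeout side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (NOT Flink's implementation) of an unconditioned times(n)
 * CEP pattern without any after-match skip strategy.
 * Assumption: all events fall inside one within(...) window, which then expires.
 */
public class NoSkipModel {

    /** Completed matches and timed-out partial matches produced by one run. */
    public static final class Result {
        public final List<List<String>> matches = new ArrayList<>();
        public final List<List<String>> timeouts = new ArrayList<>();
    }

    public static Result run(List<String> events, int times) {
        Result result = new Result();
        // open partial matches, oldest first
        List<List<String>> partials = new ArrayList<>();
        for (String event : events) {
            // the pattern has no where(...), so every event extends every open partial match...
            for (List<String> partial : partials) {
                partial.add(event);
            }
            // ...and every event also starts a brand-new partial match
            List<String> fresh = new ArrayList<>();
            fresh.add(event);
            partials.add(fresh);
            // partial matches that reached the required count are emitted as matches
            List<List<String>> stillOpen = new ArrayList<>();
            for (List<String> partial : partials) {
                if (partial.size() == times) {
                    result.matches.add(partial);
                } else {
                    stillOpen.add(partial);
                }
            }
            partials = stillOpen;
        }
        // the window expires: every remaining partial match goes to the timeout side
        result.timeouts.addAll(partials);
        return result;
    }

    public static void main(String[] args) {
        Result r = run(Arrays.asList("A", "B", "C", "D", "E"), 3);
        System.out.println("matches:  " + r.matches);
        System.out.println("timeouts: " + r.timeouts);
    }
}
```

On A B C D E this yields the matches [A, B, C], [B, C, D] and [C, D, E] plus the timed-out partials [D, E] and [E]. Called with only A B C, it returns one match and the timed-out partials [B, C] and [C]: the same shape (two events, then one) as the two timeout calls in the question's log.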

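I understand your point, but my pattern works on a KeyedStream that only ever contains 3 correlated events at a time, never more, and I run my test by sending only 3 correlated events to Kafka: just (A, B, C), and (A, B, C) matches. In that case, I don't understand why I receive (A, C) and (C) as timed-out partial matches.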


Understood! Thanks.
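FLINK-7169, cited in the answer, tracks support for an "after match skip" behaviour in the CEP library (later Flink releases expose this as AfterMatchSkipStrategy; check the CEP documentation for your version). The sketch below is again a plain-Java model with hypothetical names, not Flink's implementation: under a skip-past-last-event policy, once a match completes, every partial match that started inside it is discarded instead of staying alive until it times out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (NOT Flink's implementation) of an unconditioned times(n)
 * pattern combined with a "skip past last event" after-match policy.
 * Assumption: all events fall inside one within(...) window, which then expires.
 */
public class SkipPastLastEventModel {

    /** Completed matches and timed-out partial matches produced by one run. */
    public static final class Result {
        public final List<List<String>> matches = new ArrayList<>();
        public final List<List<String>> timeouts = new ArrayList<>();
    }

    public static Result run(List<String> events, int times) {
        Result result = new Result();
        // open partial matches, oldest first
        List<List<String>> partials = new ArrayList<>();
        for (String event : events) {
            // every event extends every open partial match...
            for (List<String> partial : partials) {
                partial.add(event);
            }
            // ...and also starts a new one
            List<String> fresh = new ArrayList<>();
            fresh.add(event);
            partials.add(fresh);
            // the oldest partial match is always the longest, so only it can be complete
            if (partials.get(0).size() == times) {
                result.matches.add(partials.get(0));
                // skip past the last event: all other partial matches started inside
                // the match just emitted, so they are dropped instead of timing out later
                partials = new ArrayList<>();
            }
        }
        // the window expires: whatever is still open goes to the timeout side
        result.timeouts.addAll(partials);
        return result;
    }

    public static void main(String[] args) {
        Result r = run(Arrays.asList("A", "B", "C"), 3);
        System.out.println("matches:  " + r.matches);
        System.out.println("timeouts: " + r.timeouts);
    }
}
```

For the three correlated events of the follow-up comment (A B C) this yields the single match and no timeouts. For A B C D E it yields only [A, B, C], while [D, E] and [E] still time out because they started after the match ended.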


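Your program....

In your program you pass a timeout when selecting, so the PatternStream hands events to BOTH functions. There is no need for the time part when selecting the strings... then you don't go through PatternTimeoutFunction().

See this example, which has no timeout function: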

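import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlinkCEP {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read lines from a socket and split them into tokens
        DataStream<String> text = env.socketTextStream("localhost", 1111)
            .flatMap(new LineTokenizer());

        text.print();

        // "a", immediately followed by "b", eventually followed by "c", all within 1 second
        // (in Flink 1.3, where() takes an IterativeCondition such as SimpleCondition, not a lambda)
        Pattern<String, String> pattern =
            Pattern.<String>begin("start").where(new Equals("a"))
                .next("middle").where(new Equals("b"))
                .followedBy("end").where(new Equals("c"))
                .within(Time.seconds(1));

        PatternStream<String> patternStream = CEP.pattern(text, pattern);

        // select() without a PatternTimeoutFunction: timed-out partial matches are dropped
        DataStream<String> alerts = patternStream.select(new PatternSelectFunction<String, String>() {
            @Override
            public String select(Map<String, List<String>> matches) throws Exception {
                // since Flink 1.3, each pattern name maps to a List of events
                return "Found: " +
                    matches.get("start").get(0) + "->" +
                    matches.get("middle").get(0) + "->" +
                    matches.get("end").get(0);
            }
        });

        // emit result
        alerts.print();

        // execute program
        env.execute("WordCount Example");
    }

    // splits each input line on whitespace and emits the non-empty tokens
    public static final class LineTokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String token : line.split("\\s+")) {
                if (!token.isEmpty()) {
                    out.collect(token);
                }
            }
        }
    }

    // condition that accepts events equal to a fixed string
    public static final class Equals extends SimpleCondition<String> {
        private final String expected;

        Equals(String expected) {
            this.expected = expected;
        }

        @Override
        public boolean filter(String value) {
            return value.equals(expected);
        }
    }
}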

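Thanks... but 1) I need the PatternTimeoutFunction to capture failed processing; 2) if I didn't need to capture failed processing, I could use the other select signature (without a PatternTimeoutFunction); 3) I can't use a static where clause like txt.equals("a"), because I need to compare events with each other, which is why I use a KeyedStream.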
