2016-07-06

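How can I do a lazy match with Flink CEP?

I want to use Flink CEP to do a 'lazy' match on a pattern. How can I do that? For example, I have the input stream ACABCABCB and I want to match an A followed by a C, getting only 3 matches instead of 6.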

I created the following example to illustrate my problem.

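import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)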

case class MyEvent(id: Int, kind: String, value: String) 
case class MyAggregatedEvent(id: Int, concatenatedValue: String) 

val eventStream = env.fromElements(
    MyEvent(1, "A", "1"), MyEvent(1, "C", "1"), 
    MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"), 
    MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"), 
    MyEvent(1, "B", "3") 
) 

val pattern: Pattern[MyEvent, _] = Pattern 
    .begin[MyEvent]("pA").where(e => e.kind == "A") 
    .next("pC").where(e => e.kind == "C") 
    .within(Time.seconds(5)) 

val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern) 

val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect { 
    (pattern: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) => 
    val partA = pattern.get("pA").get 
    val partC = pattern.get("pC").get 

    collector.collect(MyAggregatedEvent(partA.id, partA.value + " => " + partC.value))
} 
outNextStream.print() 

env.execute("Experiment") 

This gives me the following output:

MyAggregatedEvent(1,1 => 1)

When I change the pattern to:

val pattern: Pattern[MyEvent, _] = Pattern 
    .begin[MyEvent]("pA").where(e => e.kind == "A") 
    .followedBy("pC").where(e => e.kind == "C") 
    .within(Time.seconds(5)) 

it prints the following:

MyAggregatedEvent(1,1 => 1)
MyAggregatedEvent(1,1 => 2)
MyAggregatedEvent(1,2 => 2)
MyAggregatedEvent(1,1 => 3)
MyAggregatedEvent(1,2 => 3)
MyAggregatedEvent(1,3 => 3)

How can I create a pattern that matches each event only once, so that my output would be:

MyAggregatedEvent(1,1 => 1)
MyAggregatedEvent(1,2 => 2)
MyAggregatedEvent(1,3 => 3)

Answer

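Flink's CEP library does not currently support this. The matching semantics cannot yet be controlled. I think it would be best to start by adding a MATCH_ALL and a MATCH_FIRST matching mode. MATCH_FIRST would discard all intermediate state as soon as it sees a fully matched sequence. That should cover your use case.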
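
To make the difference between the two proposed modes concrete, here is a minimal plain-Scala sketch that simulates them on the sample stream from the question. It does not use Flink at all. `MatchModes`, `matchAll` and `matchFirst` are hypothetical names made up for illustration, and the behaviour of MATCH_FIRST is only my reading of the description above. In particular, choosing the earliest pending A when several are waiting is an assumption.

```scala
// Plain-Scala simulation of the two proposed matching modes.
// This is NOT Flink API: every name in this object is hypothetical.
object MatchModes {
  final case class MyEvent(id: Int, kind: String, value: String)

  // Same events as in the question's example.
  val events: List[MyEvent] = List(
    MyEvent(1, "A", "1"), MyEvent(1, "C", "1"),
    MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"),
    MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"),
    MyEvent(1, "B", "3")
  )

  // MATCH_ALL (assumed semantics): partial matches are never discarded,
  // so every C completes a match with every A seen before it.
  def matchAll(in: List[MyEvent]): List[String] = {
    val start = (Vector.empty[MyEvent], Vector.empty[String])
    val (_, matches) = in.foldLeft(start) {
      case ((pending, acc), e) => e.kind match {
        case "A" => (pending :+ e, acc)
        case "C" => (pending, acc ++ pending.map(a => s"${a.value} => ${e.value}"))
        case _   => (pending, acc)
      }
    }
    matches.toList
  }

  // MATCH_FIRST (assumed semantics): as soon as one full match is seen,
  // emit it and throw away all intermediate state (all pending A events).
  // Picking the earliest pending A is an assumption of this sketch.
  def matchFirst(in: List[MyEvent]): List[String] = {
    val start = (Vector.empty[MyEvent], Vector.empty[String])
    val (_, matches) = in.foldLeft(start) {
      case ((pending, acc), e) => e.kind match {
        case "A" => (pending :+ e, acc)
        case "C" if pending.nonEmpty =>
          (Vector.empty[MyEvent], acc :+ s"${pending.head.value} => ${e.value}")
        case _ => (pending, acc)
      }
    }
    matches.toList
  }

  def main(args: Array[String]): Unit = {
    println(matchAll(events))   // six pairs, like the followedBy output above
    println(matchFirst(events)) // three pairs, the output asked for
  }
}
```

In this simulation `matchAll` yields the same six pairs as the `followedBy` pattern in the question, while `matchFirst` yields only `1 => 1`, `2 => 2` and `3 => 3`, because the pending A events are cleared after each completed match.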
