2016-08-04 50 views
1

我想檢測共享相同屬性的事件。假設我有一個簡單的例子類:Apache Flink + CEP - 檢測相同的事件

case class Record(name: String, value: Int) 

假設有下面的流:

Record("A", 1) 
Record("B", 2) 
Record("A", 3) 
Record("C", 4) 

然後我想檢測雙「A」記錄。這可能嗎?我現在有這樣的:

val start: Pattern[Record, _] = myStream 
.begin("first") 
.followedBy("second").where(previous_record.name == _.name) 
+0

爲了檢測上同樣的屬性,你需要大的存儲空間,因爲屬性集合可以是無限的, 對於你需要存儲的每個屬性。如果屬性集限於您可以應用過濾。 – ravthiru

回答

1

我想你要定義什麼是事件檢測,你嘗試過這樣的:

val start: Pattern[Record, _] = myStream 
    .begin("first").where(name == "A") 
    .followedBy("second").where(name == "A") 

更新:例如:

val patternIG: Pattern[(String,String), _] = Pattern.begin[String,String)]("start").where(_.name == "Ignition").where(_.ac == "ON").next("end").where(_.name == "Door").where(_.ac == "ON") 
val patternStream: PatternStream[(String,String)] = CEP.pattern(mystream, patternIG) 
def selectFn(pattern : mutable.Map[String,(String,String)]): String = { 
val startEvent = pattern.get("start").get 
val endEvent = pattern.get("end").get 
    "ALERT Door Open" 
} 
val patternStreamSelected = patternStream.select(selectFn(_)).print() 
+0

差不多。這裏的問題是「A」是在前一個記錄中定義的,因此它是可變的。 –