1
我的問題是,如果我們有兩個原始事件流,即煙霧和溫度,我們要找出是否複雜的事件即消防已經運用運營商的原始數據流發生的事情,我們可以做到這一點在弗林克?是否有可能在apache flink CEP中處理多個流?
我在問這個問題,因爲我目前爲Flink CEP看過的所有例子都只包含一個輸入流。如果我錯了,請糾正我。
我的問題是,如果我們有兩個原始事件流,即煙霧和溫度,我們要找出是否複雜的事件即消防已經運用運營商的原始數據流發生的事情,我們可以做到這一點在弗林克?是否有可能在apache flink CEP中處理多個流?
我在問這個問題,因爲我目前爲Flink CEP看過的所有例子都只包含一個輸入流。如果我錯了,請糾正我。
簡答 - 是的,您可以根據來自不同流源的事件類型讀取和處理多個流並激發規則。
長答案 - 我有一個類似的要求,我的回答是基於您正在閱讀不同的kafka話題的不同流的假設。
閱讀從單一源流不同的事件不同的主題:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
串行器讀取數據,並將其解析到一個有一個共同的格式 - 對於如。
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
,並在這之後,事情很簡單,定義基於事件名稱的規則和定義的規則比較的事件名稱(您也可以定義複雜的規則如下):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
我希望這可以讓您將一個或多個不同的流集成在一起。