2017-04-20

Answer

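Yes, you can do this kind of stream processing with Flink. The basic building blocks you need from Flink are connected streams and stateful functions. Here is an example using a RichCoFlatMapFunction: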

import org.apache.flink.api.common.state.ValueState; 
import org.apache.flink.api.common.state.ValueStateDescriptor; 
import org.apache.flink.api.common.typeinfo.TypeHint; 
import org.apache.flink.api.common.typeinfo.TypeInformation; 
import org.apache.flink.configuration.Configuration; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; 
import org.apache.flink.util.Collector; 

public class Connect { 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     DataStream<Event> control = env.fromElements(
       new Event(17), 
       new Event(42)) 
       .keyBy("key"); 

     DataStream<Event> data = env.fromElements(
       new Event(2), 
       new Event(42), 
       new Event(6), 
       new Event(17), 
       new Event(8), 
       new Event(42) 
       ) 
       .keyBy("key"); 

     DataStream<Event> result = control 
       .connect(data) 
       .flatMap(new MyConnectedStreams()); 

     result.print(); 

     env.execute(); 
    } 

    static final class MyConnectedStreams 
      extends RichCoFlatMapFunction<Event, Event, Event> { 

     private ValueState<Boolean> seen = null; 

     @Override 
     public void open(Configuration config) { 
      ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
        // state name 
        "have-seen-key", 
        // type information of state 
        TypeInformation.of(new TypeHint<Boolean>() { 
        })); 
      seen = getRuntimeContext().getState(descriptor); 
     } 

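     @Override
     public void flatMap1(Event control, Collector<Event> out) throws Exception {
      // control stream: remember that this key has been seen
      seen.update(Boolean.TRUE);
     }

     @Override
     public void flatMap2(Event data, Collector<Event> out) throws Exception {
      // data stream: value() is null until a control event arrives for this key
      if (Boolean.TRUE.equals(seen.value())) {
       out.collect(data);
      }
     }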
    } 


    public static final class Event { 
     public Event() { 
     } 

     public Event(int key) { 
      this.key = key; 
     } 

     public int key; 

     public String toString() { 
      return String.valueOf(key); 
     } 
    } 
} 

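In this example, only those keys that have been seen on the control stream are passed through the data stream; all other events are filtered out. I've taken advantage of Flink's managed keyed state and connected streams.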
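
The filtering rule can be modelled without Flink to see what the keyed state is doing. What follows is a minimal plain-Java sketch, not the Flink API: the class `KeyedFilterSketch` and its `filter` method are hypothetical names, a `HashMap` stands in for the per-key `ValueState<Boolean>`, and it assumes every control event is processed before any data event, which a real job does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyConnectedStreams; uses no Flink classes.
class KeyedFilterSketch {

    // Assumption: all control keys are processed before any data key.
    static List<Integer> filter(List<Integer> controlKeys, List<Integer> dataKeys) {
        // Plays the role of the keyed ValueState<Boolean>: one flag per key.
        Map<Integer, Boolean> seen = new HashMap<>();

        // Counterpart of flatMap1: remember each control key, emit nothing.
        for (Integer key : controlKeys) {
            seen.put(key, Boolean.TRUE);
        }

        // Counterpart of flatMap2: forward a data key only if its flag is set.
        List<Integer> out = new ArrayList<>();
        for (Integer key : dataKeys) {
            if (Boolean.TRUE.equals(seen.get(key))) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> control = Arrays.asList(17, 42);
        List<Integer> data = Arrays.asList(2, 42, 6, 17, 8, 42);
        System.out.println(filter(control, data)); // prints [42, 17, 42]
    }
}
```

With the same inputs as the Flink job, keys 2, 6 and 8 are dropped because no control event ever carried them.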

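To keep this simple, I've ignored your requirement that the data stream contains JSON, but you can find examples of how to work with JSON and Flink elsewhere.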

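Note that your results will be non-deterministic, since you have no control over the timing of the two streams relative to one another. You could manage this by adding event-time timestamps to the streams and then using a RichCoProcessFunction instead.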
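
To make the ordering problem concrete, here is a small plain-Java sketch, again not Flink code. The names `ArrivalOrderSketch`, `Arrival`, `process` and `processInEventTimeOrder` are hypothetical, and sorting a finite list by timestamp is only a simplified stand-in for what an event-time process function would do with buffering and timers. It shows the same two events giving different results depending on arrival order, and a stable result once they are handled in timestamp order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of two interleaved inputs; uses no Flink classes.
class ArrivalOrderSketch {

    // One arriving element: its source stream, its key, and an event-time timestamp.
    static final class Arrival {
        final boolean isControl;
        final int key;
        final long timestamp;

        Arrival(boolean isControl, int key, long timestamp) {
            this.isControl = isControl;
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Applies the filter in whatever order the elements happen to arrive.
    static List<Integer> process(List<Arrival> arrivals) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.isControl) {
                seen.add(a.key);
            } else if (seen.contains(a.key)) {
                out.add(a.key);
            }
        }
        return out;
    }

    // Simplified stand-in for event-time handling: sort by timestamp first,
    // putting control elements ahead of data elements on equal timestamps.
    static List<Integer> processInEventTimeOrder(List<Arrival> arrivals) {
        List<Arrival> sorted = new ArrayList<>(arrivals);
        sorted.sort((a, b) -> {
            int byTime = Long.compare(a.timestamp, b.timestamp);
            return byTime != 0 ? byTime : Boolean.compare(b.isControl, a.isControl);
        });
        return process(sorted);
    }

    public static void main(String[] args) {
        Arrival control42 = new Arrival(true, 42, 1L);
        Arrival data42 = new Arrival(false, 42, 2L);

        // Control arrives first: the data event passes.
        System.out.println(process(Arrays.asList(control42, data42)));
        // Data arrives first: the data event is dropped.
        System.out.println(process(Arrays.asList(data42, control42)));
        // Handling by timestamp gives the same answer for either arrival order.
        System.out.println(processInEventTimeOrder(Arrays.asList(data42, control42)));
    }
}
```

A real streaming job cannot sort an unbounded input, so in Flink the same effect would have to come from buffering data events in state until event time has advanced far enough; the sketch only illustrates why the timestamps matter.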