
Flink streaming prediction in real time using Scala

Flink version: 1.2.0
Scala version: 2.11.8

I want to make predictions in real time on a DataStream in Flink, using Scala. I have a DataStream[String] in Flink (Scala) that contains JSON-formatted data from a Kafka source, and I want to use this stream to make predictions with an already-trained Flink-ML model. The problem is that all the Flink-ML examples use the DataSet API for prediction. I am relatively new to Flink and Scala, so any help in the form of a code solution would be appreciated.
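For reference, the batch-only prediction API that the Flink-ML examples use looks roughly like this (a minimal sketch of the FlinkML pipeline API; the tiny training and test DataSets are dummy stand-ins):

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.MultipleLinearRegression

object BatchPredictionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Dummy stand-ins: real training/test data would come from files or Kafka
    val trainingDS: DataSet[LabeledVector] = env.fromElements(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(2.0, DenseVector(2.0, 4.0)))
    val testDS: DataSet[DenseVector] = env.fromElements(DenseVector(3.0, 6.0))

    val mlr = MultipleLinearRegression()
      .setIterations(10)
      .setStepsize(0.5)

    mlr.fit(trainingDS)                   // training takes a DataSet ...
    val predictions = mlr.predict(testDS) // ... and so does prediction

    predictions.print()
  }
}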

Input:

{"FC196":"Dormant","FC174":"Yolo","FC195":"Lol","FC176":"4","FC198":"BANKING","FC175":"ABDULMAJEED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053570","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"","FC188":"BR01","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"} 

Code:

package org.apache.flink.quickstart 

//imports 

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.ml.recommendation.ALS 
import org.apache.flink.ml.regression.MultipleLinearRegression 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 

import scala.util.parsing.json.JSON 

//kafka consumer imports 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 

//kafka json table imports 
import org.apache.flink.table.examples.scala.StreamTableExample 
import org.apache.flink.table.api.TableEnvironment 
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource 
import org.apache.flink.api.java.DataSet 

//JSon4s imports 
import org.json4s.native.JsonMethods 



// Case class mapping the JSON fields
case class CC(
  FC196: String, FC174: String, FC195: String, FC176: String, FC198: String,
  FC175: String, FC197: String, FC178: String, FC177: String, FC199: String,
  FC179: String, FC190: String, FC192: String, FC191: String, FC194: String,
  FC193: String, FC203: String, FC205: String, FC185: String, FC184: String,
  FC187: String, FC186: String, FC189: String, FC200: String, FC188: String,
  FC202: String, FC201: String, FC181: String, FC180: String, FC183: String,
  FC182: String)


object WordCount { 

    // json4s formats used by extract[CC] below
    implicit val formats = org.json4s.DefaultFormats

    def main(args: Array[String]) { 


    // kafka properties 
    val properties = new Properties() 
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093") 
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181") 
    properties.setProperty("group.id","grouop") 
    properties.setProperty("auto.offset.reset", "earliest") 
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// val tableEnv = TableEnvironment.getTableEnvironment(env) 

    // consume JSON strings from the Kafka topic and parse each record with json4s
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)


    // deserialize each parsed JValue into the CC case class
    val mapped = st.map(_.extract[CC])

    mapped.print() 

    env.execute() 

    } 
} 
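
Whatever serving approach is used, the string fields of CC still have to be turned into a numeric feature vector before a model can score them. A minimal sketch, assuming (hypothetically) that FC176, FC178 and FC183 are the numeric feature fields:

import org.apache.flink.ml.math.DenseVector

// Hypothetical feature extraction; which fields are features, and how they
// are encoded, depends entirely on how the model was trained.
def toFeatures(cc: CC): DenseVector =
  DenseVector(cc.FC176.toDouble, cc.FC178.toDouble, cc.FC183.toDouble)

This could then be applied as mapped.map(toFeatures) before the prediction step.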

Answer


One way to solve this problem is to write a MapFunction that reads the model when the job starts. The MapFunction then stores the model as part of its internal state, so that it is automatically restored in case of a failure:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// enclosing job class (name is arbitrary)
public class PredictionJob {

    public static void main(String[] args) throws Exception {
        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Value> input = ...; // read from Kafka for example

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();
    }

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;

        @Override
        public Prediction map(Value value) throws Exception {
            return model.predict(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // we don't have to do anything here because we assume the model to be constant
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {
                // restore the model from state
                model = modelState.get().iterator().next();
            } else {
                modelState.clear();

                // read the model from somewhere, e.g. read from a file
                model = ...;

                // update the modelState so that it is checkpointed from now on
                modelState.add(model);
            }
        }
    }

    public static class Model {}

    public static class Value {}

    public static class Prediction {}
}
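
Since the question uses Scala, here is the same pattern sketched in Scala. This is a minimal sketch, not the answerer's code: Model, Value and Prediction are the same hypothetical placeholders as in the Java snippet, and the dummy predict logic stands in for a real trained model.

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// Hypothetical placeholder types mirroring the Java snippet above
case class Value(features: Array[Double])
case class Prediction(score: Double)
class Model extends Serializable {
  def predict(v: Value): Prediction = Prediction(v.features.sum) // dummy logic
}

class Predictor extends MapFunction[Value, Prediction] with CheckpointedFunction {

  @transient private var modelState: ListState[Model] = _
  @transient private var model: Model = _

  override def map(value: Value): Prediction = model.predict(value)

  // nothing to do here: the model is assumed to be constant
  override def snapshotState(context: FunctionSnapshotContext): Unit = {}

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Model]("model", classOf[Model])
    modelState = context.getOperatorStateStore.getUnionListState(descriptor)

    if (context.isRestored) {
      // restore the model from state after a failure
      model = modelState.get().iterator().next()
    } else {
      modelState.clear()
      // in a real job, load/deserialize the trained model here
      model = new Model()
      modelState.add(model)
    }
  }
}

Wiring this into the question's pipeline would then be a matter of mapping the parsed stream into Value records and calling something like stream.map(new Predictor()).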