2017-08-03 87 views
1

我想加入來自Kafka生產者的兩個流(json)。 如果我篩選數據,代碼將起作用。但是當我加入他們時似乎不起作用。我想打印到控制檯的聯合流,但沒有出現。 這是我的代碼兩個Streams之間的Flink Scala連接似乎不起作用

import java.util.Properties 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 

object App { 

def main(args : Array[String]) { 

case class Data(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double) 
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt) 


case class Datas(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double) 
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt) 


val properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "0.0.0.0:9092"); 
    properties.setProperty("group.id", "test"); 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties) 
    val stream1 = env 
    .addSource(consumer1) 

    val consumer2 = new FlinkKafkaConsumer010[String]("topics2", new SimpleStringSchema(), properties) 
    val stream2 = env 
    .addSource(consumer2) 

    val s1 = stream1.map { x => { 
    implicit val formats = DefaultFormats 
    JsonMethods.parse(x).extract[Sensor] 
    } 
    } 
    val s2 = stream2.map { x => { 
    implicit val formats = DefaultFormats 
    JsonMethods.parse(x).extract[Sensor2] 
    } 
    } 

    val s1t = s1.assignAscendingTimestamps { x => x.data.timestamp } 
    val s2t = s2.assignAscendingTimestamps { x => x.data.timestamp } 

    val j1pre = s1t.join(s2t) 
       .where(_.data.unit) 
       .equalTo(_.data.unit) 
       .window(TumblingEventTimeWindows.of(Time.seconds(2L))) 
       .apply((g, s) => (s.sensor_name, g.sensor_name, s.data.measurement)) 
    env.execute() 

} 

} 

我認爲這個問題是在時間戳的分配。我認爲assignAscendingTimestamp這兩個來源不是正確的功能。

由kafka生產者生產的json有一個字段data.timestamp,應該將其分配爲時間戳。但我不知道如何管理它。

我還認爲我應該給傳入的元組提供一個時間窗口批處理(如火花)。但我不確定這是否是正確的解決方案。

回答

1

我認爲你的代碼只需要一些小的調整。只要你想在EventTime工作首先你應該設置適當的TimeCharacteristic

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

而且你的代碼,你貼缺少流水槽。如果你想打印到控制檯,你應該:

j1pre.print 

你的其他代碼看起來不錯。

+0

謝謝。現在它的作品!我添加了'TimeCharacteristic',現在一切都很好! –