1
我想加入來自Kafka生產者的兩個流(json)。 如果我篩選數據,代碼將起作用。但是當我加入他們時似乎不起作用。我想打印到控制檯的聯合流,但沒有出現。 這是我的代碼兩個Streams之間的Flink Scala連接似乎不起作用
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object App {
def main(args : Array[String]) {
case class Data(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)
case class Datas(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)
val properties = new Properties();
properties.setProperty("bootstrap.servers", "0.0.0.0:9092");
properties.setProperty("group.id", "test");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
val stream1 = env
.addSource(consumer1)
val consumer2 = new FlinkKafkaConsumer010[String]("topics2", new SimpleStringSchema(), properties)
val stream2 = env
.addSource(consumer2)
val s1 = stream1.map { x => {
implicit val formats = DefaultFormats
JsonMethods.parse(x).extract[Sensor]
}
}
val s2 = stream2.map { x => {
implicit val formats = DefaultFormats
JsonMethods.parse(x).extract[Sensor2]
}
}
val s1t = s1.assignAscendingTimestamps { x => x.data.timestamp }
val s2t = s2.assignAscendingTimestamps { x => x.data.timestamp }
val j1pre = s1t.join(s2t)
.where(_.data.unit)
.equalTo(_.data.unit)
.window(TumblingEventTimeWindows.of(Time.seconds(2L)))
.apply((g, s) => (s.sensor_name, g.sensor_name, s.data.measurement))
env.execute()
}
}
我認爲這個問題是在時間戳的分配。我認爲assignAscendingTimestamp
這兩個來源不是正確的功能。
由kafka生產者生產的json有一個字段data.timestamp
,應該將其分配爲時間戳。但我不知道如何管理它。
我還認爲我應該給傳入的元組提供一個時間窗口批處理(如火花)。但我不確定這是否是正確的解決方案。
謝謝。現在它的作品!我添加了'TimeCharacteristic',現在一切都很好! –