打印出來我有下面這樣的代碼:沒有正在從弗林克圖案化流
import java.util.Properties
import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object WindowedWordCount {
val configFactory = ConfigFactory.load()
def main(args: Array[String]) = {
val brokers = configFactory.getString("kafka.broker")
val topicChannel1 = configFactory.getString("kafka.topic1")
val props = new Properties()
...
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val priceCheck = Pattern.begin[String]("start").where({jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account").toString == "iOS"})
val pattern = CEP.pattern(partitionedInput, priceCheck)
val newStream = pattern.select(x =>
x.get("start").map({str =>
str
})
)
newStream.print()
env.execute()
}
}
出於某種原因在於newStream.print()
沒有上面的代碼被打印出來。我肯定卡夫卡的數據與我在上面定義的模式相匹配,但由於某種原因沒有任何內容正在打印出來。
任何人都可以幫我找出這段代碼中的錯誤嗎?
編輯:
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
val jsonParser = new JsonParser()
val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis())
}
}
我的弗林克文件,讓您可以在extractTimestamp
方法在getCurrentWatermark
方法只返回prevElementTimestamp
(如果您正在使用Kafka010)和new Watermark(System.currentTimeMillis)
看到。
但我不明白什麼prevElementTimestamp
是或爲什麼會返回new Watermark(System.currentTimeMillis)
作爲WaterMark而不是別的。請你詳細說明我們爲什麼要這樣做WaterMark
和EventTime
請一起工作?
更清晰的信息我已經添加了一個編輯我的帖子。你可以看看嗎? – CapturedTree