2017-07-07 36 views
0

打印出來我有下面這樣的代碼:沒有正在從弗林克圖案化流

import java.util.Properties 

import com.google.gson._ 
import com.typesafe.config.ConfigFactory 
import org.apache.flink.cep.scala.pattern.Pattern 
import org.apache.flink.cep.scala.CEP 
import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 

object WindowedWordCount { 
    val configFactory = ConfigFactory.load() 
    def main(args: Array[String]) = { 
    val brokers = configFactory.getString("kafka.broker") 
    val topicChannel1 = configFactory.getString("kafka.topic1") 

    val props = new Properties() 
    ... 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props)) 

    val partitionedInput = dataStream.keyBy(jsonString => { 
     val jsonParser = new JsonParser() 
     val jsonObject = jsonParser.parse(jsonString).getAsJsonObject() 
     jsonObject.get("account") 
    }) 

    val priceCheck = Pattern.begin[String]("start").where({jsonString => 
     val jsonParser = new JsonParser() 
     val jsonObject = jsonParser.parse(jsonString).getAsJsonObject() 
     jsonObject.get("account").toString == "iOS"}) 

    val pattern = CEP.pattern(partitionedInput, priceCheck) 

    val newStream = pattern.select(x => 
     x.get("start").map({str => 
     str 
     }) 
    ) 

    newStream.print() 

    env.execute() 
    } 
} 

出於某種原因在於newStream.print()沒有上面的代碼被打印出來。我肯定卡夫卡的數據與我在上面定義的模式相匹配,但由於某種原因沒有任何內容正在打印出來。

任何人都可以幫我找出這段代碼中的錯誤嗎?

編輯:

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable { 

    override def extractTimestamp(e: String, prevElementTimestamp: Long) = { 
    val jsonParser = new JsonParser() 
    val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context") 
    Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli 
    } 

    override def getCurrentWatermark(): Watermark = { 
    new Watermark(System.currentTimeMillis()) 
    } 
} 

我的弗林克文件,讓您可以在extractTimestamp方法在getCurrentWatermark方法只返回prevElementTimestamp(如果您正在使用Kafka010)和new Watermark(System.currentTimeMillis)看到。

但我不明白什麼prevElementTimestamp是或爲什麼會返回new Watermark(System.currentTimeMillis)作爲WaterMark而不是別的。請你詳細說明我們爲什麼要這樣做WaterMarkEventTime請一起工作?

回答

2

你的設置工作在EventTime,但你不提供時間戳和水印提取。

有關在活動時間工作的更多信息,請參閱docs。如果你想使用卡夫卡嵌入時間戳,這docs可能會幫助你。

EventTime CEP庫在水印抵達時緩衝事件,以正確處理亂序事件。在你的情況下,沒有水印生成,所以事件緩衝無限。


編輯:

  1. prevElementTimestamp我認爲文檔是相當清楚的:

    沒有必要使用從卡夫卡的時間戳,當定義一個時間戳提取。 extractTimestamp()方法的previousElementTimestamp參數包含由Kafka消息攜帶的時間戳。

    由於Kafka 0.10.x卡夫卡消息可以嵌入時間戳。

  2. 在這種情況下生成Watermarknew Watermark(System.currentTimeMillis)不是一個好主意。根據您對數據的瞭解,您應該創建Watermark。關於我如何能WatermarkEventTime一起工作不會比docs

+0

更清晰的信息我已經添加了一個編輯我的帖子。你可以看看嗎? – CapturedTree