flink-streaming

    0熱度

    1回答

    我想在基於歷史事件的流中計算Flink中基於窗口的平均值(或由我定義的任何其他函數),因此流必須是事件時間(不處理基於時間): val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteri

    1熱度

    1回答

    當我嘗試使用窗口和摺疊功能聚合元素時,元素的某些 從獲取聚合中錯過。使用來自卡夫卡(value:0, value:1, value:2, value:3)的元素 ,並將它們合併爲奇數和偶數值 。 輸出爲:10-13之間 {even=[0, 2, 4], odd=[1, 3]} {even=[6, 8], odd=[5, 7, 9]} {even=[14, 16, 18], odd=[15, 1

    1熱度

    1回答

    我是Flink的新手。我有這個代碼來映射,組和輸入JSON的總和。 這與詞計數示例非常相似。 我有望獲得(vacant,1) (occupied,2) 但是,由於某種原因,我得到(occupied,1) (vacant,1) (occupied,2) public static void main(String[] args) throws Exception { String s =

    0熱度

    1回答

    我們計劃使用Flink處理來自kafka主題的數據流(Logs in Json格式)。 但是,對於那個處理,我們需要使用每天都在變化的輸入文件,而內部的信息可以完全改變(不是格式,而是內容)。 每當其中一個輸入文件發生更改時,我們將不得不將這些文件重新加載到程序中並保持流處理正在進行。數據 重新加載可以做同樣的方式,因爲它現在已經完成: DataSet<String> globalData = e

    1熱度

    1回答

    我正在調查Apache Flink如何工作並試圖瞭解Flink中的時間窗口。

    1熱度

    1回答

    我想提取由FlinkKafkaConsumer010生成的消息的時間戳作爲數據流中的值。 我知道AssignerWithPeriodicWatermarks類,但這似乎只是通過DataStream API爲時間聚合的目的提取時間戳。 我想在後面的Table中提供該卡夫卡消息時間戳,我可以在其上使用SQL。 編輯:嘗試這樣: val consumer = new FlinkKafkaConsumer

    0熱度

    2回答

    引起弗林克儀表盤版本1.3.2 CEP模式我已經寫了這樣 Pattern<JoinedEvent, ?> pattern = Pattern.<JoinedEvent>begin("start") .where(new SimpleCondition<JoinedEvent>() { @Override public boolean filter(JoinedEv

    0熱度

    1回答

    我正在做一個Flink項目。該項目的主要思想是讀取JSON(網絡日誌)的數據流,將它們關聯起來,並生成一個新的JSON,它是不同JSON信息的組合。 此時,我可以讀取JSON,生成一個KeyedStream(基於生成日誌的機器),然後生成一個5秒的窗口流。 我想要執行的下一步是將apply函數應用到窗口併合並每個JSON的信息。我對如何去做有點困惑。 我現在有該代碼是以下各項之一: DataStr

    2熱度

    1回答

    我正在評估Flink專門爲流式窗口支持生成可能的警報。我關心的是內存使用情況,所以如果有人可以幫助,這將不勝感激。 例如,該應用程序將在給定的翻滾窗口(比如說5分鐘)內從該流潛在地消耗大量的數據。在評估的時候,如果說有一百萬份符合條件的文檔,它們是否都會被加載到內存中? 的一般流程是: producer -> kafka -> flinkkafkaconsumer -> table.window(

    2熱度

    2回答

    我最近嘗試從Flink 1.3.2升級到1.4.0,我遇到了一些問題,無法再導入org.apache.hadoop.fs.{FileSystem, Path}。問題是發生在兩個地方: ParquetWriter: import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apach