2
當我們試圖使用方法writeAsCsv(路徑,寫入模式)將來自Flink的已處理JSON數據寫入文件時,寫入文件但我們需要的數據在沒有發生的每個JSON之後插入逗號。我們使用Apache Kafka作爲Flink的數據源。將數據以JSON的形式從Flink寫入文件
DataStream<Tuple5<String, String, String, String, String>> messageStream = env.addSource(new FlinkKafkaConsumer08<>(FLINK_TOPIC, new SimpleStringSchema(), properties)).flatMap(new StreamToTuple5()).keyBy(0);
String path = "/home/user/Documents/docs/csvfile/";
messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE);
輸出獲得上執行這種方法
{"temperSensorData":"28.489084691371364","temperSensorUnit":"celsius","timestamp":"1493270680759","timestamp2":"1493270680786","timestamp3":"1493270680787"}
{"temperSensorData":"28.489084691371467","temperSensorUnit":"celsius","timestamp":"1493270680761","timestamp2":"1493270680816","timestamp3":"1493270680816"}