Writing data from Flink to a file as JSON

When we try to write the processed JSON data from Flink to a file using the method writeAsCsv(path, writeMode), the file is written, but the comma we need after each JSON record is not inserted. We are using Apache Kafka as the data source for Flink.

DataStream<Tuple5<String, String, String, String, String>> messageStream = env
        .addSource(new FlinkKafkaConsumer08<>(FLINK_TOPIC, new SimpleStringSchema(), properties))
        .flatMap(new StreamToTuple5())
        .keyBy(0);
String path = "/home/user/Documents/docs/csvfile/";
messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE);

Output obtained when this code is executed:

{"temperSensorData":"28.489084691371364","temperSensorUnit":"celsius","timestamp":"1493270680759","timestamp2":"1493270680786","timestamp3":"1493270680787"} 
{"temperSensorData":"28.489084691371467","temperSensorUnit":"celsius","timestamp":"1493270680761","timestamp2":"1493270680816","timestamp3":"1493270680816"} 

Answer

You can solve your problem by specifying "," as the row delimiter:

messageStream.writeAsCsv(path, FileSystem.WriteMode.OVERWRITE, ",", ",");

Here the third argument is the row delimiter (the fourth is the field delimiter).
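
For reference, a minimal sketch of the full pipeline with the row delimiter applied; FLINK_TOPIC, properties and StreamToTuple5 are assumed to be defined as in the question, and the job name is arbitrary:

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple5<String, String, String, String, String>> messageStream = env
        .addSource(new FlinkKafkaConsumer08<>(FLINK_TOPIC, new SimpleStringSchema(), properties))
        .flatMap(new StreamToTuple5())
        .keyBy(0);

// 3rd argument: row delimiter, written after every record
// 4th argument: field delimiter, written between the tuple fields
messageStream.writeAsCsv("/home/user/Documents/docs/csvfile/",
        FileSystem.WriteMode.OVERWRITE, ",", ",");

env.execute("write-json-to-csv");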