2015-11-02 50 views
1

我在Java中編寫了一個Flink流作業,它加載包含訂戶數據(4列)的csv文件,然後在與訂戶數據匹配時從套接字流中讀取數據。加載大文件時Flink作業在提交時掛起

起初我是用一個小的csv文件(8 MB)和一切工作正常:

# flink run analytics-flink.jar 19001 /root/minisubs.csv /root/output.csv 
loaded 200000 subscribers from csv file 
11/02/2015 16:36:59 Job execution switched to status RUNNING. 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING 
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING 

我切換csv文件到一個更大的(〜45 MB),現在我看到的是這樣的:

# flink run analytics-flink.jar 19001 /root/subs.csv /root/output.csv 
loaded 1173547 subscribers from csv file 

請注意,上面的訂閱者數量是文件中的行數。我試圖在Flink配置中查找任何超時,但我找不到任何超時。

任何幫助,非常感謝!

編輯:CSV是加載利用公共-CSV 1.2庫使用這種方法:

private static HashMap<String, String> loadSubscriberGroups(
      String referenceDataFile) throws IOException { 
     HashMap<String,String> subscriberGroups = new HashMap<String, String>(); 

     File csvData = new File(referenceDataFile); 
     CSVParser parser = CSVParser.parse(csvData, Charset.defaultCharset(), CSVFormat.EXCEL); 
     for (CSVRecord csvRecord : parser) { 
      String imsi = csvRecord.get(0); 
      String groupStr = csvRecord.get(3); 

      if(groupStr == null || groupStr.isEmpty()) { 
       continue; 
      } 
      subscriberGroups.put(imsi, groupStr); 
     } 

     return subscriberGroups; 
    } 

和這裏的文件(我知道有在最後一個逗號的樣本,最後一欄是空的現在):

450000000000001,450000000001,7752,Tier-2, 
450000000000002,450000000002,1112,Tier-1, 
450000000000003,450000000003,6058,Tier-2, 
+0

加載CSV文件究竟該怎麼做?你能否提供一個讀取CSV文件的程序片段? –

+0

編輯後添加csv加載方法 –

+0

感謝您的更新。使用CSV數據做什麼?你如何將它注入Flink程序? –

回答

4

羅伯特Meztger(阿帕奇弗林克開發商):

我可以解釋爲什麼你的第一種方法沒有活像k:

您試圖使用我們的RPC系統(Akka)將來自Flink客戶端的CSV文件發送到 羣集。當您向Flink提交作業時,我們將用戶創建的所有對象(映射器,來源,...) 序列化並將其發送到羣集。有一種方法 StreamExecutionEnvironment.fromElements(..),它允許用戶連同作業提交序列化幾個對象。但是您可以像這樣傳輸的數據量 受Akka幀大小的限制。 在我們的案例中,我認爲默認值是10兆字節。之後,Akka 可能只是放棄或拒絕部署消息。

解決方法是使用富運算符而不是常規運算符(例如RichMapFunction而不是MapFunction),覆蓋open()方法並在該方法內加載CSV文件。

謝謝Robert!

相關問題