2016-02-28 53 views
7

在閱讀Apache Flink的幾個文檔頁面(official documentation,dataartisans)以及official repository中提供的示例之後,我會繼續看到它們用作數據源來流式處理文件的示例已經下載,總是連接到本地主機。使用Apache Flink從網頁獲取JSON元素

我正在嘗試使用Apache Flink下載包含動態數據的JSON文件。我的目的是試圖建立我可以訪問JSON文件作爲Apache Flink的輸入源的URL,而不是使用其他系統下載並使用Apache Flink處理下載的文件。

是否有可能建立與Apache Flink的這個網絡連接?

回答

4

您可以將要下載的URL定義爲輸入DataStream,然後從MapFunction下載文檔。以下代碼演示了這一點:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html"); 

inputURLs.map(new MapFunction<String, String>() { 
    @Override 
    public String map(String s) throws Exception { 
     URL url = new URL(s); 
     InputStream is = url.openStream(); 

     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); 

     StringBuilder builder = new StringBuilder(); 
     String line; 

     try { 
      while ((line = bufferedReader.readLine()) != null) { 
       builder.append(line + "\n"); 
      } 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     try { 
      bufferedReader.close(); 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     return builder.toString(); 
    } 
}).print(); 

env.execute("URL download job"); 
+0

我運行示例代碼,但它只運行一次並讀取所有文件。然而Iit不是流式傳輸,我認爲它會在json文件中增加時進行contiune讀操作。 – zt1983811

+0

爲此你必須使用'ContinuousFileMonitoringFunction'。流媒體本身並不意味着這份工作將運行無限長。這隻會發生,如果你有一個非有限的來源。但是在這種情況下,'env.fromElements'函數產生一個有限的流媒體源。一旦這個源碼到達它,程序就會終止。 –