2017-01-09 13 views
1

請參閱下面的代碼示例請告訴我有效的方法來調用HTTP請求和火花讀取的InputStream MapTask

JavaRDD<String> mapRDD = filteredRecords 
      .map(new Function<String, String>() { 

       @Override 
       public String call(String url) throws Exception { 
        BufferedReader in = null; 
        URL formatURL = new URL((url.replaceAll("\"", "")) 
          .trim()); 
        try { 
         HttpURLConnection con = (HttpURLConnection) formatURL 
           .openConnection(); 
         in = new BufferedReader(new InputStreamReader(con 
           .getInputStream())); 

         return in.readLine(); 
        } finally { 
         if (in != null) { 
          in.close(); 
         } 
        } 
       } 
      }); 

這裏的網址是HTTP GET請求。示例

http://ip:port/cyb/test?event=movie&id=604568837&name=SID&timestamp_secs=1460494800&timestamp_millis=1461729600000&back_up_id=676700166 

這段代碼非常慢。 IP和端口是隨機的,負載是分佈式的,所以ip可以有20個不同的端口值,所以我沒有看到瓶頸。

當我評論

in = new BufferedReader(new InputStreamReader(con 
          .getInputStream())); 

        return in.readLine(); 

的代碼是太快了。 注意:要處理的輸入數據是10GB。使用spark從S3讀取。

有什麼問題,我正在用BufferedReader或InputStreamReader做任何替代。 我不能在spark中使用foreach,因爲我必須從服務器獲取響應,並且需要將JAVARdd另存爲textFile在HDFS上。

如果我們使用mappartition代碼的東西如下

JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { 

     @Override 
     public Iterable<String> call(Iterator<String> tuple) throws Exception { 

      final List<String> rddList = new ArrayList<String>(); 
      Iterable<String> iterable = new Iterable<String>() { 

       @Override 
       public Iterator<String> iterator() { 
        return rddList.iterator(); 
       } 
      }; 
      while(tuple.hasNext()) { 
       URL formatURL = new URL((tuple.next().replaceAll("\"", "")) 
         .trim()); 
       HttpURLConnection con = (HttpURLConnection) formatURL 
         .openConnection(); 
       try(BufferedReader br = new BufferedReader(new InputStreamReader(con 
         .getInputStream()))) { 

        rddList.add(br.readLine()); 

       } catch (IOException ex) { 
        return rddList; 
       } 
      } 
      return iterable; 
     } 
    }); 

這裏還爲每個記錄,我們都在做同樣的..是不是?

回答

1

你目前正在使用

地圖功能

其中創建分區中的每一行的URL請求。

您可以使用

mapPartition

這將使得代碼的運行速度,因爲它創建連接到服務器只有一次,那就是每個分區只有一個連接。

+0

是的聽起來不錯..讓我試試這個。謝謝 –

+0

我在這裏看到一個問題更新了代碼塊 –

0

設置TCP/HTTPS連接的代價很大。即使你只是閱讀大文件的第一行(短),爲了更好地重用HTTP/1.1連接,現代HTTP客戶端也會嘗試將read()讀到文件末尾,所以避免中止連接。對於小文件來說這是一個很好的策略,但對於MB文件來說則不然。

有一個解決方案:在讀取上設置內容長度,以便只讀取較小的塊,從而降低close();的成本。連接回收再降低HTTPS設置成本。這是最新的Hadoop/Spark S3A客戶端所做的,如果您在連接上設置fadvise = random:請求塊而不是整個多GB文件。但請注意:如果您要逐字節地通過文件,設計實際上是非常糟糕的...

相關問題