請參閱下面的代碼示例請告訴我有效的方法來調用HTTP請求和火花讀取的InputStream MapTask
JavaRDD<String> mapRDD = filteredRecords
.map(new Function<String, String>() {
@Override
public String call(String url) throws Exception {
BufferedReader in = null;
URL formatURL = new URL((url.replaceAll("\"", ""))
.trim());
try {
HttpURLConnection con = (HttpURLConnection) formatURL
.openConnection();
in = new BufferedReader(new InputStreamReader(con
.getInputStream()));
return in.readLine();
} finally {
if (in != null) {
in.close();
}
}
}
});
這裏的網址是HTTP GET請求。示例
http://ip:port/cyb/test?event=movie&id=604568837&name=SID×tamp_secs=1460494800×tamp_millis=1461729600000&back_up_id=676700166
這段代碼非常慢。 IP和端口是隨機的,負載是分佈式的,所以ip可以有20個不同的端口值,所以我沒有看到瓶頸。
當我評論
in = new BufferedReader(new InputStreamReader(con
.getInputStream()));
return in.readLine();
的代碼是太快了。 注意:要處理的輸入數據是10GB。使用spark從S3讀取。
有什麼問題,我正在用BufferedReader或InputStreamReader做任何替代。 我不能在spark中使用foreach,因爲我必須從服務器獲取響應,並且需要將JAVARdd另存爲textFile在HDFS上。
如果我們使用mappartition代碼的東西如下
JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> tuple) throws Exception {
final List<String> rddList = new ArrayList<String>();
Iterable<String> iterable = new Iterable<String>() {
@Override
public Iterator<String> iterator() {
return rddList.iterator();
}
};
while(tuple.hasNext()) {
URL formatURL = new URL((tuple.next().replaceAll("\"", ""))
.trim());
HttpURLConnection con = (HttpURLConnection) formatURL
.openConnection();
try(BufferedReader br = new BufferedReader(new InputStreamReader(con
.getInputStream()))) {
rddList.add(br.readLine());
} catch (IOException ex) {
return rddList;
}
}
return iterable;
}
});
這裏還爲每個記錄,我們都在做同樣的..是不是?
是的聽起來不錯..讓我試試這個。謝謝 –
我在這裏看到一個問題更新了代碼塊 –