您可以將要下載的URL定義爲輸入DataStream
,然後從MapFunction
下載文檔。以下代碼演示了這一點:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");
inputURLs.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
URL url = new URL(s);
InputStream is = url.openStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
StringBuilder builder = new StringBuilder();
String line;
try {
while ((line = bufferedReader.readLine()) != null) {
builder.append(line + "\n");
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
try {
bufferedReader.close();
} catch (IOException ioe) {
ioe.printStackTrace();
}
return builder.toString();
}
}).print();
env.execute("URL download job");
我運行示例代碼,但它只運行一次並讀取所有文件。然而Iit不是流式傳輸,我認爲它會在json文件中增加時進行contiune讀操作。 – zt1983811
爲此你必須使用'ContinuousFileMonitoringFunction'。流媒體本身並不意味着這份工作將運行無限長。這隻會發生,如果你有一個非有限的來源。但是在這種情況下,'env.fromElements'函數產生一個有限的流媒體源。一旦這個源碼到達它,程序就會終止。 –