我建立了一個基本的網頁解析器,它使用hadoop來把URL傳遞給多個線程。這很好地工作,直到我到達輸入文件的末尾,Hadoop會在線程仍在運行時聲明自己已完成。這會導致org.apache.hadoop.fs.FSError錯誤:java.io.IOException:Stream Closed。無論如何要保持這個流線足夠長的時間來完成線程嗎? (我可以用合理的準確度預測線程在單個url上花費的最大時間量)。如何防止hadoop流關閉?
繼承人我是如何執行的線程
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
private Text word = new Text();
private URLPile pile = new URLPile();
private MSLiteThread[] Threads = new MSLiteThread[16];
private boolean once = true;
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter) {
String url = value.toString();
StringTokenizer urls = new StringTokenizer(url);
Config.LoggerProvider = LoggerProvider.DISABLED;
System.out.println("In Mapper");
if (once) {
for (MSLiteThread thread : Threads) {
System.out.println("created thread");
thread = new MSLiteThread(pile);
thread.start();
}
once = false;
}
while (urls.hasMoreTokens()) {
try {
word.set(urls.nextToken());
String currenturl = word.toString();
pile.addUrl(currenturl, output);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
螺紋自己得到這樣
public void run(){
try {
sleep(3000);
while(!done()){
try {
System.out.println("in thread");
MSLiteURL tempURL = pile.getNextURL();
String currenturl = tempURL.getURL();
urlParser.parse(currenturl);
urlText.set("");
titleText.set(currenturl+urlParser.export());
System.out.println(urlText.toString()+titleText.toString());
tempURL.getOutput().collect(urlText, titleText);
pile.doneParsing();
sleep(30);
} catch (Exception e) {
pile.doneParsing();
e.printStackTrace();
continue;
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Thread done");
}
的網址,並在urlpile相關的方法是
public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException {
while(queue.size()>16){
System.out.println("queue full");
wait();
}
finishedParcing--;
queue.add(new MSLiteURL(output,url));
notifyAll();
}
private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>();
private int sent = 0;
private int finishedParcing = 0;
public synchronized MSLiteURL getNextURL() throws InterruptedException {
notifyAll();
sent++;
//System.out.println(queue.peek());
return queue.remove();
}
那dec如果它真的超時,這個任務就會失敗,這並不是我想要的,但它似乎是正確的。 – Chenab
啊!我可能錯過了問題中的一些細節。你是說你有從單個地圖任務運行的線程,並且當地圖完成處理它的輸入時,Hadoop退出了嗎? –
或多或少。線程一段時間來處理每個輸入,這就是爲什麼我有更多的一個。然而,一旦hadoop聲明地圖任務完成,線程就沒有放置輸出的地方。 – Chenab