我有一個應用程序可以抓取大約六千個URL。爲了最大限度地減少這項工作,我創建了一個RecursiveTask,它使用所有要爬網的URL的ConcurrentLinkedQueue。它最多可以關閉50個,如果這個Que是空的,它會直接抓取它,但是如果沒有,它會首先創建一個新的自己的實例並將其分叉,之後它會抓取50的子集,然後它將加入分叉的任務。ForkJoinFramework只使用兩名工作人員
現在出現我的問題,直到每個線程同時工作了他的50個全部四個工作快速anf。但是,在兩次停止工作並等待加入之後,只有另外兩個人正在工作並創建新的分支和抓取頁面。
爲了可視化這個數據,我計算了一個線程爬行URL的數量,並讓JavaFX GUI顯示它。
我錯了什麼,所以ForkJoinFramewok只使用我的四個允許線程中的兩個?我能做些什麼來改變它?
這裏是任務的我的計算方法:
LOG.debug(
Thread.currentThread().getId() + " Starting new Task with "
+ urlsToCrawl.size() + " left."
);
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
{
urlsToCrawlSubset.offer(urlsToCrawl.poll());
}
LOG.debug(
Thread.currentThread().getId() + " Crated a Subset with "
+ urlsToCrawlSubset.size() + "."
);
LOG.debug(
Thread.currentThread().getId()
+ " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
);
if (urlsToCrawl.isEmpty())
{
LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
crawlPage(urlsToCrawlSubset);
}
else
{
LOG.debug(
Thread.currentThread().getId()
+ " Creating a new Task and crawling the subset."
);
final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
otherTask.fork();
crawlPage(urlsToCrawlSubset);
taskResults.addAll(otherTask.join());
}
return taskResults;
附:如果我允許多達80個線程,我們將使用它們,直到每個網站都有50個網址被抓取,然後只使用兩個。
如果你有興趣,這裏是完整的源代碼:https://github.com/mediathekview/MServer/tree/feature/cleanup
你確定這無疑是正確的otherTask.join()那裏調用? – algrid
我無法通過github中的代碼山。如果您需要幫助,請創建一個sscc示例。 http://sscce.org/另外請注意,join()阻塞高達50%的線程,如此處所述:http://coopsoft.com/ar/Calamity2Article.html#join – edharned
您可以顯示提交任務的代碼去游泳池? –