2017-09-15 144 views
0

我有一個應用程序可以抓取大約六千個URL。爲了最大限度地減少這項工作,我創建了一個RecursiveTask,它使用所有要爬網的URL的ConcurrentLinkedQueue。它最多可以關閉50個,如果這個Que是空的,它會直接抓取它,但是如果沒有,它會首先創建一個新的自己的實例並將其分叉,之後它會抓取50的子集,然後它將加入分叉的任務。ForkJoinFramework只使用兩名工作人員

現在出現我的問題,直到每個線程同時工作了他的50個全部四個工作快速anf。但是,在兩次停止工作並等待加入之後,只有另外兩個人正在工作並創建新的分支和抓取頁面。

爲了可視化這個數據,我計算了一個線程爬行URL的數量,並讓JavaFX GUI顯示它。

我錯了什麼,所以ForkJoinFramewok只使用我的四個允許線程中的兩個?我能做些什麼來改變它?

這裏是任務的我的計算方法:

LOG.debug(
     Thread.currentThread().getId() + " Starting new Task with " 
      + urlsToCrawl.size() + " left." 
    ); 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++) 
    { 
     urlsToCrawlSubset.offer(urlsToCrawl.poll()); 
    } 
    LOG.debug(
     Thread.currentThread().getId() + " Crated a Subset with " 
     + urlsToCrawlSubset.size() + "." 
    ); 
    LOG.debug(
     Thread.currentThread().getId() 
     + " Now the Urls to crawl only left " + urlsToCrawl.size() + "." 
    ); 

    if (urlsToCrawl.isEmpty()) 
    { 
     LOG.debug(Thread.currentThread().getId() + " Crawling the subset."); 
     crawlPage(urlsToCrawlSubset); 
    } 
    else 
    { 
     LOG.debug(
      Thread.currentThread().getId() 
       + " Creating a new Task and crawling the subset." 
     ); 
     final AbstractUrlTask<T, D> otherTask = createNewOwnInstance(); 
     otherTask.fork(); 
     crawlPage(urlsToCrawlSubset); 
     taskResults.addAll(otherTask.join()); 
    } 
    return taskResults; 

這裏是我的圖的快照: enter image description here

附:如果我允許多達80個線程,我們將使用它們,直到每個網站都有50個網址被抓取,然後只使用兩個。

如果你有興趣,這裏是完整的源代碼:https://github.com/mediathekview/MServer/tree/feature/cleanup

+1

你確定這無疑是正確的otherTask.join()那裏調用? – algrid

+1

我無法通過github中的代碼山。如果您需要幫助,請創建一個sscc示例。 http://sscce.org/另外請注意,join()阻塞高達50%的線程,如此處所述:http://coopsoft.com/ar/Calamity2Article.html#join – edharned

+0

您可以顯示提交任務的代碼去游泳池? –

回答

0

我固定它。我的錯誤是,我分裂然後工作了一個小protion,而不是等待,而不是分成一半,然後再打電話給我自己與其他一半等

換句話說,之前我分裂和直接工作,但正確是分裂,直到所有分裂,然後開始工作。

這裏是我的代碼現在的樣子:

@Override 
protected Set<T> compute() 
{ 
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask()) 
    { 
     crawlPage(urlsToCrawl); 
    } 
    else 
    { 
     final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl)); 
     final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl); 
     leftTask.fork(); 
     taskResults.addAll(rightTask.compute()); 
     taskResults.addAll(leftTask.join()); 
    } 
    return taskResults; 
} 

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue) 
{ 
    final int halfSize = aBaseQueue.size()/2; 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < halfSize; i++) 
    { 
     urlsToCrawlSubset.offer(aBaseQueue.poll()); 
    } 
    return urlsToCrawlSubset; 
}