2016-12-07 34 views
0

爬蟲有一個urlQueue來記錄要抓取的url,一個模擬異步url fetcher。
我試圖用rx-java風格編寫它。 起初,我嘗試Flowable.generate這樣如何重寫以下rx-java爬蟲

Flowable.generate((Consumer<Emitter<Integer>>) e -> { 
     final Integer poll = demo.urlQueue.poll(); 
     if (poll != null) { 
      e.onNext(poll); 
     } else if (runningCount.get() == 0) { 
      e.onComplete(); 
     } 
    }).flatMap(i -> { 
     runningCount.incrementAndGet(); 
     return demo.urlFetcher.asyncFetchUrl(i); 
    }, 10) 
      .doOnNext(page -> demo.onSuccess(page)) 
      .subscribe(page -> runningCount.decrementAndGet()); 

,但它不會工作,因爲在開始時,有可能只有一個urlQueue種子,所以產生被稱爲10次,但只有一個即onNext被髮射。只有當它完成時,纔會調用下一個請求(1) - >生成。
儘管在代碼中,我們指定flatMap maxConcurrency爲10,它會逐一抓取。

之後,我修改下面的代碼,它可以像預期的那樣工作。
但是在代碼中,我應該關心當前有多少任務正在運行,然後計算應該從隊列中提取多少個任務,我認爲rx-java應該完成這項工作。

我不確定代碼是否可以用更簡單的方法重寫。

public class CrawlerDemo { 
    private static Logger logger = LoggerFactory.getLogger(CrawlerDemo.class); 

    // it can be redis queue or other queue 
    private BlockingQueue<Integer> urlQueue = new LinkedBlockingQueue<>(); 

    private static AtomicInteger runningCount = new AtomicInteger(0); 

    private static final int MAX_CONCURRENCY = 5; 

    private UrlFetcher urlFetcher = new UrlFetcher(); 

    private void addSeed(int i) { 
     urlQueue.offer(i); 
    } 

    private void onSuccess(Page page) { 
     page.links.forEach(i -> { 
      logger.info("offer more url " + i); 
      urlQueue.offer(i); 
     }); 
    } 

    private void start(BehaviorProcessor processor) { 
     final Integer poll = urlQueue.poll(); 
     if (poll != null) { 
      processor.onNext(poll); 

     } else { 
      processor.onComplete(); 
     } 
    } 

    private int dispatchMoreLink(BehaviorProcessor processor) { 

     int links = 0; 
     while (runningCount.get() <= MAX_CONCURRENCY) { 
      final Integer poll = urlQueue.poll(); 
      if (poll != null) { 
       processor.onNext(poll); 

       links++; 
      } else { 
       if (runningCount.get() == 0) { 
        processor.onComplete(); 
       } 
       break; 
      } 
     } 

     return links; 
    } 

    private Flowable<Page> asyncFetchUrl(int i) { 
     return urlFetcher.asyncFetchUrl(i); 
    } 


    public static void main(String[] args) throws InterruptedException { 
     CrawlerDemo demo = new CrawlerDemo(); 
     demo.addSeed(1); 

     BehaviorProcessor<Integer> processor = BehaviorProcessor.create(); 

     processor 
       .flatMap(i -> { 
        runningCount.incrementAndGet(); 
        return demo.asyncFetchUrl(i) 
          .doFinally(() -> runningCount.decrementAndGet()) 
          .doFinally(() -> demo.dispatchMoreLink(processor)); 
       }, MAX_CONCURRENCY) 
       .doOnNext(page -> demo.onSuccess(page)) 
       .subscribe(); 

     demo.start(processor); 


    } 


} 

class Page { 
    public List<Integer> links = new ArrayList<>(); 
} 

class UrlFetcher { 
    static Logger logger = LoggerFactory.getLogger(UrlFetcher.class); 


    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); 

    public Flowable<Page> asyncFetchUrl(Integer url) { 

     logger.info("start async get " + url); 
     return Flowable.defer(() -> emitter -> 
       scheduledExecutorService.schedule(() -> { 

        Page page = new Page(); 
        // the website urls no more than 1000 
        if (url < 1000) { 
         page.links = IntStream.range(1, 5).boxed().map(j -> 10 * url + j).collect(Collectors.toList()); 
        } 

        logger.info("finish async get " + url); 
        emitter.onNext(page); 
        emitter.onComplete(); 
       }, 5, TimeUnit.SECONDS));         // cost 5 seconds to access url 
    } 
} 

回答

0

您試圖對RxJava使用常規(非Rx)代碼,但沒有得到您想要的結果。

要做的第一件事就是到urlQueue.poll()轉換成Flowable<Integer>

Flowable.generate((Consumer<Emitter<Integer>>) e -> { 
    final Integer take = demo.urlQueue.take(); // Note 1 
    e.onNext(take); // Note 2 
}) 
    .observeOn(Schedulers.io(), 1) // Note 3 
    .flatMap(i -> demo.urlFetcher.asyncFetchUrl(i), 10) 
    .subscribe(page -> demo.onSuccess(page)); 
  1. 閱讀的反應方式的隊列意味着阻塞等待。嘗試poll()隊列會增加一層RxJava允許您跳過的複雜性。
  2. 將收到的值傳遞給任何訂戶。如果您需要指示完成,則需要添加外部布爾值,或使用帶內指示符(例如負整數)。
  3. observeOn()運營商將訂閱發電機。價值1只會導致一個訂閱,因爲沒有多個訂閱點。

其餘代碼與您擁有的相似。您產生的問題是因爲flatMap(...,10)操作會向發生器訂閱10次,這不是您想要的。你想限制同時抓取的次數。添加runningCount是一種混亂,可以防止早期退出發生器,但它不能取代urlQueue上用於發送數據結束的正確方法。