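The crawler has a urlQueue that records the URLs to fetch, and a mock asynchronous URL fetcher.
I am trying to write it in RxJava style, and my question is how the crawler below could be rewritten. At first I tried Flowable.generate, like this: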
Flowable.generate((Consumer<Emitter<Integer>>) e -> {
    final Integer poll = demo.urlQueue.poll();
    if (poll != null) {
        e.onNext(poll);
    } else if (runningCount.get() == 0) {
        e.onComplete();
    }
}).flatMap(i -> {
    runningCount.incrementAndGet();
    return demo.urlFetcher.asyncFetchUrl(i);
}, 10)
    .doOnNext(page -> demo.onSuccess(page))
    .subscribe(page -> runningCount.decrementAndGet());
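But it does not work. At the beginning the urlQueue may contain only one seed, so generate is called 10 times but only one onNext is emitted. Only when that item completes is the next request(1) issued, which triggers generate again.
So even though the code sets flatMap's maxConcurrency to 10, it fetches the URLs one at a time.
I then changed it to the code below, which works as expected.
However, in that code I have to track how many tasks are currently running and work out how many more to pull from the queue myself, and I think RxJava should be doing that job.
I am not sure whether the code can be rewritten in a simpler way.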
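// Imports assume RxJava 2.x (io.reactivex) and SLF4J; adjust the package names for other versions.
import io.reactivex.Flowable;
import io.reactivex.processors.BehaviorProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CrawlerDemo {
    private static Logger logger = LoggerFactory.getLogger(CrawlerDemo.class);
    // it can be redis queue or other queue
    private BlockingQueue<Integer> urlQueue = new LinkedBlockingQueue<>();
    private static AtomicInteger runningCount = new AtomicInteger(0);
    private static final int MAX_CONCURRENCY = 5;
    private UrlFetcher urlFetcher = new UrlFetcher();

    private void addSeed(int i) {
        urlQueue.offer(i);
    }

    // Queue every link found on a fetched page.
    private void onSuccess(Page page) {
        page.links.forEach(i -> {
            logger.info("offer more url " + i);
            urlQueue.offer(i);
        });
    }

    // Kick off the pipeline with the first queued URL, or complete if there is none.
    private void start(BehaviorProcessor<Integer> processor) {
        final Integer poll = urlQueue.poll();
        if (poll != null) {
            processor.onNext(poll);
        } else {
            processor.onComplete();
        }
    }

    // Push queued URLs into the processor while the running count allows it.
    private int dispatchMoreLink(BehaviorProcessor<Integer> processor) {
        int links = 0;
        while (runningCount.get() <= MAX_CONCURRENCY) {
            final Integer poll = urlQueue.poll();
            if (poll != null) {
                processor.onNext(poll);
                links++;
            } else {
                // Queue is empty: complete if no fetch is still counted as running.
                if (runningCount.get() == 0) {
                    processor.onComplete();
                }
                break;
            }
        }
        return links;
    }

    private Flowable<Page> asyncFetchUrl(int i) {
        return urlFetcher.asyncFetchUrl(i);
    }

    public static void main(String[] args) throws InterruptedException {
        CrawlerDemo demo = new CrawlerDemo();
        demo.addSeed(1);
        BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
        processor
            .flatMap(i -> {
                runningCount.incrementAndGet();
                return demo.asyncFetchUrl(i)
                    .doFinally(() -> runningCount.decrementAndGet())
                    .doFinally(() -> demo.dispatchMoreLink(processor));
            }, MAX_CONCURRENCY)
            .doOnNext(page -> demo.onSuccess(page))
            .subscribe();
        demo.start(processor);
    }
}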
class Page {
    public List<Integer> links = new ArrayList<>();
}

class UrlFetcher {
    static Logger logger = LoggerFactory.getLogger(UrlFetcher.class);
    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public Flowable<Page> asyncFetchUrl(Integer url) {
        logger.info("start async get " + url);
        return Flowable.defer(() -> emitter ->
            scheduledExecutorService.schedule(() -> {
                Page page = new Page();
                // the website urls no more than 1000
                if (url < 1000) {
                    page.links = IntStream.range(1, 5).boxed().map(j -> 10 * url + j).collect(Collectors.toList());
                }
                logger.info("finish async get " + url);
                emitter.onNext(page);
                emitter.onComplete();
            }, 5, TimeUnit.SECONDS)); // cost 5 seconds to access url
    }
}
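
For comparison, the bookkeeping described in the question (cap the number of fetches in flight, refill from the queue whenever one finishes, and stop once the queue is empty and nothing is running) can be sketched with JDK classes only. This is not RxJava and not a proposed answer, just a minimal illustration of the invariant the RxJava version has to maintain. The names BoundedCrawlSketch, run, peakRunning and the limit parameter are hypothetical, and the simulated fetch delay is shortened to 10 ms so the sketch finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BoundedCrawlSketch {
    private final Queue<Integer> urlQueue = new ConcurrentLinkedQueue<>();
    private final Set<Integer> fetched = ConcurrentHashMap.newKeySet();
    private final CompletableFuture<Set<Integer>> done = new CompletableFuture<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final int maxConcurrency;
    private final int limit;   // hypothetical: URLs >= limit yield no further links
    private int running = 0;   // fetches in flight, guarded by "this"
    private int peak = 0;      // highest value "running" reached, guarded by "this"

    public BoundedCrawlSketch(int maxConcurrency, int limit) {
        this.maxConcurrency = maxConcurrency;
        this.limit = limit;
    }

    // Mock fetcher: after a short delay, page "url" links to 10*url+1 .. 10*url+4.
    private CompletableFuture<List<Integer>> asyncFetchUrl(int url) {
        CompletableFuture<List<Integer>> result = new CompletableFuture<>();
        scheduler.schedule(() -> {
            List<Integer> links = new ArrayList<>();
            if (url < limit) {
                for (int j = 1; j < 5; j++) {
                    links.add(10 * url + j);
                }
            }
            result.complete(links);
        }, 10, TimeUnit.MILLISECONDS);
        return result;
    }

    // Start fetches until the cap is reached or the queue runs dry.
    private synchronized void dispatch() {
        while (running < maxConcurrency) {
            Integer url = urlQueue.poll();
            if (url == null) {
                // Nothing queued: finished only if nothing is in flight either.
                if (running == 0) {
                    scheduler.shutdown();
                    done.complete(fetched);
                }
                return;
            }
            running++;
            peak = Math.max(peak, running);
            asyncFetchUrl(url).thenAccept(links -> onFetched(url, links));
        }
    }

    // Record the page, queue its links, free one slot, then refill.
    private synchronized void onFetched(int url, List<Integer> links) {
        fetched.add(url);
        urlQueue.addAll(links);
        running--;
        dispatch();
    }

    // Crawl from a single seed and block until the crawl has finished.
    public Set<Integer> run(int seed) {
        urlQueue.add(seed);
        dispatch();
        return done.join();
    }

    public synchronized int peakRunning() {
        return peak;
    }

    public static void main(String[] args) {
        BoundedCrawlSketch crawler = new BoundedCrawlSketch(5, 100);
        Set<Integer> pages = crawler.run(1);
        System.out.println("fetched " + pages.size()
                + " pages, peak in flight " + crawler.peakRunning());
    }
}
```

The slot is released and the queue refilled inside one synchronized step, so the "queue empty and nothing running" check cannot observe a half-updated count. That ordering is the part the RxJava version also has to get right, whichever operator ends up doing the counting.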