Java 通用爬虫框架中多线程的使用

Java 通用爬虫框架中多线程的使用


一. 前言

NetDiscovery 是本人开发的一款基于 Vert.x、RxJava 2 等框架实现的通用爬虫框架。它包含了丰富的特性。

二. 多线程的使用

NetDiscovery 虽然借助了 RxJava 2 来实现线程的切换,仍然有大量使用多线程的场景。本文列举一些爬虫框架常见的多线程使用场景。

2.1 爬虫的暂停、恢复

暂停和恢复是最常见的爬虫使用场景,这里借助 CountDownLatch 类实现。

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。

暂停方法会初始化一个 CountDownLatch 类 pauseCountDown,并设置它的计数值为1。

恢复方法会执行 pauseCountDown 的 countDown() ,正好它的计数到达零。


  1. /**

  2. * 爬虫暂停,当前正在抓取的请求会继续抓取完成,之后的请求会等到resume的调用才继续抓取

  3. */

  4. public void pause() {

  5. this.pauseCountDown = new CountDownLatch(1);

  6. this.pause = true;

  7. stat.compareAndSet(SPIDER_STATUS_RUNNING, SPIDER_STATUS_PAUSE);

  8. }


  9. /**

  10. * 爬虫重新开始

  11. */

  12. public void resume() {


  13. if (stat.get() == SPIDER_STATUS_PAUSE

  14. && this.pauseCountDown!=null) {


  15. this.pauseCountDown.countDown();

  16. this.pause = false;

  17. stat.compareAndSet(SPIDER_STATUS_PAUSE, SPIDER_STATUS_RUNNING);

  18. }

  19. }

从消息队列中取出爬虫的 Request 时,会先判断是否需要暂停爬虫的行为,如果需要暂停则执行 pauseCountDown 的 await()。await() 会使线程一直受阻塞,也就是暂停爬虫的行为,直到 CountDownLatch 的计数为0,此时正好能够恢复爬虫运行的状态。


  1. while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {


  2. //暂停抓取

  3. if (pause && pauseCountDown!=null) {

  4. try {

  5. this.pauseCountDown.await();

  6. } catch (InterruptedException e) {

  7. log.error("can't pause : ", e);

  8. }


  9. initialDelay();

  10. }

  11. // 从消息队列中取出request

  12. final Request request = queue.poll(name);

  13. ......

  14. }

2.2 多纬度控制爬取速度

下图反映了单个爬虫的流程。

Java 通用爬虫框架中多线程的使用

如果爬虫爬取速度太快一定会被对方系统识别,NetDiscovery 可以通过限速来实现基本的反反爬虫。

在 NetDiscovery 内部支持多个纬度实现爬虫限速。这些纬度也基本上对应了单个爬虫的流程。

2.2.1 Request

首先,爬虫封装的请求 Request 支持暂停。从消息队列取出 Request 之后,会校验该 Request 是否需要暂停。


  1. while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {


  2. //暂停抓取

  3. ......


  4. // 从消息队列中取出request

  5. final Request request = queue.poll(name);


  6. if (request == null) {


  7. waitNewRequest();

  8. } else {


  9. if (request.getSleepTime() > 0) {


  10. try {

  11. Thread.sleep(request.getSleepTime());

  12. } catch (InterruptedException e) {

  13. e.printStackTrace();

  14. }

  15. }

  16. ......

  17. }

  18. }

2.2.2 Download

爬虫下载时,下载器会创建 RxJava 的 Maybe 对象。Download 的限速借助于 RxJava 的 compose、Transformer 来实现。

下面的代码展示了 DownloaderDelayTransformer:


  1. import cn.netdiscovery.core.domain.Request;

  2. import io.reactivex.Maybe;

  3. import io.reactivex.MaybeSource;

  4. import io.reactivex.MaybeTransformer;


  5. import java.util.concurrent.TimeUnit;


  6. /**

  7. * Created by tony on 2019-04-26.

  8. */

  9. public class DownloaderDelayTransformer implements MaybeTransformer {


  10. private Request request;


  11. public DownloaderDelayTransformer(Request request) {

  12. this.request = request;

  13. }


  14. @Override

  15. public MaybeSource apply(Maybe upstream) {


  16. return request.getDownloadDelay() > 0 ? upstream.delay(request.getDownloadDelay(), TimeUnit.MILLISECONDS) : upstream;

  17. }

  18. }

下载器只要借助 compose 、DownloaderDelayTransformer,就可以实现 Download 的限速。

以 UrlConnectionDownloader 为例:


  1. Maybe.create(new MaybeOnSubscribe<InputStream>() {


  2. @Override

  3. public void subscribe(MaybeEmitter<InputStream> emitter) throws Exception {


  4. emitter.onSuccess(httpUrlConnection.getInputStream());

  5. }

  6. })

  7. .compose(new DownloaderDelayTransformer(request))

  8. .map(new Function<InputStream, Response>() {


  9. @Override

  10. public Response apply(InputStream inputStream) throws Exception {


  11. ......

  12. return response;

  13. }

  14. });

2.2.3 Domain

Domain 的限速参考了 Scrapy 框架的实现,将每个域名以及它对应的最近访问时间存到 ConcurrentHashMap 中。每次请求时,可以设置 Request 的 domainDelay 属性,从而实现单个 Request 对某个 Domain 的限速。


  1. import cn.netdiscovery.core.domain.Request;


  2. import java.util.Map;

  3. import java.util.concurrent.ConcurrentHashMap;


  4. /**

  5. * Created by tony on 2019-05-06.

  6. */

  7. public class Throttle {


  8. private Map<String,Long> domains = new ConcurrentHashMap<String,Long>();


  9. private static class Holder {

  10. private static final Throttle instance = new Throttle();

  11. }


  12. private Throttle() {

  13. }


  14. public static final Throttle getInsatance() {

  15. return Throttle.Holder.instance;

  16. }


  17. public void wait(Request request) {


  18. String domain = request.getUrlParser().getHost();

  19. Long lastAccessed = domains.get(domain);


  20. if (lastAccessed!=null && lastAccessed>0) {

  21. long sleepSecs = request.getDomainDelay() - (System.currentTimeMillis() - lastAccessed);

  22. if (sleepSecs > 0) {

  23. try {

  24. Thread.sleep(sleepSecs);

  25. } catch (InterruptedException e) {

  26. e.printStackTrace();

  27. }

  28. }

  29. }


  30. domains.put(domain,System.currentTimeMillis());

  31. }

  32. }

待 Request 从消息队列中取出时,会先判断 Request 是否需要暂停之后,然后再判断一下 Domain 的访问是否需要暂停。


  1. while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {


  2. //暂停抓取

  3. ......


  4. // 从消息队列中取出request

  5. final Request request = queue.poll(name);


  6. if (request == null) {


  7. waitNewRequest();

  8. } else {


  9. if (request.getSleepTime() > 0) {


  10. try {

  11. Thread.sleep(request.getSleepTime());

  12. } catch (InterruptedException e) {

  13. e.printStackTrace();

  14. }

  15. }


  16. Throttle.getInsatance().wait(request);


  17. ......

  18. }

  19. }

2.2.4 Pipeline

爬虫处理 Request 的流程大体是这样的:调用网络请求 (包括重试机制) -> 将 response 存放到 page -> 解析 page -> 顺序执行 pipelines -> 完成一次 Request 请求。


  1. // request正在处理

  2. downloader.download(request)

  3. .retryWhen(new RetryWithDelay(maxRetries, retryDelayMillis, request)) // 对网络请求的重试机制

  4. .map(new Function<Response, Page>() {


  5. @Override

  6. public Page apply(Response response) throws Exception {

  7. // 将 response 存放到 page

  8. ......

  9. return page;

  10. }

  11. })

  12. .map(new Function<Page, Page>() {


  13. @Override

  14. public Page apply(Page page) throws Exception {


  15. if (parser != null) {


  16. parser.process(page);

  17. }


  18. return page;

  19. }

  20. })

  21. .map(new Function<Page, Page>() {


  22. @Override

  23. public Page apply(Page page) throws Exception {


  24. if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {


  25. pipelines.stream()

  26. .forEach(pipeline -> {

  27. pipeline.process(page.getResultItems());

  28. });

  29. }


  30. return page;

  31. }

  32. })

  33. .observeOn(Schedulers.io())

  34. .subscribe(new Consumer<Page>() {


  35. @Override

  36. public void accept(Page page) throws Exception {


  37. log.info(page.getUrl());


  38. if (request.getAfterRequest() != null) {


  39. request.getAfterRequest().process(page);

  40. }


  41. signalNewRequest();

  42. }

  43. }, new Consumer<Throwable>() {

  44. @Override

  45. public void accept(Throwable throwable) throws Exception {


  46. log.error(throwable.getMessage(), throwable);

  47. }

  48. });

Pipeline 的限速实质借助了 RxJava 的 delay 和 block 操作符实现。


  1. map(new Function<Page, Page>() {


  2. @Override

  3. public Page apply(Page page) throws Exception {


  4. if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {


  5. pipelines.stream()

  6. .forEach(pipeline -> {


  7. if (pipeline.getPipelineDelay()>0) {


  8. // Pipeline Delay

  9. Observable.just("pipeline delay").delay(pipeline.getPipelineDelay(),TimeUnit.MILLISECONDS).blockingFirst();

  10. }


  11. pipeline.process(page.getResultItems());

  12. });

  13. }


  14. return page;

  15. }

  16. })

另外,NetDiscovery 支持通过配置 application.yaml 或 application.properties 文件,来配置爬虫。当然也支持配置限速的参数,同时支持使用随机的数值来配置相应的限速参数。

2.3 非阻塞的爬虫运行

早期的版本,爬虫运行之后无法再添加新的 Request。因为爬虫消费完队列中的 Request 之后,默认退出程序了。

新版本借助于 Condition,即使某个爬虫正在运行仍然可以添加 Request 到它到消息队列中。

Condition 的作用是对锁进行更精确的控制。它用来替代传统的 Object 的wait()、notify() 实现线程间的协作,相比使用 Object 的 wait()、notify(),使用Condition 的 await()、signal() 这种方式实现线程间协作更加安全和高效。

在 Spider 中需要定义好 ReentrantLock 以及 Condition。

然后再定义 waitNewRequest() 、signalNewRequest() 方法,它们的作用分别是挂起当前的爬虫线程等待新的 Request 、唤醒爬虫线程消费消息队列中的 Request。


  1. private ReentrantLock newRequestLock = new ReentrantLock();

  2. private Condition newRequestCondition = newRequestLock.newCondition();


  3. ......


  4. private void waitNewRequest() {

  5. newRequestLock.lock();


  6. try {

  7. newRequestCondition.await(sleepTime, TimeUnit.MILLISECONDS);

  8. } catch (InterruptedException e) {

  9. log.error("waitNewRequest - interrupted, error {}", e);

  10. } finally {

  11. newRequestLock.unlock();

  12. }

  13. }


  14. public void signalNewRequest() {

  15. newRequestLock.lock();


  16. try {

  17. newRequestCondition.signalAll();

  18. } finally {

  19. newRequestLock.unlock();

  20. }

  21. }

可以看到,如果从消息队列中取不出 Request,则会运行 waitNewRequest()。


  1. while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {


  2. //暂停抓取

  3. if (pause && pauseCountDown!=null) {

  4. try {

  5. this.pauseCountDown.await();

  6. } catch (InterruptedException e) {

  7. log.error("can't pause : ", e);

  8. }


  9. initialDelay();

  10. }


  11. // 从消息队列中取出request

  12. final Request request = queue.poll(name);


  13. if (request == null) {


  14. waitNewRequest();

  15. } else {

  16. ......

  17. }

  18. }

然后,在 Queue 接口中包含了一个 default 方法 pushToRunninSpider() ,它内部除了将 request push 到 queue 中,还有调用了 spider.signalNewRequest()。


  1. /**

  2. * 把Request请求添加到正在运行爬虫的Queue中,无需阻塞爬虫的运行

  3. *

  4. * @param request request

  5. */

  6. default void pushToRunninSpider(Request request, Spider spider) {


  7. push(request);

  8. spider.signalNewRequest();

  9. }

最后,即使爬虫已经运行,也可以在任意时刻将 Request 添加到该爬虫对应到Queue 中。


  1. Spider spider = Spider.create(new DisruptorQueue())

  2. .name("tony")

  3. .url("http://www.163.com");


  4. CompletableFuture.runAsync(()->{

  5. spider.run();

  6. });


  7. try {

  8. Thread.sleep(2000L);

  9. } catch (InterruptedException e) {

  10. e.printStackTrace();

  11. }


  12. spider.getQueue().pushToRunninSpider(new Request("https://www.baidu.com", "tony"),spider);


  13. try {

  14. Thread.sleep(2000L);

  15. } catch (InterruptedException e) {

  16. e.printStackTrace();

  17. }


  18. spider.getQueue().pushToRunninSpider(new Request("https://www.jianshu.com", "tony"),spider);


  19. System.out.println("end....");

总结

爬虫框架 github 地址:https://github.com/fengzhizi715/NetDiscovery

本文总结了通用爬虫框架在某些特定场景中如何使用多线程。未来,NetDiscovery 还会增加更为通用的功能。


该系列的相关文章:

Disruptor 实践:整合到现有的爬虫框架

从API到DSL —— 使用 Kotlin 特性为爬虫框架进一步封装

使用Kotlin Coroutines简单改造原有的爬虫框架

为爬虫框架构建Selenium模块、DSL模块(Kotlin实现)

基于Vert.x和RxJava 2构建通用的爬虫框架


最后推荐一款公益小程序:

这是一款由一位宝爸为自己孩子开发的小程序,主要包括舒尔特方格(以最短时间按顺序指出所以数字,注意力训练)、加减法和乘法口诀。可以根据实际情况选择出题难度和出题数量,有错题重做功能。针对国内幼升小的实际情况,还有针对性的 20 以内加减法运算。使用简单方便,可以在家里、公车上、火车上、候车室等场所使用,可用于奖惩。公益性质,完全免费,无广告!


Java 通用爬虫框架中多线程的使用


关注【Java与Android技术栈】

更多精彩内容请关注扫码

Java 通用爬虫框架中多线程的使用