airflow调度问题排查

问题描述

现在公司使用的airflow调度器很慢,每次clear一个task之后,这个task要过一段时间才会被调度器调度到,这个时间大约需要15-30s。

使用的airflow版本较老:v1.7.1.3

排查

  1. 参数问题
    airflow.cfg这个文件里面有很多的关于scheduler的参数,其中跟调度密切相关的有几个:

    • max_threads :这个是调度dag的进程数量,最多有CPU个数的进程。
    • scheduler_heartbeat_sec :这个是调度的时间间隔,如果上一次调度时间小于这个值,则在这个周期里面不会进行调度。
    • min_file_process_interval:这个是自动去更新dag操作的时间间隔,值为-1代表永不自动更新。
    • run_duration:scheduler进程重启时间,值为-1代表永不重启
    • refresh_dags_every:这个是我自己新加的参数,表示经过多少次调度之后重新载入所有的dag

    上面这些参数在jobs.py这个文件里面都可以看到相关的使用,其中最后一个refresh_dags_every这个参数在老的版本里面是写死在SchedulerJob类构造函数里面的[__init__函数默认传参],这里新加的意思只是把配置写在airflow.cfg配置文件里面,方便之后更改。
    在老的版本里面,SchedulerJob类里面的_excute函数即为调度的主要函数,这个函数做了下面几件事:

    • 控制相同pool的任务并发数量,
    • 重载所有的dags[这个跟refresh_dags_every相关]
    • 开进程去均分处理所有的dags,把可以执行的task加入到一个全局的队列里面[跟max_threads相关]

    所以,直接去修改max_thread这个参数是可以得到最直接的感受的,把这个参数调整成CPU的个数,可以获得最大的调度进程的数量,每个进程均分的dags越少,整体时间越快。为什么是整体时间?
    airflow调度问题排查
    这一段代码是把所有可执行的task加入到队列里面,外循环的条件是只要有一个进程还存活的话,就进入循环,相当于主进程已经在这里开始join子进程了,在等待所有的进程结束,所以这一段的时间取决于子进程里面运行时间最长的进程运行的时间。

    当我把这个值从默认的2改到8之后,调度的时间已经从15-20s降低到了4-8s,但是没有直接获得我想要的4倍的提升,肯定还有其他的问题。

  2. 数据库问题
    airflow调度问题排查
    这个函数是用来处理pool的并发的,里面有一个查询task state的语句,之前都很快,但是使用过一段时间之后会很慢,查看数据库,数据库包含了很多的历史数据,并且state这一列也没有索引,所以导致一个很简单的查询语句使用了1s的时间。这里的解决办法是把这一列加上索引,或者说可以让数据库里面只保存近一年的数据即可。我这里使用了加索引的办法,现在的查询已经下降到了0.003 s左右,提升了很多。

总结

这个是第一次对开源工具性能调优的尝试,的确把调度时间优化了很多,现在的每次的调度时间只需要2-3s就可以完成了。

  1. 日志很重要,从日志里面可以分析出很多东西。
  2. 数据库的慢查询需要一定的时间去发现。
  3. 官方文档也很重要,从上面可以获得一些启示。
  4. *上面也有很多的类似的问题,可以先在上面搜索。

分析过程

airflow调度问题排查