Celery使用过程中遇到的一些问题

做项目时,其中用了Celery这种工具。前段时间,遇到过一些问题,解决后没有进行总结,今天就抽个空把它记录下来。

用过Celery的程序员,都知道它是一种异步执行程序的工具。里面有Worker、Task等概念,这里我就不赘述了。

1、功能需求

在使用Celery的过程中,我需要知道Task的状态。Task就是异步任务,用户没执行一次异步任务,就会新创建一个Task,代表此异步任务。

该Task对象中,包含着许多信息,其中也包括状态。我的项目中,需要根据Task的状态来判断,这次异步任务是否还在执行中。

2、 出现问题

既然,我需要Task的状态,那我就需要查看Task的状态怎么获取啊!我查看了一下Celery的源码!发现在Celery的AsyncResult对象中,有个state字段。如下图所示:

Celery使用过程中遇到的一些问题

根据,该源码中的注释说明,该字段有好几个值。分别是:PENDING、STARTED、RETRY、FAILURE、SUCCESS。

然后,我就赶紧写个demo验证一下,看看这个state字段是不是我想要的。

demo如下:

Celery使用过程中遇到的一些问题

我执行项目中的Celery异步任务,根据我之前查出来的task id。执行demo查询该Task的状态。

这时候,问题就出来了,根据demo返回的Task状态为PENDING。表示Task还在等待中,尚未执行。

这就不对了,此时的状态应该是STARTED,因为我的Task已经执行好一段时间了,它返回的结果不准确。

3、 解决问题

难道我用的字段不正确,然后我就谷歌搜索。发现Celery官网和网上的大多数反馈也是表示Task的状态字段就是state。

那我为什么测出的结果和理论的不同呢?然后,我详细查看Celery的配置,发现了一个参数:CELERY_TRACK_STARTED。

该参数默认是关闭的,表示只要Celery开始执行Task就会追踪该Task。所以,开启该参数后,Task的状态是时刻记在BACKEND中的。

好,我在Celery的配置文件处,加了该参数。

Celery使用过程中遇到的一些问题

然后再执行Celery的异步任务,得到的结果是我想要的。

4、 引申思考以及问题

我的问题是解决了,但是这引起了我对Celery的一些兴趣。

当时,我就考虑到,如果我把正在运行中的Task任务,直接kill掉。那么此时我再去看Task的状态,它会是啥呢?

STARTED,正在执行中的状态。

而此时,Task已经关掉,它不应该是这种状态。为此,我猜测这应该是,Task意外结束,没有改变Task的状态导致的。

但是这样就不太好了,因为只要是程序,那它就一定有意外退出的可能。假设,我的项目需要查看Task的状态,当Task被意外kill掉时,项目中查看Task的状态就不准确了。

5、 引申问题解决思路一

当时我想:既然Task被kill掉之后,还能显示运行中,说明此Task的状态一定是保存在某个地方,我把该Task的数据清空了不就完了。

而Celery的数据存储,只有可能存在三个地方:使用RabbitMQ的消息代理(BROKER),使用Redis的任务结果保存处(BACKEND),以及文件保存(当然这点基本上没可能,Celery没这样用过,我主要是死马当活马医)。

这三个地方,其实只有Redis可能存放Task的状态,按照Celery的机制,也只有它最有可能存放。

但是呢?为了弄清Celery的存储机制,我想试试Celery会把数据存到RabbitMQ中吗?

然后,我执行了,以下命令,清空RabbitMQ队列。

Celery使用过程中遇到的一些问题

此时,RabbitMQ队列的数据已经为空。然后我查看Task的状态,依然还为STARTED。说明不是它存储Task的状态。

然后,我进入redis中。使用keys *命令,发现许多带有celery-task-meta前缀的记录。

Celery使用过程中遇到的一些问题

后经查明,这些记录的后缀就是Celery中Task的id。

我根据我的Task的id,查出如下内容:

Celery使用过程中遇到的一些问题

我们能清晰的看出,内容中Task的状态为STARTED。这说明的确是存放在Redis中的。

然后,我把这条记录删除,再执行demo,Task的状态不再是STARTED,项目中显示的状态就正确了。

但是,这又引出了一个问题,怎么删除这条记录,或者什么时候删除这条记录。当然,我们删除很容易,编程语言的redis模块或者Celery自己提供的代码都能删除。Celery中根据task id删除backend中的数据。

Celery使用过程中遇到的一些问题

那么,什么时候删除这条记录呢?Celery默认的是保留此数据24小时。我左思右想,还是不删这条记录了。换种思路解决这个问题吧!

6、 引申问题解决思路二

要知道,Celery的Task是运行在Worker上的。只要判断此时的Worker程序是否还正常运行,不就可以判断Task的状态是否还在运行中了吗?

说干就干,我们通过ps命令,可以查看Celery运行的程序。

Celery使用过程中遇到的一些问题

然后,我们执行task时,把它本身运行的程序进程pid记录下来,发现正好就是Worker的进程pid。

这样就简单了,我们只需要结合Celery提供查看Task状态的接口,以及Python提供的Psutil查看进程的模块。就能最终判断Task是不是真在运行中。只有Task的状态为STARTED,并且Task所在的Worker进程在运行中,Task才是真正在运行状态。

Psutil查看进程是否运行代码如下:

Celery使用过程中遇到的一些问题

7、 总结

今天只是把我前段时间遇到的问题以及解决思路记录下来,也没写Celery的内部机制等等,这些东西网上一大把,我也不是很有写它们的必要。

做过几年的程序员,感触最多的就是解决问题的思路。一旦遇到某个问题了,一种思路解决不了,可以换种思路解决,另一种思路可能也不一定能完美解决,但可以加深对问题的理解。而怎么想到另一种思路,就需要平时的多积累和提高自己的认知范围了,这还是比较难的。

了解新钛云服

新钛云服正式获批工信部ISP/IDC(含互联网资源协作)牌照

TiOps,支持多云环境安全远程运维,疫情期间免费对外开放,助力远程安全办公!

深耕专业,矗立鳌头,新钛云服获千万Pre-A轮融资

新钛云服,打造最专业的Cloud MSP+,做企业业务和云之间的桥梁

新钛云服一周年,完成两轮融资,服务五十多家客户

上海某仓储物流电子商务公司混合云解决方案

新钛云服出品的部分精品技术干货

国内主流公有云VPC使用对比及总结

万字长文:云架构设计原则|附PDF下载

刚刚,OpenStack 第 19 个版本来了,附28项特性详细解读!

Ceph OSD故障排除|万字经验总结

七个用于Docker和Kubernetes防护的安全工具

运维人的终身成长,从清单管理开始|万字长文!

OpenStack与ZStack深度对比:架构、部署、计算存储与网络、运维监控等

什么是云原生?

IT混合云战略:是什么、为什么,如何构建?

Celery使用过程中遇到的一些问题