python里celery使用
-
安装celery
pip install Celery
-
任务队列是一种跨线程、跨机器工作的一种机制.
-
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
-
celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
-
一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。
-
例子:全部存储到一个文件时 tasks目录下的task_sms
# coding:utf-8 # 这是单独的时候, 我们把celery的程序单独分装成一个应用的时候就上面把这个单独的文件拆分后的样子 from celery import Celery from ihome.libs.yuntongxun.sms import CCP # 创建celery对象 celery_app = Celery("ihome", broker="redis://ip/1") # 定义任务 @celery_app.task def send_sms(to, datas, temp_id): """ 发送短信 :param to: 手机号 :param datas: [验证码, 时间/分钟] :param temp_id: 1 """ ccp = CCP() ccp.send_template_sms(to, datas, temp_id)
-
把celey分包开展, 列子结构 下面导包时ihome时所有程序的父目录,tasks时celery的父目录,CCP是发送短信的类.
tasks ├── __init__.py ├── config.py ├── main.py └── sms ├── __init__.py └── tasks.py
-
# coding:utf-8 # 任务存放处 BROKER_URL = 'redis://ip:6379/1' # 结果存放处 CELERY_RESULT_BACKEND = 'redis://ip:6379/2'
-
# coding:utf-8 from celery import Celery from ihome.tasks import config # 创建celery对象 app = Celery("ihome") # 引入配置信息 app.config_from_object(config) # 自动搜索任务, 但是文件名必须是tasks # 这是个列表, 如果除了sms模块还有orders的话, 你也要写在后面列 # celery_app.autodiscover_tasks(["ihome.tasks.sms", "ihome.tasks.orders"]) app.autodiscover_tasks(["ihome.tasks.sms"]) # 启动celery只需要启动这个main
-
# coding:utf-8 from ihome.tasks.main import app from ihome.libs.yuntongxun.sms import CCP @app.task def send_template_sms(to, datas, temp_id): """ 发送短信 用户手机号 [验证码, 时间] 1 :return: 0代表成功 -1代表失败 """ ccp = CCP() ret = ccp.sendTemplateSMS(to, datas, temp_id) return ret
-
获取task.py返回的值
# 发送短信 # 用delay发送任务 # 使用celery异步发送短信, delay函数调用后立即返回 result = tasks.send_templates_sms.delay(mobile, [sms_code, int(constants.SMS_CODE_REDIS_EXPIRES / 60)], 1) # 返回的是异步执行结果对象 # 打印异步执行的那个id print(result.id) # 存在redis的键是celery-task-meta-result.id # 根据result.id把它组成上面那样的键也可以从redis中拿到那个值 # 通过get方法页能获取celery异步执行的结果 # get方法默认是阻塞的行为, 会等到有了执行结果之后才返回 # get方法页接受参数的timeout, 超时时间, 超过超时时间还拿不到结果, 则返回 ret = result.get() # 这样的话就又变成同步了, 一直在等待celery的结果 print(ret)