django celery redis简单测试

希望在下一版中,能用这个小芹菜,来实现异步的多任务并行哈。

安装REDIS之类的不表,只说在DJANGO当中要注意配置的事项。

0,安装插件

yum install redis-server
pip install celery
pip install celery-with-redis
pip install django-celery

 

1,settings.py当中的配置:

django celery redis简单测试
import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
django celery redis简单测试

 

2,celery.py当中的配置(实际应用时,注意PROJECT和APP的变通配置)

django celery redis简单测试
from __future__ import absolute_import

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerytest.settings')

app = Celery('celerytest')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
django celery redis简单测试

 

3,__init__.py当中的配置

from __future__ import absolute_import

from .celery import app as celery_app

 

4,tasks.py当中的配置

django celery redis简单测试
from celerytest import celery_app
from celery import shared_task
from time import sleep

@celery_app.task()
def UploadTask(message):
    
    UploadTask.update_state(state='PROGRESS', meta={'progress': 0})
    sleep(30)
    UploadTask.update_state(state='PROGRESS', meta={'progress': 30})
    sleep(30)
    return message

def get_task_status(task_id):
    
    task = UploadTask.AsyncResult(task_id)

    status = task.state
    progress = 0

    if status == u'SUCCESS':
        progress = 100
    elif status == u'FAILURE':
        progress = 0
    elif status == 'PROGRESS':
        progress = task.info['progress']
    
    return {'status': status, 'progress': progress}

@celery_app.task
def add(x, y):
    return x + y

def mul(x, y):
    return x * y
django celery redis简单测试

 

5,测试代码

django celery redis简单测试
>>> from app.tasks import *
>>> t = UploadTask.delay("heel")
>>> get_task_status(t.id)
{'status': u'PROGRESS', 'progress': 0}
>>> get_task_status(t.id)
{'status': u'PROGRESS', 'progress': 0}
>>> get_task_status(t.id)
{'status': u'PROGRESS', 'progress': 30}
>>> get_task_status(t.id)
{'status': u'PROGRESS', 'progress': 30}
>>> get_task_status(t.id)
{'status': u'SUCCESS', 'progress': 100}
>>> add.delay(3245, 35)
<AsyncResult: 6b45b2d6-2343-4d42-9810-e72d6c936a01>
django celery redis简单测试

 

6,启动worker(同样注意运行时的帐号,运行时当前目录,PROJ的名称及参数,在后期或是生产环境时,DAEMON的配置)

 celery worker -A celerytest -l info

 

7,输出截图

django celery redis简单测试

django celery redis简单测试

django celery redis简单测试