执行其他芹菜任务不工作的芹菜周期性任务
问题描述:
我有一个API,它返回其他API的列表。执行其他芹菜任务不工作的芹菜周期性任务
我需要访问这些API每隔15分钟,并把返回到数据库中的数据。
下面是我用芹菜和Redis的在celery_worker.py文件中写道。但是所有的任务都没有开始。
list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json()
CELERYBEAT_SCHEDULE = {
'every-15-minute': {
'task': 'fetch_data_of_all_APIs',
'schedule': timedelta(minutes=15),
},
}
@celery.task
def access_one_API(one_API):
return requests.get(one_API).json()
@celery.task(name='fetch_data_of_all_APIs')
def fetch_data_of_all_APIs():
for one_API in list_of_APIs:
task = access_one_API.delay(one_API)
# some codes to put all task.id into a list_of_task_id
for task_id in list_of_task_id:
# some codes to get the results of all tasks
# some codes to put all the results into a database
的fetch_data_of_all_APIs
功能应每15分钟,这是应该使用多个工人跑access_one_API
功能
芹菜服务器启动成功的终端但既不fetch_data_of_all_APIs
也不access_one_API
开始运行。
如果我提取fetch_data_of_all_APIs
函数中的代码,access_one_API
可以启动并由多个芹菜工作人员执行。但只要我将这些代码放在一个函数中并用@celery.task
来修饰它,那么这两个函数都不会启动。
所以我相信它一定与芹菜有关。
非常感谢提前。
答
这里例如如何配置周期性任务与子任务芹菜(我设置20秒示范)。 tasks.py:
import celery
from celery.canvas import subtask
from celery.result import AsyncResult
# just for example list of integer values
list_of_APIs = [1, 2, 3, 4]
@celery.task(name='access_one_API')
def access_one_API(api):
"""
Sum of subtask for demonstration
:param int api:
:return: int
"""
return api + api
@celery.task(name='fetch_data_of_all_APIs')
def fetch_data_of_all_APIs(list_of_APIs):
list_task_ids = []
for api in list_of_APIs:
# run of celery subtask and collect id's of subtasks
task_id = subtask('access_one_API', args=(api,)).apply_async().id
list_task_ids.append(task_id)
result_sub_tasks = {}
for task_id in list_task_ids:
while True:
task_result = AsyncResult(task_id)
if task_result.status == 'SUCCESS':
# if subtask is finish add result and check result of next subtask
result_sub_tasks[task_id] = task_result.result
break
print result_sub_tasks
# do something with results of subtasks here...
app = celery.Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
)
app.conf.beat_schedule = {
'add-every-20-seconds': {
'task': 'fetch_data_of_all_APIs',
'schedule': 20.0,
# args for fetch_data_of_all_APIs
'args': (list_of_APIs,)
},
}
运行芹菜:从终端celery worker -A tasks.app --loglevel=info --beat
跟踪:
[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2}
希望这有助于。
请注意,您需要'@ celery.task()'装饰器。另外,您需要检查'celery-beat'配置参数,因为当前的芹菜版本使用小写设置。 –