Python多线程到Celery任务中。 celery_task.update_state()错误
问题描述:
我想在Celery任务中实现线程池。Python多线程到Celery任务中。 celery_task.update_state()错误
我芹菜任务调用update_state()函数来发送有关任务状态DB信息。它运作成功。 但是,当我将线程添加到任务并尝试在每个线程中调用update_state()函数时 - Celery会返回错误。
这是工作例(无螺纹):
import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
这不是工作示例(带螺纹):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
的错误是:
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
是什么原因?为什么我不能将update_state()调用到线程中?
答
我找到了答案!这是从芹菜出资人之一的答案:
task.request是一个线程局部,所以只执行任务的线程可以调用update_state。
这尤其是有道理的,如果你认为线程可以与任务后处理程序存储结果比赛。
您可以将TASK_ID传递给线程:
cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to})
但你必须让该死的肯定线程加入和任务退出之前停止(的Thread.join())。 在你的例子中,线程只能在while循环退出之后才能被连接,并且由于你正在休眠1秒,连接可能会被延迟。
答
芹菜增加了一种上下文对象的线程,所以它知道它的任务是涉及到。为了将线程与任务相关联,您需要执行以下操作:
from celery.app import push_current_task
def run_subtask(celery_task, i):
push_current_task(celery_task)
...
pop_current_task()
非常感谢!我已经找到解决方案并将其发布到此处。我没有测试你的解决方案,但我认为它有相同的方向。 – Denti
矿是文档建议的方式。我会建议你使用我的。 –