Python多线程到Celery任务中。 celery_task.update_state()错误

问题描述:

我想在Celery任务中实现线程池。Python多线程到Celery任务中。 celery_task.update_state()错误

我芹菜任务调用update_state()函数来发送有关任务状态DB信息。它运作成功。 但是,当我将线程添加到任务并尝试在每个线程中调用update_state()函数时 - Celery会返回错误。

这是工作例(无螺纹):

import celery 

@celery.task(bind=True) 
def get_info(self, user): 
    for i in xrange(4): 
     self.update_state(state=states.SUCCESS, meta={'subtask_id': i}) 

这不是工作示例(带螺纹):

import celery 
import threading 

lock = threading.Lock() 

def run_subtask(celery_task, i): 
    lock.acquire() 
    #Error raises here, when update_state calls 
    celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i}) 
    lock.release() 

@celery.task(bind=True) 
def get_info(self, user): 

    for i in xrange(4): 
     worker = threading.Thread(target=run_subtask, args=(self, i)) 
     worker.start() 

的错误是:

[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py", 
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key), 
    [2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found 

是什么原因?为什么我不能将update_state()调用到线程中?

我找到了答案!这是从芹菜出资人之一的答案:

task.request是一个线程局部,所以只执行任务的线程可以调用update_state。

这尤其是有道理的,如果你认为线程可以与任务后处理程序存储结果比赛。

您可以将TASK_ID传递给线程:

cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to}) 

但你必须让该死的肯定线程加入和任务退出之前停止(的Thread.join())。 在你的例子中,线程只能在while循环退出之后才能被连接,并且由于你正在休眠1秒,连接可能会被延迟。

芹菜增加了一种上下文对象的线程,所以它知道它的任务是涉及到。为了将线程与任务相关联,您需要执行以下操作:

from celery.app import push_current_task 


def run_subtask(celery_task, i): 
    push_current_task(celery_task) 

    ... 

    pop_current_task() 
+0

非常感谢!我已经找到解决方案并将其发布到此处。我没有测试你的解决方案,但我认为它有相同的方向。 – Denti

+0

矿是文档建议的方式。我会建议你使用我的。 –