通过ipython并行启动新任务时的通知

通过ipython并行启动新任务时的通知

问题描述:

与ipyparallel最佳实践相关的一些问题。我试图用它来实现一个需要运行~15的模型的蒙特卡罗框架。这个想法是运行N个引擎(通过SLURM)并拥有一个“主”进程,它将所有需要的任务异步排队并忙碌 - 等待完成,更新一个包含每次运行状态的sqlite数据库。通过ipython并行启动新任务时的通知

我想知道一个任务何时被分配给一个引擎,以便我可以跟踪其数据库中的状态。我尝试使用AsyncResult实例来获取msg_id并查询任务数据库,但直到任务完成后才会更新“已启动”字段。

似乎应该有一种方法来接收此通知,或者至少在引擎工作时查询集线器。

此外,我必须做些什么来避免长时间运行任务期间发动机心跳超时?这是client.spin_thread()的目的吗?

谢谢!

我已经使用publish_data回答了我自己问题的一部分。我的想法是,不是在每个引擎中调用我的主“worker”函数,而是在主worker函数前后调用publish_data()来设置客户端可以看到的状态。例如:

def wrapper(run_id, argDict): 
    from ipyparallel.engine.datapub import publish_data 

    publish_data({run_id : 'running'}) 
    status = runMonteCarloTrial(argDict) # runs for ~15 minutes 
    publish_data({run_id : status}) 
    return status 

“主”任务调用:

ar = client.map_async(wrapper, listOfArgDicts) 

我然后遍历ar直到所有AsyncResults是否齐全,检查ar.data读取公布的数据,以确定运行试验和保存试验结果到一个sqlite3数据库。

这种通用方法适用于简单的测试用例。我还没有探讨长时间运行函数调用的超时问题。