中断主调用线程如果子线程抛出异常

问题描述:

我正在使用threading.Thread和t.start()与可调参数列表来执行长时间运行的多线程处理。我的主线程被阻塞,直到所有线程完成。但是,如果其中一个可调参数抛出异常并终止其他线程,我希望t.start()立即返回。中断主调用线程如果子线程抛出异常

使用t.join()检查执行的线程是否提供了有关因异常导致的失败的信息。

这里是代码:

import json 
import requests 


class ThreadServices: 
    def __init__(self): 
     self.obj = "" 

    def execute_services(self, arg1, arg2): 
     try: 
      result = call_some_process(arg1, arg2) #some method 
      #save results somewhere 
     except Exception, e: 
      # raise exception 
      print e 

    def invoke_services(self, stubs): 
     """ 
     Thread Spanning Function   
     """ 
     try: 
      p1 = "" #some value 
      p2 = "" #some value 
      # Call service 1 
      t1 = threading.Thread(target=self.execute_services, args=(a, b,) 

      # Start thread 
      t1.start() 
      # Block till thread completes execution 
      t1.join() 

      thread_pool = list() 
      for stub in stubs: 
       # Start parallel execution of threads 
       t = threading.Thread(target=self.execute_services, 
              args=(p1, p2)) 
       t.start() 
       thread_pool.append(t) 
      for thread in thread_pool: 
       # Block till all the threads complete execution: Wait for all 
       the parallel tasks to complete 
       thread.join() 

      # Start another process thread 
      t2 = threading.Thread(target=self.execute_services, 
              args=(p1, p2) 
      t2.start() 
      # Block till this thread completes execution 
      t2.join() 

      requests.post(url, data= json.dumps({status_code=200})) 
     except Exception, e: 
      print e 
      requests.post(url, data= json.dumps({status_code=500})) 
     # Don't return anything as this function is invoked as a thread from 
     # main calling function 


class Service(ThreadServices): 
    """ 
    Service Class 
    """ 

    def main_thread(self, request, context): 
     """ 
     Main Thread:Invokes Task Execution Sequence in ThreadedService   
     :param request: 
     :param context: 
     :return: 
     """ 
     try: 
      main_thread = threading.Thread(target=self.invoke_services, 
             args=(request,)) 
      main_thread.start() 
      return True 
     except Exception, e: 
      return False 

当我打电话服务()main_thread(请求,上下文)并有一些例外执行T1,我需要得到它在main_thread提出并返回false。我怎样才能实现这个结构。谢谢!!

首先,你太复杂了。我会这样做:

from thread import start_new_thread as thread 
from time import sleep 

class Task: 
    """One thread per task. 
    This you should do with subclassing threading.Thread(). 
    This is just conceptual example. 
    """ 
    def __init__ (self, func, args=(), kwargs={}): 
     self.func = func 
     self.args = args 
     self.kwargs = kwargs 
     self.error = None 
     self.done = 0 
     self.result = None 

    def _run (self): 
     self.done = 0 
     self.error = None 
     self.result = None 
     # So this is what you should do in subclassed Thread(): 
     try: self.result = self.func(*self.args, **self.kwargs) 
     except Exception, e: 
      self.error = e 
     self.done = 1 

    def start (self): 
     thread(self._run,()) 

    def wait (self, retrexc=1): 
     """Used in place of threading.Thread.join(), but it returns the result of the function self.func() and manages errors..""" 
     while not self.done: sleep(0.001) 
     if self.error: 
      if retrexc: return self.error 
      raise self.error 
     return self.result 

# And this is how you should use your pool: 
def do_something (tasknr): 
    print tasknr-20 
    if tasknr%7==0: raise Exception, "Dummy exception!" 
    return tasknr**120/82.0 

pool = [] 
for task in xrange(20, 50): 
    t = Task(do_something, (task,)) 
    pool.append(t) 
# And only then wait for each one: 
results = [] 
for task in pool: 
    results.append(task.wait()) 
print results 

这样你可以使task.wait()引发错误。该线程已经停止。所以你所要做的就是在完成后从池中或整个池中删除它们的引用。你甚至可以:

results = [] 
for task in pool: 
    try: results.append(task.wait(0)) 
    except Exception, e: 
     print task.args, "Error:", str(e) 
print results 

现在,没有严格的(我的意思是任务()类)的使用,因为它需要很多的东西加入到用于实际。

只需子类threading.Thread()并通过重写run()和join()或添加新的函数(如wait())来实现类似的概念。

+0

谢谢@Dalen。这种方法至少对我有用。一些解决方法,它启动。 – Zeus