如何添加超时扭曲延迟

如何添加超时扭曲延迟

问题描述:

 
from twisted.internet import reactor 
from twisted.internet import threads 
from twisted.internet import defer 
import time 

def worker(arg): 
    print 'Hello world' 
    time.sleep(10) 
    return 1 

def run(): 
    print 'Starting workers' 
    l = [] 
    for x in range(2): 
     l.append(threads.deferToThread(worker, x)) 
    return defer.DeferredList(l) 

def res(results): 
    print results 
    reactor.stop() 

d = run() 
d.addCallback(res) 
reactor.run() 

如何通过超时阻止工作?如何添加超时扭曲延迟

线程不能被打断,除非他们与你合作。 time.sleep(10)不会合作,所以我不认为你可以打断这个工人。如果你有另一种具有几个独立的阶段,或者在一个循环中对一些任务操作工人的,那么你可以做这样的事情:

def worker(stop, jobs): 
    for j in jobs: 
     if stop: 
      break 
     j.do() 

stop = [] 
d = deferToThread(worker) 

# This will make the list eval to true and break out of the loop. 
stop.append(None) 

这不是扭曲具体的,无论是。这就是线程在Python中的工作方式。

尽管可能无法中断线程,但可以通过cancel函数来停止延迟,我认为该函数可用于Twisted 10.1.0和更高版本。

我已经使用了下面的类来使Deferreds回调一个特定的函数,如果Deferred在一段时间后还没有被触发。对于与OP中主题相同的问题可能会有用。

编辑:正如下面的评论建议,最好不要从defer.Deferred继承。因此,我更改了代码以使用实现相同效果的包装器。超时前

class DeferredWrapperWithTimeout(object): 
    ''' 
    Holds a deferred that allows a specified function to be called-back 
    if the deferred does not fire before some specified timeout. 
    ''' 
    def __init__(self, canceller=None): 
     self._def = defer.Deferred(canceller) 

    def _finish(self, r, t): 
     ''' 
     Function to be called (internally) after the Deferred 
     has fired, in order to cancel the timeout. 
     ''' 
     if ((t!=None) and (t.active())): 
      t.cancel() 
     return r 

    def getDeferred(self): 
     return self._def 

    def addTimeoutCallback(self, reactr, timeout, 
          callUponTimeout, *args, **kw): 
     ''' 
     The function 'callUponTimeout' (with optional args or keywords) 
     will be called after 'timeout' seconds, unless the Deferred fires. 
     ''' 

     def timeoutCallback(): 
      self._def.cancel() 
      callUponTimeout(*args, **kw) 
     toc = reactr.callLater(timeout, timeoutCallback) 
     return self._def.addCallback(self._finish, toc) 

回调:

from twisted.internet import reactor 

from DeferredWithTimeout import * 

dw = DeferredWrapperWithTimeout() 
d = dw.getDeferred() 

def testCallback(x=None): 
    print "called" 

def testTimeout(x=None): 
    print "timedout" 

d.addCallback(testCallback) 
dw.addTimeoutCallback(reactor, 20, testTimeout, "to") 
reactor.callLater(2, d.callback, "cb") 
reactor.run() 

打印 “叫”,别无其他。回调之前

超时:

from twisted.internet import reactor 

from DeferredWithTimeout import * 

dw = DeferredWrapperWithTimeout() 
d = dw.getDeferred() 

def testCallback(x=None): 
    print "called" 

def testTimeout(x=None): 
    print "timedout" 

d.addCallback(testCallback) 
dw.addTimeoutCallback(reactor, 20, testTimeout, "to") 
reactor.run() 

打印 “已逾时” 20秒后,并没有别的。

+2

你真的不应该继承'延迟'。实现这个功能作为一个单独的帮手,而不是一个子类。 http://pyvideo.org/video/1684/the-end-of-object-inheritance-the-beginning-of – 2013-09-26 11:33:21

+0

除了关于不继承事物的常见警告之外,“延迟”是一个*特别*坏事子类,因为它的行为假定它自己的实现非常具体,并且不会很好地反应某些方法被覆盖。 – Glyph 2013-09-26 19:15:02

+0

感谢您观看该视频的链接!它完全改变了我设计代码的方式。 – Corey 2015-04-18 00:43:32

嗯,我的回答是不是线程,但有人说,你可以实现暂停功能作为一个单独的帮手:

from twisted.internet import defer 

def add_watchdog(deferred, timeout=0.05): 

    def callback(value): 
     if not watchdog.called: 
      watchdog.cancel() 
     return value 

    deferred.addBoth(callback) 

    from twisted.internet import reactor 
    watchdog = reactor.callLater(timeout, defer.timeout, deferred) 

d = defer.Deferred() 
add_watchdog(d) 

然后你就可以捕获defer.TimeoutError递延的errback可如果你需要。

+0

嗯,好像取消缺失 – 2015-09-15 17:22:12

+0

@ CarlD'Halluin,谨慎地阐述,或建议编辑? – TCAllen07 2016-01-04 00:52:38

我们这样做使用装饰。这种方法的优点是延迟在达到超时时被取消。这应该成为扭曲库的一部分imho

from twisted.internet import defer, reactor 

def timeout(secs): 
    """Decorator to add timeout to Deferred calls""" 
    def wrap(func): 
     @defer.inlineCallbacks 
     def _timeout(*args, **kwargs): 
      raw_d = func(*args, **kwargs) 
      if not isinstance(raw_d, defer.Deferred): 
       defer.returnValue(raw_d) 

      timeout_d = defer.Deferred() 
      times_up = reactor.callLater(secs, timeout_d.callback, None) 

      try: 
       raw_result, timeout_result = yield defer.DeferredList(
        [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True, 
        consumeErrors=True) 
      except defer.FirstError as e: # Only raw_d should raise an exception 
       assert e.index == 0 
       times_up.cancel() 
       e.subFailure.raiseException() 
      else: # timeout 
       if timeout_d.called: 
        raw_d.cancel() 
        raise Exception("%s secs have expired" % secs) 

      # no timeout 
      times_up.cancel() 
      defer.returnValue(raw_result) 
     return _timeout 
return wrap