扭曲/执行异步http请求

扭曲/执行异步http请求

问题描述:

我有一个扭曲的反应堆正在侦听传入的数据。我有第二个反应器在一定的时间间隔内执行http请求,将结果发送到第一个反应器。两者都运行良好。扭曲/执行异步http请求

现在我想把它放在一起运行在一个反应​​器中,但我不知道如何实现这一点。就像 - 每60秒执行一次http请求。以与第一个收听“主”反应堆内部不同的方式进行。

我目前所面对的是:

# main reactor listening for incoming data forever 
... 
reactor.listenTCP(8123, TCPEventReceiverFactory()) 

的HTTP反应器使用twisted.internet.defer.DeferredSemaphore()执行若干HTTP检查:

# create semaphore to manage the deferreds 
semaphore = twisted.internet.defer.DeferredSemaphore(2) 

# create a list with all urls to check 
dl = list() 
# append deferreds to list 
for url in self._urls: 
    # returns deferred 
    dl.append(semaphore.run(self._getPage, url)) 

# get a DefferedList 
dl = twisted.internet.defer.DeferredList(dl) 
# add some callbacks for error handling 
dl.addCallbacks(lambda x: reactor.stop(), self._handleError) 

# start the reactor  
reactor.run() 

我怎么能定时HTTP检查添加到“主”反应堆,以便它们以异步方式执行? DeferredSemaphore如何正确工作?

任何人都可以帮助我吗?

[这是一种处理http checkresults的轻量级监控系统。我是扭曲和异步编程的新手。我在运行Python 2.7的Xubuntu 12.04上]

+0

什么是延期信号灯甚至? – SingleNegationElimination 2013-05-14 11:09:06

+0

用于限制同时执行的请求。 – user937284 2013-05-14 11:50:53

你不需要多个反应器。只需使用同一个反应器执行所有不同的操作。如果你打电话给reactor.stop(),你可能会做一些错误的事情,所以让我们摆脱它,并将它们全部整合成一个函数(我们可以用它作为回调函数)。因为它正在进行异步工作,它也应该返回一个延迟,我们将使用您已经使用的DeferredList

def thing_that_does_http(): 
    # create semaphore to manage the deferreds 
    semaphore = twisted.internet.defer.DeferredSemaphore(2) 

    # create a list with all urls to check 
    dl = DeferredList() 
    # append deferreds to list 
    for url in self._urls: 
     # returns deferred 
     dl.append(semaphore.run(self._getPage, url)) 

    # get a DefferedList 
    dl = twisted.internet.defer.DeferredList(dl) 
    # add some callbacks for error handling 
    dl.addErrback(self._handleError) 
    return dl 

为“在一定的时间间隔进行X”自然的方法是用循环调用。有了这个回调函数,我们不需要做太多

reactor.listenTCP(8123, TCPEventReceiverFactory()) 
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http) 
# run once per minute, starting now. 
loop_http.start(60) 

反应器LoopingCallgetPage将用来为自己的目的是twisted.internet.reactor,如果您使用的是不同的反应器,例如,如果你是在进行单元测试时,您需要覆盖该默认值。

LoopingCall的情况下,这是相当简单的,施工后,(但调用其start()方法之前),设置其clock属性:

from twisted.internet.task import Clock 
fake_reactor = Clock() 
loop_http.clock = fake_reactor 
fake_reactor.advance(120) # move time forward two minutes... 

不幸的是,getPage()情况不太好的。你不能使用任何其他反应器与该接口;你需要使用更新,更光滑的t.w.c.Agent。在许多方面,Agent是优越的,但当你只是想将原始响应主体作为一个字符串时,它并不那么方便。

除了需要传递给它的构造显式反应器,它更多的是在比由GETPAGE提供的便利的请求/响应循环细粒度控制。因此,它主要以Producer s和Protocol s的形式实施。在前者的情况下,我们可以通过便利帮手FileBodyProducer以最小的麻烦发送请求主体;在后者中,我们需要一个简单的协议来缓冲所有的数据块,直到我们获得全部数据。

下面是一段代码,它可以代替getPage,用大致相同的接口,但服用的Agent实例作为第一个参数

from cStringIO import StringIO 
from twisted.internet.defer import Deferred 
from twisted.internet.protocol import Protocol 
from twisted.web.client import ResponseDone 
from twisted.web.client import FileBodyProducer 


class GetPageProtocol(Protocol): 
    def __init__(self): 
     self.deferred = Deferred() 
     self.data = [] 

    def dataReceived(self, data): 
     self.data.append(data) 

    def connectionLost(self, reason): 
     reason.trap(ResponseDone) 
     data = ''.join(self.data) 
     del self.data 
     self.deferred.callback(data) 


def agentGetPage(agent, url, 
       method="GET", 
       headers=None, 
       postdata=None): 
    if postdata is not None: 
     bodyProducer = FileBodyProducer(StringIO(postdata)) 
    else: 
     bodyProducer = None 

    def _getPageResponded(response): 
     if response.length != 0: 
      proto = GetPageProtocol() 
      response.deliverBody(proto) 
      return proto.deferred 
     else: 
      return None 

    d = agent.request(method, url, headers, bodyProducer) 
    d.addCallback(_getPageResponded) 
    return d 

其中,在单元测试,看起来有点像:

from twisted.test.proto_helpers import MemoryReactor 
from twisted.web.client import Agent 
fake_reactor = MemoryReactor() 
agent = Agent(fake_reactor) 
d = agentGetPage(agent, "http://example.com") 

assert fake_reactor.tcpClients # or some such, exercise the code by manipulating the reactor 

编辑:我最初想掠过这给ectomorph,少混淆了;但同时鼓励早期妥善处理反应堆也是一个不错的主意,并在以后避免不必要的痛苦。

+1

谢谢!太棒了,那正是我想要的。 'twisted.internet.reactor'是我正在使用的反应器。 – user937284 2013-05-14 14:45:25

+2

很好的答案,但不完全正确:)。 'LoopingCall'实际上会使用'self.clock',默认情况下它会初始化为'twisted.internet.reactor' * *。改变它的能力很重要,特别是对于测试。 (可悲的是,'getPage'实际上是硬编码到它,这就是为什么我们建议'twisted.web.client.Agent'现在不是。) – Glyph 2013-05-20 17:51:26

+2

@Glyph:更新:有,我觉得包括使用反应堆的少suckishly 。 – SingleNegationElimination 2013-05-20 22:30:16