扭曲/执行异步http请求
我有一个扭曲的反应堆正在侦听传入的数据。我有第二个反应器在一定的时间间隔内执行http请求,将结果发送到第一个反应器。两者都运行良好。扭曲/执行异步http请求
现在我想把它放在一起运行在一个反应器中,但我不知道如何实现这一点。就像 - 每60秒执行一次http请求。以与第一个收听“主”反应堆内部不同的方式进行。
我目前所面对的是:
# main reactor listening for incoming data forever
...
reactor.listenTCP(8123, TCPEventReceiverFactory())
的HTTP反应器使用twisted.internet.defer.DeferredSemaphore()
执行若干HTTP检查:
# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)
# create a list with all urls to check
dl = list()
# append deferreds to list
for url in self._urls:
# returns deferred
dl.append(semaphore.run(self._getPage, url))
# get a DefferedList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addCallbacks(lambda x: reactor.stop(), self._handleError)
# start the reactor
reactor.run()
我怎么能定时HTTP检查添加到“主”反应堆,以便它们以异步方式执行? DeferredSemaphore
如何正确工作?
任何人都可以帮助我吗?
[这是一种处理http checkresults的轻量级监控系统。我是扭曲和异步编程的新手。我在运行Python 2.7的Xubuntu 12.04上]
你不需要多个反应器。只需使用同一个反应器执行所有不同的操作。如果你打电话给reactor.stop()
,你可能会做一些错误的事情,所以让我们摆脱它,并将它们全部整合成一个函数(我们可以用它作为回调函数)。因为它正在进行异步工作,它也应该返回一个延迟,我们将使用您已经使用的DeferredList
。
def thing_that_does_http():
# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)
# create a list with all urls to check
dl = DeferredList()
# append deferreds to list
for url in self._urls:
# returns deferred
dl.append(semaphore.run(self._getPage, url))
# get a DefferedList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addErrback(self._handleError)
return dl
为“在一定的时间间隔进行X”自然的方法是用循环调用。有了这个回调函数,我们不需要做太多
reactor.listenTCP(8123, TCPEventReceiverFactory())
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http)
# run once per minute, starting now.
loop_http.start(60)
反应器LoopingCall
和getPage
将用来为自己的目的是twisted.internet.reactor
,如果您使用的是不同的反应器,例如,如果你是在进行单元测试时,您需要覆盖该默认值。
在LoopingCall
的情况下,这是相当简单的,施工后,(但调用其start()
方法之前),设置其clock
属性:
from twisted.internet.task import Clock
fake_reactor = Clock()
loop_http.clock = fake_reactor
fake_reactor.advance(120) # move time forward two minutes...
不幸的是,getPage()
情况不太好的。你不能使用任何其他反应器与该接口;你需要使用更新,更光滑的t.w.c.Agent
。在许多方面,Agent
是优越的,但当你只是想将原始响应主体作为一个字符串时,它并不那么方便。
除了需要传递给它的构造显式反应器,它更多的是在比由GETPAGE提供的便利的请求/响应循环细粒度控制。因此,它主要以Producer
s和Protocol
s的形式实施。在前者的情况下,我们可以通过便利帮手FileBodyProducer
以最小的麻烦发送请求主体;在后者中,我们需要一个简单的协议来缓冲所有的数据块,直到我们获得全部数据。
下面是一段代码,它可以代替getPage
,用大致相同的接口,但服用的Agent
实例作为第一个参数
from cStringIO import StringIO
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.client import FileBodyProducer
class GetPageProtocol(Protocol):
def __init__(self):
self.deferred = Deferred()
self.data = []
def dataReceived(self, data):
self.data.append(data)
def connectionLost(self, reason):
reason.trap(ResponseDone)
data = ''.join(self.data)
del self.data
self.deferred.callback(data)
def agentGetPage(agent, url,
method="GET",
headers=None,
postdata=None):
if postdata is not None:
bodyProducer = FileBodyProducer(StringIO(postdata))
else:
bodyProducer = None
def _getPageResponded(response):
if response.length != 0:
proto = GetPageProtocol()
response.deliverBody(proto)
return proto.deferred
else:
return None
d = agent.request(method, url, headers, bodyProducer)
d.addCallback(_getPageResponded)
return d
其中,在单元测试,看起来有点像:
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.client import Agent
fake_reactor = MemoryReactor()
agent = Agent(fake_reactor)
d = agentGetPage(agent, "http://example.com")
assert fake_reactor.tcpClients # or some such, exercise the code by manipulating the reactor
编辑:我最初想掠过这给ectomorph,少混淆了;但同时鼓励早期妥善处理反应堆也是一个不错的主意,并在以后避免不必要的痛苦。
谢谢!太棒了,那正是我想要的。 'twisted.internet.reactor'是我正在使用的反应器。 – user937284 2013-05-14 14:45:25
很好的答案,但不完全正确:)。 'LoopingCall'实际上会使用'self.clock',默认情况下它会初始化为'twisted.internet.reactor' * *。改变它的能力很重要,特别是对于测试。 (可悲的是,'getPage'实际上是硬编码到它,这就是为什么我们建议'twisted.web.client.Agent'现在不是。) – Glyph 2013-05-20 17:51:26
@Glyph:更新:有,我觉得包括使用反应堆的少suckishly 。 – SingleNegationElimination 2013-05-20 22:30:16
什么是延期信号灯甚至? – SingleNegationElimination 2013-05-14 11:09:06
用于限制同时执行的请求。 – user937284 2013-05-14 11:50:53