蟒蛇扭曲:防止TCP读取器和网络资源之间的缓冲

蟒蛇扭曲:防止TCP读取器和网络资源之间的缓冲

问题描述:

我想实现一个使用扭曲网络的MJPEG服务器。其中 通过读取上游gstreamer进程获取其数据,该进程本身是 将MJPEG数据写入TCP端口localhost:9999。我有这样的事情 现在:蟒蛇扭曲:防止TCP读取器和网络资源之间的缓冲

from twisted.internet import reactor, protocol, defer 
from twisted.web import server, resource 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    @defer.inlineCallbacks 
    def deferredRenderer(self, request): 
     q = defer.DeferredQueue() 
     self.queues.append([q, request]) 
     while True: 
      yield q.get() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.deferredRenderer(request) 
     return server.NOT_DONE_YET 

class JpegStreamReader(protocol.Protocol): 
    def dataReceived(self, data): 
     for (q, req) in self.factory.queues: 
      req.write(data) 
      q.put('') 

root = File('web') 
root.putChild('stream.mjpeg', MJpegResource(queues)) 

factory = protocol.Factory() 
factory.protocol = JpegStreamReader 
factory.queues = queues 
reactor.listenTCP(9999, factory) 

site = server.Site(root) 
reactor.listenTCP(80, site) 

# spawn gstreamer process which writes to port 9999. 
# The gstream process is launched using: 
# gst-launch-1.0 -v \ 
#  v4l2src device=/dev/video0 \ 
#   ! video/x-raw,framerate=15/1, width=640, height=480 \ 
#   ! jpegenc \ 
#   ! multipartmux boundary=spionisto \ 
#   ! tcpclientsink host=127.0.0.1 port=9999 \ 

reactor.run() 

因此,像:

gstreamer --> JpegStreamReader --> MJpegResource 

这工作不错,但我发现,偶尔,在 浏览器的视频远远落后是什么“活着“(有时候多达30-40秒 )。只要刷新浏览器,MJPEG流就会跳回 以“正常”。所以我怀疑JpegStreamReader不能 写入对应于web.http.Request的TCP套接字的速度要快到 gstreamer正在填充TCP套接字9999,并且在JpegStreamReader的输入队列上东西正在缓存 。

由于流应该是“现场”,所以我可以将帧丢到 带回视频直播。然而,我不知道如何甚至检测到 JpegStreamReader落后于等?任何关于如何 使这个管道行为更像一个实时流的建议?

如果基本上有这样做的另一个架构,建议 将不胜感激。

您可以在Request对象上注册生产者。当Request的写入缓冲区已满时,它将调用pauseProducing方法。当房间变得可用时,它将有resumeProducing方法调用。

您可以使用此信息删除可能无法及时传送的帧。但是,您将不得不实际识别服务器中的帧(目前您只有一个将数据作为数据流传输的dataReceived方法,但不知道帧开始或结束的位置)。这也有一个问题,即缓冲区的完整性可能是流延迟的非常滞后的指标。如果系统中的瓶颈不在读取gstreamer的数据和写入请求之间,那么在这部分程序中加入背压灵敏度不会有帮助。

+0

非常感谢!我刚刚实施了你的建议,我认为它应该可以解决我的问题。我一直在玩视频流一段时间,当内部网络出现额外的网络活动时,我会看到预期的丢帧。 最终的解决方案还包括只响应resumeResroducing与一秒延迟。 为了后代的缘故,我想包括我终于想出的代码。大会是由我自己将其单独回答还是将其置于评论中。如果单独的答案我应该接受这个答案还是这个答案? –

这是实现Jean-Paul Calerone的 建议的最终解决方案。请注意,现在我们有一个JpegProducer类,它实现了PushProducer接口的 。当请求暂停时,它会设置一个标志。这个 使TCP流读取器(JpegStreamReader)不会将帧推送到 那个特定的生产者,如果它被堵塞的话。根据让 - 保罗的建议,I 也必须将多部分MJPEG流拆分为块,以便我们 总是丢弃帧而不破坏MJPEG输出格式。

from twisted.internet import reactor, protocol, defer, interfaces 
from twisted.web import server, resource 
from zope.interface import implementer 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    def setupProducer(self, request): 
     producer = JpegProducer(request) 
     request.notifyFinish().addErrback(self._responseFailed, producer) 
     request.registerProducer(producer, True) 

     self.queues.append(producer) 

    def _responseFailed(self, err, producer): 
     producer.stopProducing() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.setupProducer(request) 
     return server.NOT_DONE_YET 

@implementer(interfaces.IPushProducer) 
class JpegProducer(object): 
    def __init__(self, request): 
     self.request = request 
     self.isPaused = False 
     self.isStopped = False 
     self.delayedCall = None 

    def cancelCall(self): 
     if self.delayedCall: 
      self.delayedCall.cancel() 
      self.delayedCall = None 

    def pauseProducing(self): 
     self.isPaused = True 
     self.cancelCall() 

    def resetPausedFlag(self): 
     self.isPaused = False 
     self.delayedCall = None 

    def resumeProducing(self): 
     # calling self.cancelCall is defensive. We should not really get 
     # called with multiple resumeProducing calls without any 
     # pauseProducing in the middle. 
     self.cancelCall() 
     self.delayedCall = reactor.callLater(1, self.resetPausedFlag) 
     log('producer is requesting to be resumed') 

    def stopProducing(self): 
     self.isPaused = True 
     self.isStopped = True 
     log('producer is requesting to be stopped') 

MJPEG_SEP = '--spionisto\r\n' 

class JpegStreamReader(protocol.Protocol): 
    def __init__(self): 
     self.tnow = None 

    def connectionMade(self): 
     self.data = '' 
     self.tnow = datetime.now() 

    def dataReceived(self, data): 
     self.data += data 

     chunks = self.data.rsplit(MJPEG_SEP, 1) 

     dataToSend = '' 
     if len(chunks) == 2: 
      dataToSend = chunks[0] + MJPEG_SEP 

     self.data = chunks[-1] 

     for producer in self.factory.queues: 
      if (not producer.isPaused): 
       producer.request.write(dataToSend)