2017-09-05 78 views
1

我想實現一個使用扭曲網絡的MJPEG服務器。其中 通過讀取上游gstreamer進程獲取其數據,該進程本身是 將MJPEG數據寫入TCP端口localhost:9999。我有這樣的事情 現在:蟒蛇扭曲:防止TCP讀取器和網絡資源之間的緩衝

from twisted.internet import reactor, protocol, defer 
from twisted.web import server, resource 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    @defer.inlineCallbacks 
    def deferredRenderer(self, request): 
     q = defer.DeferredQueue() 
     self.queues.append([q, request]) 
     while True: 
      yield q.get() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.deferredRenderer(request) 
     return server.NOT_DONE_YET 

class JpegStreamReader(protocol.Protocol): 
    def dataReceived(self, data): 
     for (q, req) in self.factory.queues: 
      req.write(data) 
      q.put('') 

root = File('web') 
root.putChild('stream.mjpeg', MJpegResource(queues)) 

factory = protocol.Factory() 
factory.protocol = JpegStreamReader 
factory.queues = queues 
reactor.listenTCP(9999, factory) 

site = server.Site(root) 
reactor.listenTCP(80, site) 

# spawn gstreamer process which writes to port 9999. 
# The gstream process is launched using: 
# gst-launch-1.0 -v \ 
#  v4l2src device=/dev/video0 \ 
#   ! video/x-raw,framerate=15/1, width=640, height=480 \ 
#   ! jpegenc \ 
#   ! multipartmux boundary=spionisto \ 
#   ! tcpclientsink host=127.0.0.1 port=9999 \ 

reactor.run() 

因此,像:

gstreamer --> JpegStreamReader --> MJpegResource 

這工作不錯,但我發現,偶爾,在 瀏覽器的視頻遠遠落後是什麼「活着「(有時候多達30-40秒 )。只要刷新瀏覽器,MJPEG流就會跳回 以「正常」。所以我懷疑JpegStreamReader不能 寫入對應於web.http.Request的TCP套接字的速度要快到 gstreamer正在填充TCP套接字9999,並且在JpegStreamReader的輸入隊列上東西正在緩存 。

由於流應該是「現場」,所以我可以將幀丟到 帶回視頻直播。然而,我不知道如何甚至檢測到 JpegStreamReader落後於等?任何關於如何 使這個管道行爲更像一個實時流的建議?

如果基本上有這樣做的另一個架構,建議 將不勝感激。

回答

1

您可以在Request對象上註冊生產者。當Request的寫入緩衝區已滿時,它將調用pauseProducing方法。當房間變得可用時,它將有resumeProducing方法調用。

您可以使用此信息刪除可能無法及時傳送的幀。但是,您將不得不實際識別服務器中的幀(目前您只有一個將數據作爲數據流傳輸的dataReceived方法,但不知道幀開始或結束的位置)。這也有一個問題,即緩衝區的完整性可能是流延遲的非常滯後的指標。如果系統中的瓶頸不在讀取gstreamer的數據和寫入請求之間,那麼在這部分程序中加入背壓靈敏度不會有幫助。

+0

非常感謝!我剛剛實施了你的建議,我認爲它應該可以解決我的問題。我一直在玩視頻流一段時間,當內部網絡出現額外的網絡活動時,我會看到預期的丟幀。 最終的解決方案還包括只響應resumeResroducing與一秒延遲。 爲了後代的緣故,我想包括我終於想出的代碼。大會是由我自己將其單獨回答還是將其置於評論中。如果單獨的答案我應該接受這個答案還是這個答案? –

2

這是實現Jean-Paul Calerone的 建議的最終解決方案。請注意,現在我們有一個JpegProducer類,它實現了PushProducer接口的 。當請求暫停時,它會設置一個標誌。這個 使TCP流讀取器(JpegStreamReader)不會將幀推送到 那個特定的生產者,如果它被堵塞的話。根據讓 - 保羅的建議,I 也必須將多部分MJPEG流拆分爲塊,以便我們 總是丟棄幀而不破壞MJPEG輸出格式。

from twisted.internet import reactor, protocol, defer, interfaces 
from twisted.web import server, resource 
from zope.interface import implementer 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    def setupProducer(self, request): 
     producer = JpegProducer(request) 
     request.notifyFinish().addErrback(self._responseFailed, producer) 
     request.registerProducer(producer, True) 

     self.queues.append(producer) 

    def _responseFailed(self, err, producer): 
     producer.stopProducing() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.setupProducer(request) 
     return server.NOT_DONE_YET 

@implementer(interfaces.IPushProducer) 
class JpegProducer(object): 
    def __init__(self, request): 
     self.request = request 
     self.isPaused = False 
     self.isStopped = False 
     self.delayedCall = None 

    def cancelCall(self): 
     if self.delayedCall: 
      self.delayedCall.cancel() 
      self.delayedCall = None 

    def pauseProducing(self): 
     self.isPaused = True 
     self.cancelCall() 

    def resetPausedFlag(self): 
     self.isPaused = False 
     self.delayedCall = None 

    def resumeProducing(self): 
     # calling self.cancelCall is defensive. We should not really get 
     # called with multiple resumeProducing calls without any 
     # pauseProducing in the middle. 
     self.cancelCall() 
     self.delayedCall = reactor.callLater(1, self.resetPausedFlag) 
     log('producer is requesting to be resumed') 

    def stopProducing(self): 
     self.isPaused = True 
     self.isStopped = True 
     log('producer is requesting to be stopped') 

MJPEG_SEP = '--spionisto\r\n' 

class JpegStreamReader(protocol.Protocol): 
    def __init__(self): 
     self.tnow = None 

    def connectionMade(self): 
     self.data = '' 
     self.tnow = datetime.now() 

    def dataReceived(self, data): 
     self.data += data 

     chunks = self.data.rsplit(MJPEG_SEP, 1) 

     dataToSend = '' 
     if len(chunks) == 2: 
      dataToSend = chunks[0] + MJPEG_SEP 

     self.data = chunks[-1] 

     for producer in self.factory.queues: 
      if (not producer.isPaused): 
       producer.request.write(dataToSend)