2017-09-14 48 views
0

我正在嘗試使用Twisted with LineReceiver協議通過網絡發送文件。我看到的問題是,當我讀取二進制文件並嘗試發送塊時,他們根本不發送。Python通過網絡扭曲發送大文件

我讀使用file:

import json 
import time 
import threading 
from twisted.internet import reactor, threads 
from twisted.protocols.basic import LineReceiver 
from twisted.internet import protocol 

MaximumMsgSize = 15500 

trySend = True 
connectionToServer = None 

class ClientInterfaceFactory(protocol.Factory): 

    def buildProtocol(self, addr): 
     return WoosterInterfaceProtocol(self._msgProcessor, self._logger) 

class ClientInterfaceProtocol(LineReceiver): 

    def connectionMade(self): 
     connectionToServer = self 

    def _DecodeMessage(self, rawMsg): 
     header, body = json.loads(rawMsg) 
     return (header, json.loads(body)) 

    def ProcessIncomingMsg(self, rawMsg, connObject): 
     # Decode raw message. 
     decodedMsg = self._DecodeMessage(rawMsg) 

     self.ProccessTransmitJobToNode(decodedMsg, connObject) 

    def _BuildMessage(self, id, msgBody = {}): 
     msgs = [] 

     fullMsgBody = json.dumps(msgBody) 
     msgBodyLength = len(fullMsgBody) 

     totalParts = 1 if msgBodyLength <= MaximumMsgSize else \ 
      int(math.ceil(msgBodyLength/MaximumMsgSize)) 

     startPoint = 0 
     msgBodyPos = 0 

     for partNo in range(totalParts): 
      msgBodyPos = (partNo + 1) * MaximumMsgSize 

      header = {'ID' : id, 'MsgParts' : totalParts, 
       'MsgPart' : partNo } 
      msg = (header, fullMsgBody[startPoint:msgBodyPos]) 
      jsonMsg = json.dumps(msg)  

      msgs.append(jsonMsg) 
      startPoint = msgBodyPos 

     return (msgs, '') 

    def ProccessTransmitJobToNode(self, msg, connection): 
     rootDir = '../documentation/configs/Wooster' 

     exportedFiles = ['consoleLog.txt', 'blob.dat'] 
     params = { 
      'Status' : 'buildStatus', 
      'TaskID' : 'taskID', 
      'Name' : 'taskName', 
      'Exports' : len(exportedFiles), 
      } 
     msg, statusStr = self._BuildMessage(101, params) 
     connection.sendLine(msg[0]) 

     for filename in exportedFiles: 
      with open (filename, "rb") as exportFileHandle: 
       data = exportFileHandle.read().encode('base64') 

      params = { 
       ExportFileToMaster_Tag.TaskID : taskID, 
       ExportFileToMaster_Tag.FileContents : data, 
       ExportFileToMaster_Tag.Filename : filename 
      } 
      msgs, _ = self._BuildMessage(MsgID.ExportFileToMaster, params)   
      for m in msgs: 
       connection.sendLine(m) 

    def lineReceived(self, data): 
     threads.deferToThread(self.ProcessIncomingMsg, data, self) 


def ConnectFailed(reason): 
    print 'Connection failed..' 
    reactor.callLater(20, reactor.callFromThread, ConnectToServer) 

def ConnectToServer(): 
    print 'Connecting...' 
    from twisted.internet.endpoints import TCP4ClientEndpoint 
    endpoint = TCP4ClientEndpoint(reactor, 'localhost', 8181) 

    deferItem = endpoint.connect(factory) 
    deferItem.addErrback(ConnectFailed) 

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False}) 
netThread.start() 

reactor.callFromThread(ConnectToServer) 

factory = ClientInterfaceFactory() 
protocol = ClientInterfaceProtocol() 

while 1: 
    time.sleep(0.01) 

    if connectionToServer == None: continue 

    if trySend == True: 
     protocol.ProccessTransmitJobToNode(None, None) 
     trySend = False 

有什麼我做錯了文件發送,當寫爲多部分或者有一個以上的文件中奮鬥着它的?

如果發生單個寫入,則m 注意:我已經用一段粗略的示例代碼更新了問題,希望它有意義。

回答

1

_BuildMessage返回一個二元組:(msgs, '')

您的網絡代碼遍歷此:

msgs = self._BuildMessage(MsgID.ExportFileToMaster, params) 

for m in msgs: 

所以,你的網絡代碼首先試圖發送JSON編碼數據的列表,然後試圖發送空字符串。它很可能會引發例外,因爲您無法使用sendLine發送任何列表。如果您沒有看到異常,則忘記啓用日誌記錄。您應始終啓用日誌記錄,以便您可以看到發生的任何異常情況。

此外,您正在使用time.sleep,你不應該在基於Twisted的程序中這樣做。如果你這樣做是爲了避免接收器過載,你應該使用TCP的本地反壓,而不是註冊一個可以接收暫停和恢復通知的生產者。無論如何,time.sleep(以及對所有數據的循環)將阻止整個反應器線程並阻止任何進度。結果是大部分數據在發送之前會在本地進行緩衝。

此外,您的代碼從非反應器線程調用LineReceiver.sendLine。這有未定義的結果,但你可以指望它不工作。

這個循環運行在主線程:

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False}) 
netThread.start() 

ProcessTransmitJobToNode簡單地調用self.sendLine

def ProccessTransmitJobToNode(self, msg, connection): 
    rootDir = '../documentation/configs/Wooster' 

    exportedFiles = ['consoleLog.txt', 'blob.dat'] 
    params = { 
     'Status' : 'buildStatus', 
     'TaskID' : 'taskID', 
     'Name' : 'taskName', 
     'Exports' : len(exportedFiles), 
     } 
    msg, statusStr = self._BuildMessage(101, params) 
    connection.sendLine(msg[0]) 

你或許應該刪除

while 1: 
    time.sleep(0.01) 

    if connectionToServer == None: continue 

    if trySend == True: 
     protocol.ProccessTransmitJobToNode(None, None) 
     trySend = False 

,同時反應器在另一個線程中運行完全從應用程序中使用線程上。使用reactor.callLater可以更好地管理基於時間的事件(您的主線程循環可以有效地生成一次每秒撥打ProcessTransmitJobToNode的呼叫(trySend標誌的模數效果))。

您可能還想看看https://github.com/twisted/tubes作爲使用Twisted管理大量數據的更好方法。

+0

正在發送的消息不是一個列表,它是一個Json dumps()輸出。我返回一個消息列表,但爲了循環瀏覽它們 –

+0

對不起,它應該是消息,_ = self._Build ........ –

+0

是的,是的,它應該。 –