2013-06-29 39 views
0

我正在爲一個UDP應用程序編寫一個測試工具(TH),打開一個到另一個UDP應用程序實例的隧道。要通過隧道發送東西,每個UDP應用程序都有一個以讀寫模式(與將來使用的TUN接口)相關聯的命名管道。每個UDP應用程序都有幾個子進程,TH啓動UDP應用程序作爲不同線程中的子進程,並通過子進程中的管道發送數據。這是一團糟,但我希望首先在一個簡單的案例上工作,因爲我最終希望在5個以上的應用中構建隧道。python與IPC和子進程命名管道混亂

2個問題:

  1. 選擇的第一個應用程序()循環沒有得到全部由TH-只是第一個發送的消息。每次文件有新內容/準備好讀取時,select()都不會通知嗎?我現在使用pipe.readline(),更改爲pipe.readlines()可以獲得大部分消息,但不是全部。請參閱下面的更多細節。
  2. 在第二個應用程序中,我試圖寫入管道的UDP數據(與第一個不同)。 TH從未獲取數據。

這裏的數據通過隧道從TH流:

  • TH打印>> FIFO1, 「富」
  • FIFO1
  • UDP應用#1接收FIFO1,發送UDP數據包
  • UDP隧道
  • UDP應用程序#2接收UDP包,發送FIFO2
  • FIFO2
  • TH fifo2.readline()

當然,這根本不起作用。當我通過fifo1發送幾條消息到應用#1時,只有第一條消息進入應用#1。此消息通過UDP隧道正確傳輸到應用#2。 App#2寫入fifo2,但是在TH中,我看不到fifo2中的任何內容。當我運行測試工具來說明這一點時,我已經包含了控制檯輸出。

Sending data one way over tunnel... 
rxDataFifo: reading from pipe 
# Here we start sending messages (hello 0, hello 1, hello 2) via fifo1 
i= 0 
i= 1 
i= 2 
txDataFifo: finished # Now we are finished sending 
handleFifoData: hello 0 # first message received on UDP app #1 
UDP ('127.0.0.1', 9000): fde04a5a1de6d473b70c184e5e981279hello 0 # Here's the message app #2 recv via UDP 
...writing out to fifo! hello 0 # And app #2 write out to fifo2 (or should, at least) 
# Nothing received from fifo2 on test harness!?? 

問題1: 在該應用#1側,選擇循環不觸發每個管被寫入多個可讀事件。如果不是從管道中讀取單行,而是讀取所有可用的行,則會獲得所有最後兩條消息(但不是第一行)。

Sending data one way over tunnel... 
rxDataFifo: reading from pipe 
i= 0 
i= 1 
i= 2 
txDataFifo: finished 
handleFifoData: hello 1 
handleFifoData: hello 2 
UDP ('127.0.0.1', 9000): e3270c47214147ae3698aeecc13e2acehello 1 
tunnel data: hello 1 
...writing out to fifo! hello 1 
UDP ('127.0.0.1', 9000): e3270c47214147ae3698aeecc13e2acehello 2 
tunnel data: hello 2 
...writing out to fifo! hello 2 

問題2:我的理論是,TH是試圖讀取該管道有任何內容之前。但這沒有意義,因爲我在接收fifo2數據時做了一段時間的循環。我嘗試了一些更簡單的東西,沒有工作。


測試工具的

相關部分:

def main(): 
    # Create pipes for each process we want to launch 
    # NOTE: These pipes are passed to the multiprocessing.Process and subprocess.call 
    # that we use to launch the UDP apps 
    tmpdir = tempfile.mkdtemp() 
    pipe0 = os.path.join(tmpdir, 'pipe0') 
    pipe1 = os.path.join(tmpdir, 'pipe1') 
    try: 
     os.mkfifo(pipe0) 
     os.mkfifo(pipe1) 
    except OSError, e: 
     print "Failed to create FIFO: %s" % e 

    #... 

    queue = Queue() # this where output goes 
    num_msg = 10 # number of messages to send over tunnel 
    txFifo = Process(target=txDataFifo, args=(pipe0, queue, num_msg)) 
    rxFifo = Process(target=rxDataFifo, args=(pipe1, queue)) 
    rxFifo.start() 
    txFifo.start() 

def txDataFifo(pipe_name, queue, num_msg): 
    fifo = open(pipe_name, 'r+b') 
    # print >> fifo, "hello over named pipe" # write stuff to fifo 


    for i in range(0, 3): 
     print "i=",i 
     print >> fifo, "hello "+str(i) 

    fifo.close() 
    print "txDataFifo: finished" 

def rxDataFifo(pipe_name, queue): 
    print "rxDataFifo: reading from pipe" 
    fifo = open(pipe_name, 'w+b') 
    while True: 
     data = fifo.readline() 
     print 'rxDataFifo:', data 
     queue.put(data) 
    fifo.close() 
    print "txDataFifo: reading from pipe" 

UDP的應用程序和處理程序UDP的選擇環路和命名管道數據:

def listen (self, ipaddress, udpport, testport, pipe_filename): 
    # ... 
    inputs = [sock, self.pipe] # stuff we read 
    outputs = [] # stuff we expect to write 

    # Set up the named pipe that we use to simulate the TUN interface 
    # and use to communicate with the test harness 
    fifo = None 
    if pipe_filename: 
     print "Opening pipe="+pipe_filename+" for IPC with test harness" 
     fifo = open(pipe_filename, 'r+b') 
     inputs.append(fifo) 

    while inputs: 
     readable, writable, exceptional = select.select(inputs, outputs, inputs) 

     for event in readable: 
      if fifo and event is fifo: 
       # Handle data from test harness simulating TUN (via pipe) 
       self.handleFifoData(sock, fifo) 
      if event is sock: 
       # Handle tunnel/setup request data 
       self.handleUDPData(sock, fifo) 
      if event is self.pipe: 
       # Handle commands from the UI in the other process (IPC) 
       data = self.pipe.recv() 
       print "pipe event", data 
       if data[0] == 'open': 
        # NOTE: For open command, data[1] and data[2] are 
        # an IP address and port, respectively 
        connId = self.generateConnId() 
        msg = connId + 'setup' 
        sock.sendto(msg, (data[1], data[2])) 
        self.mySetup[connId] = SetupInfo(data[1], data[2]) 
     # Handle exceptional?  

def handleFifoData (self, sock, fifo,): 
    # Send data from sockTest to sock 
    data = fifo.readline().rstrip() 

    print "handleFifoData:", data 
    for peerId in self.tunnels: 
     # TODO: perhaps FIFO message should be parsed json, so we know which client to send them to? 
     peer = self.tunnels[peerId] 
     msg = peerId + data 
     sock.sendto(msg, (peer.address, peer.port)) 

def handleUDPData (self, sock, fifo): # reads a tuple 
    (data, addr) = sock.recvfrom(1024) 
    print "UDP "+str(addr)+": ", data 

    # ... 
    elif peerId in self.tunnels: # We are functioning as a relay for this node 
     # NOTE: This is where TUN interface forwarding would happen 
     print "tunnel data:", data 
     if fifo: 
      print "...writing out to fifo!", data 
      print >> fifo, data+'\n' 
     return 
    # ... 
+1

您可能看不到任何東西,因爲數據卡在stdio緩衝區中;嘗試在打印後添加'fifo.flush()'。一般來說,我會對緩衝IO和'select'混合保持警惕。我建議從'print'和'file.readline'切換到'os.write()'和'os.read()',它們不會進行緩衝,並且具有更多的可選行爲。 – user4815162342

+1

另外,'print'已經附加了一個換行符(如果缺少的話),所以你可能想要避免添加''\ n'',或者更好的是避免'print'優先於'sys.stdout.write )'或者,如上所述,'os.write()'。 – user4815162342

+0

將所有內容轉換爲os.open/read/write非常好,謝謝!我早些時候嘗試過'fifo.flush',但沒有奏效。我也嘗試過使用'python -u'啓動選項,也沒有工作 - 我的印象是強迫所有東西都無緩衝。 – nflacco

回答

1

你可能看不到任何東西因爲數據卡在stdio緩衝區中;打印後嘗試添加fifo.flush()。一般來說,應該小心混合緩衝IO和選擇。

這將是最好從printfile.readline()切換到os.write()os.read(),不緩衝區,並有更多的可預測的行爲WRT select()