我正在爲一個UDP應用程序編寫一個測試工具(TH),打開一個到另一個UDP應用程序實例的隧道。要通過隧道發送東西,每個UDP應用程序都有一個以讀寫模式(與將來使用的TUN接口)相關聯的命名管道。每個UDP應用程序都有幾個子進程,TH啓動UDP應用程序作爲不同線程中的子進程,並通過子進程中的管道發送數據。這是一團糟,但我希望首先在一個簡單的案例上工作,因爲我最終希望在5個以上的應用中構建隧道。python與IPC和子進程命名管道混亂
2個問題:
- 選擇的第一個應用程序()循環沒有得到全部由TH-只是第一個發送的消息。每次文件有新內容/準備好讀取時,select()都不會通知嗎?我現在使用pipe.readline(),更改爲pipe.readlines()可以獲得大部分消息,但不是全部。請參閱下面的更多細節。
- 在第二個應用程序中,我試圖寫入管道的UDP數據(與第一個不同)。 TH從未獲取數據。
這裏的數據通過隧道從TH流:
- TH打印>> FIFO1, 「富」
- FIFO1
- UDP應用#1接收FIFO1,發送UDP數據包
- UDP隧道
- UDP應用程序#2接收UDP包,發送FIFO2
- FIFO2
- TH fifo2.readline()
當然,這根本不起作用。當我通過fifo1發送幾條消息到應用#1時,只有第一條消息進入應用#1。此消息通過UDP隧道正確傳輸到應用#2。 App#2寫入fifo2,但是在TH中,我看不到fifo2中的任何內容。當我運行測試工具來說明這一點時,我已經包含了控制檯輸出。
Sending data one way over tunnel...
rxDataFifo: reading from pipe
# Here we start sending messages (hello 0, hello 1, hello 2) via fifo1
i= 0
i= 1
i= 2
txDataFifo: finished # Now we are finished sending
handleFifoData: hello 0 # first message received on UDP app #1
UDP ('127.0.0.1', 9000): fde04a5a1de6d473b70c184e5e981279hello 0 # Here's the message app #2 recv via UDP
...writing out to fifo! hello 0 # And app #2 write out to fifo2 (or should, at least)
# Nothing received from fifo2 on test harness!??
問題1: 在該應用#1側,選擇循環不觸發每個管被寫入多個可讀事件。如果不是從管道中讀取單行,而是讀取所有可用的行,則會獲得所有最後兩條消息(但不是第一行)。
Sending data one way over tunnel...
rxDataFifo: reading from pipe
i= 0
i= 1
i= 2
txDataFifo: finished
handleFifoData: hello 1
handleFifoData: hello 2
UDP ('127.0.0.1', 9000): e3270c47214147ae3698aeecc13e2acehello 1
tunnel data: hello 1
...writing out to fifo! hello 1
UDP ('127.0.0.1', 9000): e3270c47214147ae3698aeecc13e2acehello 2
tunnel data: hello 2
...writing out to fifo! hello 2
問題2:我的理論是,TH是試圖讀取該管道有任何內容之前。但這沒有意義,因爲我在接收fifo2數據時做了一段時間的循環。我嘗試了一些更簡單的東西,沒有工作。
測試工具的
相關部分:
def main():
# Create pipes for each process we want to launch
# NOTE: These pipes are passed to the multiprocessing.Process and subprocess.call
# that we use to launch the UDP apps
tmpdir = tempfile.mkdtemp()
pipe0 = os.path.join(tmpdir, 'pipe0')
pipe1 = os.path.join(tmpdir, 'pipe1')
try:
os.mkfifo(pipe0)
os.mkfifo(pipe1)
except OSError, e:
print "Failed to create FIFO: %s" % e
#...
queue = Queue() # this where output goes
num_msg = 10 # number of messages to send over tunnel
txFifo = Process(target=txDataFifo, args=(pipe0, queue, num_msg))
rxFifo = Process(target=rxDataFifo, args=(pipe1, queue))
rxFifo.start()
txFifo.start()
def txDataFifo(pipe_name, queue, num_msg):
fifo = open(pipe_name, 'r+b')
# print >> fifo, "hello over named pipe" # write stuff to fifo
for i in range(0, 3):
print "i=",i
print >> fifo, "hello "+str(i)
fifo.close()
print "txDataFifo: finished"
def rxDataFifo(pipe_name, queue):
print "rxDataFifo: reading from pipe"
fifo = open(pipe_name, 'w+b')
while True:
data = fifo.readline()
print 'rxDataFifo:', data
queue.put(data)
fifo.close()
print "txDataFifo: reading from pipe"
UDP的應用程序和處理程序UDP的選擇環路和命名管道數據:
def listen (self, ipaddress, udpport, testport, pipe_filename):
# ...
inputs = [sock, self.pipe] # stuff we read
outputs = [] # stuff we expect to write
# Set up the named pipe that we use to simulate the TUN interface
# and use to communicate with the test harness
fifo = None
if pipe_filename:
print "Opening pipe="+pipe_filename+" for IPC with test harness"
fifo = open(pipe_filename, 'r+b')
inputs.append(fifo)
while inputs:
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for event in readable:
if fifo and event is fifo:
# Handle data from test harness simulating TUN (via pipe)
self.handleFifoData(sock, fifo)
if event is sock:
# Handle tunnel/setup request data
self.handleUDPData(sock, fifo)
if event is self.pipe:
# Handle commands from the UI in the other process (IPC)
data = self.pipe.recv()
print "pipe event", data
if data[0] == 'open':
# NOTE: For open command, data[1] and data[2] are
# an IP address and port, respectively
connId = self.generateConnId()
msg = connId + 'setup'
sock.sendto(msg, (data[1], data[2]))
self.mySetup[connId] = SetupInfo(data[1], data[2])
# Handle exceptional?
def handleFifoData (self, sock, fifo,):
# Send data from sockTest to sock
data = fifo.readline().rstrip()
print "handleFifoData:", data
for peerId in self.tunnels:
# TODO: perhaps FIFO message should be parsed json, so we know which client to send them to?
peer = self.tunnels[peerId]
msg = peerId + data
sock.sendto(msg, (peer.address, peer.port))
def handleUDPData (self, sock, fifo): # reads a tuple
(data, addr) = sock.recvfrom(1024)
print "UDP "+str(addr)+": ", data
# ...
elif peerId in self.tunnels: # We are functioning as a relay for this node
# NOTE: This is where TUN interface forwarding would happen
print "tunnel data:", data
if fifo:
print "...writing out to fifo!", data
print >> fifo, data+'\n'
return
# ...
您可能看不到任何東西,因爲數據卡在stdio緩衝區中;嘗試在打印後添加'fifo.flush()'。一般來說,我會對緩衝IO和'select'混合保持警惕。我建議從'print'和'file.readline'切換到'os.write()'和'os.read()',它們不會進行緩衝,並且具有更多的可選行爲。 – user4815162342
另外,'print'已經附加了一個換行符(如果缺少的話),所以你可能想要避免添加''\ n'',或者更好的是避免'print'優先於'sys.stdout.write )'或者,如上所述,'os.write()'。 – user4815162342
將所有內容轉換爲os.open/read/write非常好,謝謝!我早些時候嘗試過'fifo.flush',但沒有奏效。我也嘗試過使用'python -u'啓動選項,也沒有工作 - 我的印象是強迫所有東西都無緩衝。 – nflacco