2014-10-10 56 views
1

這是我的第一篇文章!Python TCP有效載荷複製 - 同時將數據傳遞到多個端點

我的目標是複製單向TCP流的有效負載,並將此有效負載併發發送到多個端點。我有一個用Python編寫的工作原型,但是我對Python和Socket編程都很陌生。理想的解決方案可以在Windows和* nix環境中運行。

這個原型可以工作,但它爲每個緩衝區長度(當前設置爲4096字節)創建一個新的發送TCP連接。這樣做的主要問題是我最終會用盡本地端口發送,理想情況下,我希望數據從每個傳入的TCP流傳遞到單個TCP流(對於每個端點)。傳入數據可以從小於1024字節到數百兆字節不等。

此時,每4096個字節啓動一個新的傳出TCP流。我不確定問題出在我的線程實現上,還是我錯過了其他非常明顯的東西。

在我的研究中,我發現select()可以提供幫助,但是我不確定它是否合適,因爲我可能需要處理一些傳入數據並在將來對某些情況作出響應。

這是我迄今(有些我已經嘗試了代碼的變化被註釋掉)的代碼:我已研究過其他圖書館努力實現我的目標,包括使用

#!/usr/bin/python 
#One way TCP payload duplication 
import sys 
import threading 
from socket import * 
bufsize = 4096 
host= '' 

# Methods: 
#handles sending the data to the endpoints 
def send(endpoint,port,data): 
    sendSocket = socket(AF_INET, SOCK_STREAM) 
    #sendSocket.setblocking(1) 
    sendSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 
    #sendport = sendSocket.getsockname 
    #print sendport 
    try: 
     sendSocket.connect((endpoint, port)) 
     sendSocket.sendall(data) 
    except IOError as msg: 
     print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1] 
     sys.exit() 

#handles threading for sending data to endpoints 
def forward(service, ENDPOINT_LIST, port, data): 
    #for each endpoint in the endpoint list start a new send thread 
    for endpoint in ENDPOINT_LIST: 
     print "Forwarding data for %s from %s:%s to %s:%s" % (service,host,port,endpoint,port) 
     #send(endpoint,port,data) 
     ethread = threading.Thread(target=send, args=(endpoint,port,data)) 
     ethread.start() 

#handles threading for incoming clients 
def clientthread(conn,service,ENDPOINT_LIST,port): 
    while True: 
     #receive data form client 
     data = conn.recv(bufsize) 
     if not data: 
      break 
     cthread = threading.Thread(target=forward, args=(service, ENDPOINT_LIST, port, data)) 
     cthread.start() 
    #no data? then close the connection 
    conn.close() 

#handles listening to sockets for incoming connections 
def listen(service, ENDPOINT_LIST, port): 
    #create the socket 
    listenSocket = socket(AF_INET, SOCK_STREAM) 
    #Allow reusing addresses - I think this is important to stop local ports getting eaten up by never-ending tcp streams that don't close 
    listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 
    #try to bind the socket to host and port 
    try: 
     listenSocket.bind((host, port)) 
    #display an error message if you can't 
    except IOError as msg: 
     print "Bind Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1] 
     sys.exit() 
    #start listening on the socket 
    listenSocket.listen(10) 
    print "Service %s on port %s is listening" %(service,port) 
    while True: 
     #wait to accept a connection 
     conn, addr = listenSocket.accept() 
     print 'Connected to ' + addr[0] + ':' + str(addr[1]) + ' on port ' + str(port) 
     #start new thread for each connection 
     lthread = threading.Thread(target=clientthread , args=(conn,service,ENDPOINT_LIST,port)) 
     lthread.start() 
    #If no data close the connection 
    listenSocket.close() 

service = "Dumb-one-way-tcp-service-name1" 
ENDPOINT_LIST = ["192.168.1.100","192.168.1.200"] 
port = 55551  
listen(service,ENDPOINT_LIST,port) 

  • 扭曲
  • Asyncore
  • Scapy的

但是,我發現它們對於我的適度需求和編程技巧水平而言非常複雜。

如果任何人有任何建議,我可以改進我的方法或任何其他方式可以實現此目標,請讓我知道!

+0

有幾種解決方案更容易或更復雜,這取決於所需的同步類型。假設您將收到10MB並希望將其轉發到3個目的地。說目的地號碼3在接受其數據流時速度很慢。你想快速接收所有10MB的數據並立即將它發送到另外兩個目的地,並將它緩存到第三個內存中?或者是否可以(甚至更好)放慢整個事情,即以最慢的連接速度接收並傳遞10MB? – 2014-10-10 09:02:30

+0

@ArminRigo,我認爲同步化很重要。我確實想快速接受數據。如果端點具有容量,則轉發的流應該以它以相同的速度發送,但是如果端點比另一個端點慢(這對我來說不可能是問題),緩衝區會很好發送之前的有效載荷。我認爲這將是Python的情況,它會緩存RAM中的傳入負載,因爲它試圖發送它 - 或者我需要明確告訴它緩衝它的地方?你認爲在某個地方緩衝它可以幫助我解決許多傳出的問題嗎? – BeSure 2014-10-11 00:08:23

回答

0

總之,你的問題是沒有足夠的端口,對不對?看起來您發送後沒有關閉套接字。在send()試試這個:

... 
except IOError as msg: 
    print "Send Failed. Error Code: " + str(msg[0]) + ' Message: ' + msg[1] 
    sys.exit() 
finally: 
    sendSocket.close() 
+0

這可能是一個好主意,在最終情況下關閉連接,但它不會幫助停止每個緩衝區長度的新傳出TCP連接 – BeSure 2014-10-10 06:48:12

+0

@BeSure因此,您不希望每次都在新的時間發送流量connectin? – 2014-10-10 06:51:09

+0

不,我不想爲每個傳入的連接,我想將它的有效負載管道連接到每個端點的單個TCP連接。有效載荷可以變化達數百兆字節。 – BeSure 2014-10-10 07:13:22

0

有兩種方式,如果你不想學像扭曲的一些更高級的框架。

最接近你在做什麼:使用線程,但你需要有一個線程每個傳出連接---和而不是每個傳出數據包。創建3個Queue.Queue對象,並創建3個線程,傳遞給Queue對象之一和目標之一。每個線程打開一個套接字,然後在一個循環中,它從它自己的Queue獲得下一個字符串並將其發送到套接字。 clientthread(可以只是主線程,先驗)以字符串形式接收數據,並將每個字符串放入所有隊列中。這樣,發送的數據包就不會出現亂序,因爲如果您爲每個數據包創建一個線程,就可能發生這種情況。

另一種方法是完全避免線程,並使用select()。這是更多的思想彎曲。基本上你只有一個以select()開頭的大循環。它需要精心管理,插座的正確列表傳遞給select():你要調用select()醒來要麼時,有從入站插座輸入數據,如果出站插槽既可以發送更多還有更多要發送的東西。在這個模型中,你會有3個字符串列表;當你讀取傳入的數據時,你將它附加到所有三個列表中; select()調用被傳遞到具有非空列表的出站套接字列表(因此,要發送更多);並且在發送時,您不得在此模型中使用sendall(),但在send()中使用sendall(),並且如果發送的字符串少於完整字符串,則必須將其餘部分重新添加到相應列表的開頭。

+0

我認爲Queue是我想採取的方法,我試圖實現您的建議,並且我有Queue的工作,但是我不確定如何構造線程,但我仍然創建了太多的線程,導致失敗文件,我無法弄清楚爲什麼... – BeSure 2014-10-17 08:31:32

相關問題