2017-03-03 78 views
2

我讀過thisthis。但是,我的情況是不同的。我不需要服務器上的多路複用服務,也不需要多個連接到服務器。

背景:
我的大數據項目,我需要計算一個給定的大數據的coreset。 Coreset是保留大數據最重要的數學關係的大數據的子集。

工作流程:Apache Thrift:多任務單服務器和客戶端

  • 片龐大的數據,以更小的塊
  • 客戶端解析塊並將其發送到服務器
  • 服務器計算coreset並保存結果

我的問題:
整個事情作爲單個執行線程工作。 客戶端解析一個塊,然後等待服務器完成計算核心集,然後解析另一個塊,等等。

目標:
利用多處理。客戶端同時分析多個塊,並且對於每個請求,服務器任務一個線程來處理它。線程數量有限的地方。就像一個池。

我知道我需要使用不同的協議,然後TSimpleServer和TThreadPoolServer或TThreadedServer。我不能讓我明白哪一個可以選擇,因爲它們都不適合我。

TThreadedServer爲每個客戶端連接生成一個新線程,並且每個線程保持活動狀態,直到客戶端連接關閉。


在TThreadedServer每個客戶端連接都有自己的專用服務器線程。在客戶端關閉連接以供重用之後,服務器線程返回到線程池。

我不需要每個連接的線程,我想要一個連接,並且服務器同時處理多個服務請求。 Visiualization:

Client: 
Thread1: parses(chunk1) --> Request compute coreset 
Thread2: parses(chunk2) --> Request compute coreset 
Thread3: parses(chunk3) --> Request compute coreset 

Server: (Pool of 2 threads) 
Thread1: Handle compute Coreset 
Thread2: handle compute Coreset 
. 
. 
Thread1 becomes available and handles another compute coreset 

代碼:
API。節儉:

struct CoresetPoint { 
    1: i32 row, 
    2: i32 dim, 
} 

struct CoresetAlgorithm { 
    1: string path, 
} 

struct CoresetWeightedPoint { 
    1: CoresetPoint point, 
    2: double weight, 
} 

struct CoresetPoints { 
    1: list<CoresetWeightedPoint> points, 
} 

service CoresetService { 

    void initialize(1:CoresetAlgorithm algorithm, 2:i32 coresetSize) 

    oneway void compressPoints(1:CoresetPoints message) 

    CoresetPoints getTotalCoreset() 
} 


服務器:(執行情況更好看移除)

class CoresetHandler: 
    def initialize(self, algorithm, coresetSize): 

    def _add(self, leveledSlice): 

    def compressPoints(self, message): 

    def getTotalCoreset(self): 


if __name__ == '__main__': 
    logging.basicConfig() 
    handler = CoresetHandler() 
    processor = CoresetService.Processor(handler) 
    transport = TSocket.TServerSocket(port=9090) 
    tfactory = TTransport.TBufferedTransportFactory() 
    pfactory = TBinaryProtocol.TBinaryProtocolFactory() 

    server = TServer.TThreadedServer(processor, transport, tfactory, pfactory) 

    # You could do one of these for a multithreaded server 
    # server = TServer.TThreadedServer(processor, transport, tfactory, pfactory) 
    # server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory) 

    print 'Starting the server...' 
    server.serve() 
    print 'done.' 


客戶:

try: 
    # Make socket 
    transport = TSocket.TSocket('localhost', 9090) 

    # Buffering is critical. Raw sockets are very slow 
    transport = TTransport.TBufferedTransport(transport) 

    # Wrap in a protocol 
    protocol = TBinaryProtocol.TBinaryProtocol(transport) 

    # Create a client to use the protocol encoder 
    client = CoresetService.Client(protocol) 

    # Connect! 
    transport.open() 


    // Here data is sliced, and in a loop I move on all files 
     Saved in the directory I specified, then they are parsed and 
     client.compressPoints(data) is invoked. 

     SliceFile(...) 
     p = CoresetAlgorithm(...) 
     client.initialize(p, 200) 
     for filename in os.listdir('/home/tony/DanLab/slicedFiles'): 
      if filename.endswith(".txt"): 
       data = _parse(filename) 
       client.compressPoints(data) 
     compressedData = client.getTotalCoreset() 


# Close! 
    transport.close() 

except Thrift.TException, tx: 
    print '%s' % (tx.message) 

問題: 在Thrift有可能嗎?我應該使用什麼協議? 我解決了客戶端等待服務器完成計算的部分問題,方法是在函數聲明 中加入oneway來表示客戶端只發出請求,根本不等待任何響應。

回答

1

從本質上講,這更像是一個架構問題,與其說是一個節儉問題。鑑於房屋

我不需要每個連接線程,我想要一個連接,並在同一時間處理多個服務請求的服務器。 Visiualization:

我加入單向到函數聲明解決了客戶的等待服務器來完成計算的部分問題,以表示該客戶端只發出請求,不等待任何響應在所有。

準確地描述使用的情況下,你想這樣:

+---------------------+ 
| Client    | 
+---------+-----------+ 
      | 
      | 
+---------v-----------+ 
| Server    | 
+---------+-----------+ 
      | 
      | 
+---------v-----------+   +---------------------+ 
| Queue<WorkItems> <----------+ Worker Thread Pool | 
+---------------------+   +---------------------+ 

服務器唯一的任務是讓請求,並儘快將其插入到工作項目排隊。這些工作項目由獨立的工作線程池處理,該工作線程池另外完全獨立於服務器部分。唯一的共享部分是工作項目隊列,這當然需要正確同步的訪問方法。

關於serevr的選擇:如果服務器足夠快地提供請求,即使是TSimpleServer也可以。

+0

好吧,你的方法確實比我的好。服務器對工作進行排隊,然後工人接受一個塊並進行計算。還有一個問題,如果我在客戶端多任務解析(這很耗時,所以我想多任務處理),我可能同一個客戶端的不同線程同時擁有'client.compressPoints(message)'。這是否容易出現問題? –

+0

通常,客戶端不是線程安全的,也不是基礎的物理連接處理。 IOW,每個線程一個客戶端。如果客戶端斷開連接,那麼您將需要足夠數量的服務器端點,因此'TSimpleSever'將不再適合這種情況。 – JensG