我讀過this和this。但是,我的情況是不同的。我不需要服務器上的多路複用服務,也不需要多個連接到服務器。
背景:
我的大數據項目,我需要計算一個給定的大數據的coreset。 Coreset是保留大數據最重要的數學關係的大數據的子集。
工作流程:Apache Thrift:多任務單服務器和客戶端
- 片龐大的數據,以更小的塊
- 客戶端解析塊並將其發送到服務器
- 服務器計算coreset並保存結果
我的問題:
整個事情作爲單個執行線程工作。 客戶端解析一個塊,然後等待服務器完成計算核心集,然後解析另一個塊,等等。
目標:
利用多處理。客戶端同時分析多個塊,並且對於每個請求,服務器任務一個線程來處理它。線程數量有限的地方。就像一個池。
我知道我需要使用不同的協議,然後TSimpleServer和TThreadPoolServer或TThreadedServer。我不能讓我明白哪一個可以選擇,因爲它們都不適合我。
TThreadedServer爲每個客戶端連接生成一個新線程,並且每個線程保持活動狀態,直到客戶端連接關閉。
在TThreadedServer每個客戶端連接都有自己的專用服務器線程。在客戶端關閉連接以供重用之後,服務器線程返回到線程池。
我不需要每個連接的線程,我想要一個連接,並且服務器同時處理多個服務請求。 Visiualization:
Client:
Thread1: parses(chunk1) --> Request compute coreset
Thread2: parses(chunk2) --> Request compute coreset
Thread3: parses(chunk3) --> Request compute coreset
Server: (Pool of 2 threads)
Thread1: Handle compute Coreset
Thread2: handle compute Coreset
.
.
Thread1 becomes available and handles another compute coreset
代碼:
API。節儉:
struct CoresetPoint {
1: i32 row,
2: i32 dim,
}
struct CoresetAlgorithm {
1: string path,
}
struct CoresetWeightedPoint {
1: CoresetPoint point,
2: double weight,
}
struct CoresetPoints {
1: list<CoresetWeightedPoint> points,
}
service CoresetService {
void initialize(1:CoresetAlgorithm algorithm, 2:i32 coresetSize)
oneway void compressPoints(1:CoresetPoints message)
CoresetPoints getTotalCoreset()
}
服務器:(執行情況更好看移除)
class CoresetHandler:
def initialize(self, algorithm, coresetSize):
def _add(self, leveledSlice):
def compressPoints(self, message):
def getTotalCoreset(self):
if __name__ == '__main__':
logging.basicConfig()
handler = CoresetHandler()
processor = CoresetService.Processor(handler)
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
# You could do one of these for a multithreaded server
# server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
# server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)
print 'Starting the server...'
server.serve()
print 'done.'
客戶:
try:
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = CoresetService.Client(protocol)
# Connect!
transport.open()
// Here data is sliced, and in a loop I move on all files
Saved in the directory I specified, then they are parsed and
client.compressPoints(data) is invoked.
SliceFile(...)
p = CoresetAlgorithm(...)
client.initialize(p, 200)
for filename in os.listdir('/home/tony/DanLab/slicedFiles'):
if filename.endswith(".txt"):
data = _parse(filename)
client.compressPoints(data)
compressedData = client.getTotalCoreset()
# Close!
transport.close()
except Thrift.TException, tx:
print '%s' % (tx.message)
問題: 在Thrift有可能嗎?我應該使用什麼協議? 我解決了客戶端等待服務器完成計算的部分問題,方法是在函數聲明 中加入oneway
來表示客戶端只發出請求,根本不等待任何響應。
好吧,你的方法確實比我的好。服務器對工作進行排隊,然後工人接受一個塊並進行計算。還有一個問題,如果我在客戶端多任務解析(這很耗時,所以我想多任務處理),我可能同一個客戶端的不同線程同時擁有'client.compressPoints(message)'。這是否容易出現問題? –
通常,客戶端不是線程安全的,也不是基礎的物理連接處理。 IOW,每個線程一個客戶端。如果客戶端斷開連接,那麼您將需要足夠數量的服務器端點,因此'TSimpleSever'將不再適合這種情況。 – JensG