2012-03-01 47 views
1

我目前正在開展一個學校項目,其中的任務是建立一個線程化的服務器/客戶機系統。連接到系統時,系統中的每個客戶端都應該在服務器上分配自己的線程。另外,我希望服務器運行其他線程,一個涉及命令行輸入,另一個涉及向所有客戶端廣播消息。但是,我無法按照我的意願來運行。這似乎是線程阻塞對方。我希望我的程序能夠在服務器偵聽連接的客戶端的「同一時間」從命令行獲取輸入信息,依此類推。Python中的線程同步

我是新來的蟒蛇編程和多線程,allthough我認爲我的想法很好,我不驚訝我的代碼不起作用。事情是我不確定我將如何實現在不同線程之間傳遞的消息。我也不確定如何正確實施資源鎖定命令。我將在這裏發佈我的服務器文件和我的客戶端文件的代碼,我希望有人能幫助我。我認爲這應該是兩個相對簡單的腳本。我試圖在一定程度上儘可能好地評論我的代碼。

import select 
import socket 
import sys 
import threading 
import client 

class Server: 

#initializing server socket 
def __init__(self, event): 
    self.host = 'localhost' 
    self.port = 50000 
    self.backlog = 5 
    self.size = 1024 
    self.server = None 
    self.server_running = False 
    self.listen_threads = [] 
    self.local_threads = [] 
    self.clients = [] 
    self.serverSocketLock = None 
    self.cmdLock = None 
    #here i have also declared some events for the command line input 
    #and the receive function respectively, not sure if correct 
    self.cmd_event = event 
    self.socket_event = event 

def openSocket(self): 
    #binding server to port 
    try: 
     self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.server.bind((self.host, self.port)) 
     self.server.listen(5) 
     print "Listening to port " + str(self.port) + "..." 
    except socket.error, (value,message): 
     if self.server: 
      self.server.close() 
     print "Could not open socket: " + message 
     sys.exit(1) 

def run(self): 
    self.openSocket() 

    #making Rlocks for the socket and for the command line input 

    self.serverSocketLock = threading.RLock() 
    self.cmdLock = threading.RLock() 

    #set blocking to non-blocking 
    self.server.setblocking(0) 
    #making two threads always running on the server, 
    #one for the command line input, and one for broadcasting (sending) 
    cmd_thread = threading.Thread(target=self.server_cmd) 
    broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients]) 
    cmd_thread.daemon = True 
    broadcast_thread.daemon = True 
    #append the threads to thread list 
    self.local_threads.append(cmd_thread) 
    self.local_threads.append(broadcast_thread) 

    cmd_thread.start() 
    broadcast_thread.start() 


    self.server_running = True 
    while self.server_running: 

     #connecting to "knocking" clients 
     try: 
      c = client.Client(self.server.accept()) 
      self.clients.append(c) 
      print "Client " + str(c.address) + " connected" 

      #making a thread for each clientn and appending it to client list 
      listen_thread = threading.Thread(target=self.listenToClient,args=[c]) 
      self.listen_threads.append(listen_thread) 
      listen_thread.daemon = True 
      listen_thread.start() 
      #setting event "client has connected" 
      self.socket_event.set() 

     except socket.error, (value, message): 
      continue 

    #close threads 

    self.server.close() 
    print "Closing client threads" 
    for c in self.listen_threads: 
     c.join() 

def listenToClient(self, c): 

    while self.server_running: 

     #the idea here is to wait until the thread gets the message "client 
     #has connected" 
     self.socket_event.wait() 
     #then clear the event immidiately... 
     self.socket_event.clear() 
     #and aquire the socket resource 
     self.serverSocketLock.acquire() 

     #the below is the receive thingy 

     try: 
      recvd_data = c.client.recv(self.size) 
      if recvd_data == "" or recvd_data == "close\n": 
       print "Client " + str(c.address) + (" disconnected...") 
       self.socket_event.clear() 
       self.serverSocketLock.release() 
       return 

      print recvd_data 

      #I put these here to avoid locking the resource if no message 
      #has been received 
      self.socket_event.clear() 
      self.serverSocketLock.release() 
     except socket.error, (value, message): 
      continue    

def server_cmd(self): 

    #this is a simple command line utility 
    while self.server_running: 

     #got to have a smart way to make this work 

     self.cmd_event.wait() 
     self.cmd_event.clear() 
     self.cmdLock.acquire() 


     cmd = sys.stdin.readline() 
     if cmd == "": 
      continue 
     if cmd == "close\n": 
      print "Server shutting down..." 
      self.server_running = False 

     self.cmdLock.release() 


def broadcast(self, clients): 
    while self.server_running: 

     #this function will broadcast a message received from one 
     #client, to all other clients, but i guess any thread 
     #aspects applied to the above, will work here also 

     try: 
      send_data = sys.stdin.readline() 
      if send_data == "": 
       continue 
      else: 
       for c in clients: 
        c.client.send(send_data) 
      self.serverSocketLock.release() 
      self.cmdLock.release() 
     except socket.error, (value, message): 
      continue 

if __name__ == "__main__": 
e = threading.Event() 
s = Server(e) 
s.run() 

然後客戶端文件

import select 
import socket 
import sys 
import server 
import threading 

class Client(threading.Thread): 

#initializing client socket 

def __init__(self,(client,address)): 
    threading.Thread.__init__(self) 
    self.client = client 
    self.address = address 
    self.size = 1024 
    self.client_running = False 
    self.running_threads = [] 
    self.ClientSocketLock = None 

def run(self): 

    #connect to server 
    self.client.connect(('localhost',50000)) 

    #making a lock for the socket resource 
    self.clientSocketLock = threading.Lock() 
    self.client.setblocking(0) 
    self.client_running = True 

    #making two threads, one for receiving messages from server... 
    listen = threading.Thread(target=self.listenToServer) 

    #...and one for sending messages to server 
    speak = threading.Thread(target=self.speakToServer) 

    #not actually sure wat daemon means 
    listen.daemon = True 
    speak.daemon = True 

    #appending the threads to the thread-list 
    self.running_threads.append(listen) 
    self.running_threads.append(speak) 
    listen.start() 
    speak.start() 

    #this while-loop is just for avoiding the script terminating 
    while self.client_running: 
     dummy = 1 

    #closing the threads if the client goes down 
    print "Client operating on its own" 
    self.client.close() 

    #close threads 
    for t in self.running_threads: 
     t.join() 
    return 

#defining "listen"-function 
def listenToServer(self): 
    while self.client_running: 

     #here i acquire the socket to this function, but i realize I also 
     #should have a message passing wait()-function or something 
     #somewhere 
     self.clientSocketLock.acquire() 

     try: 
      data_recvd = self.client.recv(self.size) 
      print data_recvd 
     except socket.error, (value,message): 
      continue 

     #releasing the socket resource 
     self.clientSocketLock.release() 

#defining "speak"-function, doing much the same as for the above function  
def speakToServer(self): 
    while self.client_running: 
     self.clientSocketLock.acquire() 
     try: 
      send_data = sys.stdin.readline() 
      if send_data == "close\n": 
       print "Disconnecting..." 
       self.client_running = False 
      else: 
       self.client.send(send_data) 
     except socket.error, (value,message): 
      continue 

     self.clientSocketLock.release() 

if __name__ == "__main__": 
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost')) 
c.run() 

我意識到這是相當多的代碼行供您閱讀過,但正如我所說的,我覺得這個概念,並在它的腳本自應很容易理解。這將是非常大大appriciated如果有人可以幫助我在一個適當的方式同步我的主題=)

在此先感謝

---編輯---

確定。所以我現在簡化了我的代碼,只包含服務器和客戶端模塊中的發送和接收功能。連接到服務器的客戶端獲得自己的線程,並且兩個模塊中的發送和接收功能都在各自獨立的線程中進行操作。這就像一個魅力一樣,服務器模塊中的廣播功能回顯從一個客戶端到所有客戶端的字符串。到現在爲止還挺好!

我希望我的腳本能做的下一件事是在客戶端模塊中採用特定命令(即「關閉」)關閉客戶端,並加入線程列表中的所有正在運行的線程。我使用事件標誌來通知listenToServer和主線程speakToServer線程已經讀取輸入「關閉」。看起來主線程跳出while循環並啓動應該加入其他線程的for循環。但在這裏它掛起。看起來listenToServer線程中的while循環從不停止,即使在設置事件標誌時server_running應該設置爲False。

我只在這裏發佈客戶端模塊,因爲我猜想讓這兩個線程同步的答案將涉及同步客戶端和服務器模塊中的更多線程。

import select 
import socket 
import sys 
import server_bygg0203 
import threading 
from time import sleep 

class Client(threading.Thread): 

#initializing client socket 

def __init__(self,(client,address)): 

threading.Thread.__init__(self) 
self.client = client 
self.address = address 
self.size = 1024 
self.client_running = False 
self.running_threads = [] 
self.ClientSocketLock = None 
self.disconnected = threading.Event() 

def run(self): 

#connect to server 
self.client.connect(('localhost',50000)) 

#self.client.setblocking(0) 
self.client_running = True 

#making two threads, one for receiving messages from server... 
listen = threading.Thread(target=self.listenToServer) 

#...and one for sending messages to server 
speak = threading.Thread(target=self.speakToServer) 

#not actually sure what daemon means 
listen.daemon = True 
speak.daemon = True 

#appending the threads to the thread-list 
self.running_threads.append((listen,"listen")) 
self.running_threads.append((speak, "speak")) 
listen.start() 
speak.start() 

while self.client_running: 

    #check if event is set, and if it is 
    #set while statement to false 

    if self.disconnected.isSet(): 
     self.client_running = False 

#closing the threads if the client goes down 
print "Client operating on its own" 
self.client.shutdown(1) 
self.client.close() 

#close threads 

#the script hangs at the for-loop below, and 
#refuses to close the listen-thread (and possibly 
#also the speak thread, but it never gets that far) 

for t in self.running_threads: 
    print "Waiting for " + t[1] + " to close..." 
    t[0].join() 
self.disconnected.clear() 
return 

#defining "speak"-function  
def speakToServer(self): 

#sends strings to server 
while self.client_running: 
    try: 
     send_data = sys.stdin.readline() 
     self.client.send(send_data) 

     #I want the "close" command 
     #to set an event flag, which is being read by all other threads, 
     #and, at the same time set the while statement to false 

     if send_data == "close\n": 
      print "Disconnecting..." 
      self.disconnected.set() 
      self.client_running = False 
    except socket.error, (value,message): 
     continue 
return 

#defining "listen"-function 
def listenToServer(self): 

#receives strings from server 
while self.client_running: 

    #check if event is set, and if it is 
    #set while statement to false 

    if self.disconnected.isSet(): 
     self.client_running = False 
    try: 
     data_recvd = self.client.recv(self.size) 
     print data_recvd 
    except socket.error, (value,message): 
     continue 
return 

if __name__ == "__main__": 
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost')) 
c.run() 

後來,當我起來讓這臺服務器/客戶端系統和運行,我將使用這個系統,我們在這裏對實驗室的一些電梯機型,每個客戶端接收地面指令或「向上」和「下來「電話。服務器將運行分配算法並更新最適合請求訂單的客戶端上的電梯隊列。我意識到這是一條很長的路要走,但我想應該只是在一段時間=)

希望有人有時間來看看這個。提前致謝。

+2

我認爲你應該更清楚你的代碼是如何「不起作用」的。你期待什麼行爲?你觀察到了什麼行爲? 「不起作用」可能意味着任何事情。 – 2012-03-01 17:45:46

回答

1

我在這段代碼中看到的最大問題是,您有太多的事情要馬上進行,以便輕鬆調試您的問題。由於邏輯變得非線性,線程會變得非常複雜。特別是當你不得不擔心與鎖同步時。

您看到客戶端互相阻塞的原因是因爲您在服務器的listenToClient()循環中使用serverSocketLock的方式。說實話,這不完全是你的代碼的問題,但是當我開始調試它並將套接字轉換爲阻塞套接字時,它就成了問題。如果您將每個連接放入其自己的線程並從中讀取,那麼就沒有理由在這裏使用全局服務器鎖。他們都可以同時從自己的套接字讀取,這是線程的目的。

這是我推薦給大家:

  1. 擺脫你不需要的所有鎖和額外的線程,並從頭開始
  2. 有客戶因爲你做的連接,並像你一樣把它們放在他們的線程中。只需讓他們每秒發送一次數據。驗證您可以獲得多個客戶端連接和發送,並且您的服務器正在循環和接收。一旦你有這部分工作,你可以繼續下一部分。
  3. 現在你的套接字被設置爲非阻塞。當數據沒有準備好時,這導致它們全部旋轉得非常快。既然你是線程化的,你應該設置它們阻塞。然後讀者線程將簡單地坐下來等待數據並立即響應。

當線程訪問共享資源時使用鎖。顯然,任何時候線程都會嘗試修改服務器屬性,如列表或值。但是,當他們在自己的私人套接字上工作時並非如此。

您用來觸發您的讀者的事件在這裏似乎不是必要的。你已經收到了客戶端,然後你開始線程。所以它準備好了。

簡而言之...簡化,每次測試一個位。在工作時,添加更多。現在有太多的線程和鎖。

這是你listenToClient方法的一個簡單的例子:

def listenToClient(self, c): 
    while self.server_running: 
     try: 
      recvd_data = c.client.recv(self.size) 
      print "received:", c, recvd_data 
      if recvd_data == "" or recvd_data == "close\n": 
       print "Client " + str(c.address) + (" disconnected...") 
       return 

      print recvd_data 

     except socket.error, (value, message): 
      if value == 35: 
       continue 
      else: 
       print "Error:", value, message 
+0

好的!看起來像一些合理的建議。當我得到時間時,我會研究這一點。非常感謝您抽出寶貴的時間進行調試,當我啓動並運行新代碼時,我會發佈一個答案=) – 2012-03-01 19:38:46

+0

當然。如果您需要再次查看您的簡化代碼,請告訴我。 – jdi 2012-03-01 19:46:44

1

備份你的工作,然後折騰 - 部分。

您需要分段實施您的程序,並隨時測試每件作品。首先,解決您程序中的input部分問題。不要擔心如何廣播您收到的輸入內容。而是擔心您能夠成功並重復地通過套接字接收輸入。到現在爲止還挺好。

現在,我假設你想通過廣播到其他連接的客戶端來對此輸入作出反應。太糟糕了,你還不能這樣做!因爲,我在上面的段落中留下了一小部分細節。你必須設計一個PROTOCOL

什麼是協議?這是一套溝通規則。您的服務器如何知道客戶端何時完成發送數據?它是否由一些特殊字符終止?或者,也許您將消息的大小編碼爲消息的第一個或第二個字節。

這是一個很多工作,不是嗎? :-)

什麼是簡單的協議。 A line-oriented協議很簡單。一次讀取1個字符,直到您到達記錄終止符的末尾 - '\ n'。因此,客戶端會發送記錄這樣到你的服務器 -

HELO \ n 味精DAVE如果您的孩子\ n

因此,假設你有這種簡單的協議設計,實現它。現在,不要擔心多線程的信息!只是擔心它的工作。

你目前的協議是讀取1024個字節。這可能並不壞,只要確保從客戶端發送了1024字節的消息即可。

一旦你有協議的東西設置,繼續對輸入作出反應。但是現在你需要一些能夠讀取輸入的東西。一旦完成,我們可以擔心要做些什麼。

jdi是對的,你有太多的程序可以使用。件更容易修復。