我目前正在開展一個學校項目,其中的任務是建立一個線程化的服務器/客戶機系統。連接到系統時,系統中的每個客戶端都應該在服務器上分配自己的線程。另外,我希望服務器運行其他線程,一個涉及命令行輸入,另一個涉及向所有客戶端廣播消息。但是,我無法按照我的意願來運行。這似乎是線程阻塞對方。我希望我的程序能夠在服務器偵聽連接的客戶端的「同一時間」從命令行獲取輸入信息,依此類推。Python中的線程同步
我是新來的蟒蛇編程和多線程,allthough我認爲我的想法很好,我不驚訝我的代碼不起作用。事情是我不確定我將如何實現在不同線程之間傳遞的消息。我也不確定如何正確實施資源鎖定命令。我將在這裏發佈我的服務器文件和我的客戶端文件的代碼,我希望有人能幫助我。我認爲這應該是兩個相對簡單的腳本。我試圖在一定程度上儘可能好地評論我的代碼。
import select
import socket
import sys
import threading
import client
class Server:
#initializing server socket
def __init__(self, event):
self.host = 'localhost'
self.port = 50000
self.backlog = 5
self.size = 1024
self.server = None
self.server_running = False
self.listen_threads = []
self.local_threads = []
self.clients = []
self.serverSocketLock = None
self.cmdLock = None
#here i have also declared some events for the command line input
#and the receive function respectively, not sure if correct
self.cmd_event = event
self.socket_event = event
def openSocket(self):
#binding server to port
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((self.host, self.port))
self.server.listen(5)
print "Listening to port " + str(self.port) + "..."
except socket.error, (value,message):
if self.server:
self.server.close()
print "Could not open socket: " + message
sys.exit(1)
def run(self):
self.openSocket()
#making Rlocks for the socket and for the command line input
self.serverSocketLock = threading.RLock()
self.cmdLock = threading.RLock()
#set blocking to non-blocking
self.server.setblocking(0)
#making two threads always running on the server,
#one for the command line input, and one for broadcasting (sending)
cmd_thread = threading.Thread(target=self.server_cmd)
broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients])
cmd_thread.daemon = True
broadcast_thread.daemon = True
#append the threads to thread list
self.local_threads.append(cmd_thread)
self.local_threads.append(broadcast_thread)
cmd_thread.start()
broadcast_thread.start()
self.server_running = True
while self.server_running:
#connecting to "knocking" clients
try:
c = client.Client(self.server.accept())
self.clients.append(c)
print "Client " + str(c.address) + " connected"
#making a thread for each clientn and appending it to client list
listen_thread = threading.Thread(target=self.listenToClient,args=[c])
self.listen_threads.append(listen_thread)
listen_thread.daemon = True
listen_thread.start()
#setting event "client has connected"
self.socket_event.set()
except socket.error, (value, message):
continue
#close threads
self.server.close()
print "Closing client threads"
for c in self.listen_threads:
c.join()
def listenToClient(self, c):
while self.server_running:
#the idea here is to wait until the thread gets the message "client
#has connected"
self.socket_event.wait()
#then clear the event immidiately...
self.socket_event.clear()
#and aquire the socket resource
self.serverSocketLock.acquire()
#the below is the receive thingy
try:
recvd_data = c.client.recv(self.size)
if recvd_data == "" or recvd_data == "close\n":
print "Client " + str(c.address) + (" disconnected...")
self.socket_event.clear()
self.serverSocketLock.release()
return
print recvd_data
#I put these here to avoid locking the resource if no message
#has been received
self.socket_event.clear()
self.serverSocketLock.release()
except socket.error, (value, message):
continue
def server_cmd(self):
#this is a simple command line utility
while self.server_running:
#got to have a smart way to make this work
self.cmd_event.wait()
self.cmd_event.clear()
self.cmdLock.acquire()
cmd = sys.stdin.readline()
if cmd == "":
continue
if cmd == "close\n":
print "Server shutting down..."
self.server_running = False
self.cmdLock.release()
def broadcast(self, clients):
while self.server_running:
#this function will broadcast a message received from one
#client, to all other clients, but i guess any thread
#aspects applied to the above, will work here also
try:
send_data = sys.stdin.readline()
if send_data == "":
continue
else:
for c in clients:
c.client.send(send_data)
self.serverSocketLock.release()
self.cmdLock.release()
except socket.error, (value, message):
continue
if __name__ == "__main__":
e = threading.Event()
s = Server(e)
s.run()
然後客戶端文件
import select
import socket
import sys
import server
import threading
class Client(threading.Thread):
#initializing client socket
def __init__(self,(client,address)):
threading.Thread.__init__(self)
self.client = client
self.address = address
self.size = 1024
self.client_running = False
self.running_threads = []
self.ClientSocketLock = None
def run(self):
#connect to server
self.client.connect(('localhost',50000))
#making a lock for the socket resource
self.clientSocketLock = threading.Lock()
self.client.setblocking(0)
self.client_running = True
#making two threads, one for receiving messages from server...
listen = threading.Thread(target=self.listenToServer)
#...and one for sending messages to server
speak = threading.Thread(target=self.speakToServer)
#not actually sure wat daemon means
listen.daemon = True
speak.daemon = True
#appending the threads to the thread-list
self.running_threads.append(listen)
self.running_threads.append(speak)
listen.start()
speak.start()
#this while-loop is just for avoiding the script terminating
while self.client_running:
dummy = 1
#closing the threads if the client goes down
print "Client operating on its own"
self.client.close()
#close threads
for t in self.running_threads:
t.join()
return
#defining "listen"-function
def listenToServer(self):
while self.client_running:
#here i acquire the socket to this function, but i realize I also
#should have a message passing wait()-function or something
#somewhere
self.clientSocketLock.acquire()
try:
data_recvd = self.client.recv(self.size)
print data_recvd
except socket.error, (value,message):
continue
#releasing the socket resource
self.clientSocketLock.release()
#defining "speak"-function, doing much the same as for the above function
def speakToServer(self):
while self.client_running:
self.clientSocketLock.acquire()
try:
send_data = sys.stdin.readline()
if send_data == "close\n":
print "Disconnecting..."
self.client_running = False
else:
self.client.send(send_data)
except socket.error, (value,message):
continue
self.clientSocketLock.release()
if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()
我意識到這是相當多的代碼行供您閱讀過,但正如我所說的,我覺得這個概念,並在它的腳本自應很容易理解。這將是非常大大appriciated如果有人可以幫助我在一個適當的方式同步我的主題=)
在此先感謝
---編輯---
確定。所以我現在簡化了我的代碼,只包含服務器和客戶端模塊中的發送和接收功能。連接到服務器的客戶端獲得自己的線程,並且兩個模塊中的發送和接收功能都在各自獨立的線程中進行操作。這就像一個魅力一樣,服務器模塊中的廣播功能回顯從一個客戶端到所有客戶端的字符串。到現在爲止還挺好!
我希望我的腳本能做的下一件事是在客戶端模塊中採用特定命令(即「關閉」)關閉客戶端,並加入線程列表中的所有正在運行的線程。我使用事件標誌來通知listenToServer和主線程speakToServer線程已經讀取輸入「關閉」。看起來主線程跳出while循環並啓動應該加入其他線程的for循環。但在這裏它掛起。看起來listenToServer線程中的while循環從不停止,即使在設置事件標誌時server_running應該設置爲False。
我只在這裏發佈客戶端模塊,因爲我猜想讓這兩個線程同步的答案將涉及同步客戶端和服務器模塊中的更多線程。
import select
import socket
import sys
import server_bygg0203
import threading
from time import sleep
class Client(threading.Thread):
#initializing client socket
def __init__(self,(client,address)):
threading.Thread.__init__(self)
self.client = client
self.address = address
self.size = 1024
self.client_running = False
self.running_threads = []
self.ClientSocketLock = None
self.disconnected = threading.Event()
def run(self):
#connect to server
self.client.connect(('localhost',50000))
#self.client.setblocking(0)
self.client_running = True
#making two threads, one for receiving messages from server...
listen = threading.Thread(target=self.listenToServer)
#...and one for sending messages to server
speak = threading.Thread(target=self.speakToServer)
#not actually sure what daemon means
listen.daemon = True
speak.daemon = True
#appending the threads to the thread-list
self.running_threads.append((listen,"listen"))
self.running_threads.append((speak, "speak"))
listen.start()
speak.start()
while self.client_running:
#check if event is set, and if it is
#set while statement to false
if self.disconnected.isSet():
self.client_running = False
#closing the threads if the client goes down
print "Client operating on its own"
self.client.shutdown(1)
self.client.close()
#close threads
#the script hangs at the for-loop below, and
#refuses to close the listen-thread (and possibly
#also the speak thread, but it never gets that far)
for t in self.running_threads:
print "Waiting for " + t[1] + " to close..."
t[0].join()
self.disconnected.clear()
return
#defining "speak"-function
def speakToServer(self):
#sends strings to server
while self.client_running:
try:
send_data = sys.stdin.readline()
self.client.send(send_data)
#I want the "close" command
#to set an event flag, which is being read by all other threads,
#and, at the same time set the while statement to false
if send_data == "close\n":
print "Disconnecting..."
self.disconnected.set()
self.client_running = False
except socket.error, (value,message):
continue
return
#defining "listen"-function
def listenToServer(self):
#receives strings from server
while self.client_running:
#check if event is set, and if it is
#set while statement to false
if self.disconnected.isSet():
self.client_running = False
try:
data_recvd = self.client.recv(self.size)
print data_recvd
except socket.error, (value,message):
continue
return
if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()
後來,當我起來讓這臺服務器/客戶端系統和運行,我將使用這個系統,我們在這裏對實驗室的一些電梯機型,每個客戶端接收地面指令或「向上」和「下來「電話。服務器將運行分配算法並更新最適合請求訂單的客戶端上的電梯隊列。我意識到這是一條很長的路要走,但我想應該只是在一段時間=)
希望有人有時間來看看這個。提前致謝。
我認爲你應該更清楚你的代碼是如何「不起作用」的。你期待什麼行爲?你觀察到了什麼行爲? 「不起作用」可能意味着任何事情。 – 2012-03-01 17:45:46