我有一段多線程代碼 - 3個線程輪詢來自SQS的數據並將其添加到python隊列中。 5個線程從python隊列中獲取消息,處理它們並將其發送到後端系統。線程輪詢sqs並將其添加到python隊列以處理模具
下面是代碼:
python_queue = Queue.Queue()
class GetDataFromSQS(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, python_queue):
threading.Thread.__init__(self)
self.python_queue = python_queue
def run(self):
while True:
time.sleep(0.5) //sleep for a few secs before querying again
try:
msgs = sqs_queue.get_messages(10)
if msgs == None:
print "sqs is empty now"!
for msg in msgs:
#place each message block from sqs into python queue for processing
self.python_queue.put(msg)
print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
#delete from sqs
sqs_queue.delete_message(msg)
except Exception as e:
print "Exception in GetDataFromSQS :: " + e
class ProcessSQSMsgs(threading.Thread):
def __init__(self, python_queue):
threading.Thread.__init__(self)
self.python_queue = python_queue
self.pool_manager = PoolManager(num_pools=6)
def run(self):
while True:
#grabs the message to be parsed from sqs queue
python_queue_msg = self.python_queue.get()
try:
processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
except Exception as e:
print "Error parsing:: " + e
finally:
self.python_queue.task_done()
def processMsgAndSendToBackend(msg, pool_manager):
if msg != "":
###### All the code related to processing the msg
for individualValue in processedMsg:
try:
response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
if response == None:
print "Error"
else:
response.release_conn()
except Exception as e:
print "Exception! Post data to backend: " + e
def startMyPython():
#spawn a pool of threads, and pass them queue instance
for i in range(3):
sqsThread = GetDataFromSQS(python_queue)
sqsThread.start()
for j in range(5):
parseThread = ProcessSQSMsgs(python_queue)
#parseThread.setDaemon(True)
parseThread.start()
#wait on the queue until everything has been processed
python_queue.join()
# python_queue.close() -- should i do this?
startMyPython()
問題: 3蟒蛇工模具隨機(監控使用頂部-p -H)每隔幾天,一切都曾經是正常的,如果我殺進程,再次啓動腳本。我懷疑消失的工作人員是3個GetDataFromSQS線程。而且由於GetDataFromSQS死亡,其他5個工作人員雖然運行時總是處於休眠狀態,因爲python隊列中沒有數據。我不知道我在做什麼錯誤,因爲我對Python非常陌生,並按照本教程創建排隊邏輯和線程 - http://www.ibm.com/developerworks/aix/library/au-threadingpython/
在此先感謝您的幫助。希望我已經明確解釋了我的問題。