2014-09-01 33 views
3

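Thread polling SQS and adding to a Python queue for processing dies

I have a piece of multi-threaded code: 3 threads poll data from SQS and add it to a Python queue. 5 threads take messages from the Python queue, process them, and send them to a backend system.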

Here is the code:

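import Queue
import threading
import time

from urllib3 import PoolManager

# sqs_queue (the boto SQS queue handle) and backend_endpoint are assumed to be
# defined elsewhere; the original post does not show them.
python_queue = Queue.Queue()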

class GetDataFromSQS(threading.Thread):
    """Polls SQS and puts each message on the Python queue."""
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue

    def run(self):
        while True:
            time.sleep(0.5)  # pause briefly before querying again
            try:
                msgs = sqs_queue.get_messages(10)
                if not msgs:
                    print "sqs is empty now!"
                    continue
                for msg in msgs:
                    # place each message block from sqs into python queue for processing
                    self.python_queue.put(msg)
                    print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
                    # delete from sqs
                    sqs_queue.delete_message(msg)
            except Exception as e:
                # format the exception; "..." + e raises TypeError and would end the thread
                print "Exception in GetDataFromSQS :: %s" % e


class ProcessSQSMsgs(threading.Thread):
    """Takes messages off the Python queue and sends them to the backend."""
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue
        self.pool_manager = PoolManager(num_pools=6)

    def run(self):
        while True:
            # grab the next message to be parsed from the Python queue
            python_queue_msg = self.python_queue.get()
            try:
                processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
            except Exception as e:
                print "Error parsing:: %s" % e
            finally:
                self.python_queue.task_done()

def processMsgAndSendToBackend(msg, pool_manager):
    if msg != "":
        ###### All the code related to processing the msg
        ###### (elided in the original post; it produces processedMsg)
        for individualValue in processedMsg:
            try:
                response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
                if response is None:
                    print "Error"
                else:
                    response.release_conn()
            except Exception as e:
                print "Exception! Post data to backend: %s" % e


def startMyPython():
    # spawn a pool of threads, and pass them the queue instance
    for i in range(3):
        sqsThread = GetDataFromSQS(python_queue)
        sqsThread.start()

    for j in range(5):
        parseThread = ProcessSQSMsgs(python_queue)
        #parseThread.setDaemon(True)
        parseThread.start()

    # wait on the queue until everything has been processed
    python_queue.join()
    # python_queue.close() -- should i do this?

startMyPython()

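The problem: 3 Python workers die at random (monitored using top -p -H) once every few days, and everything is fine again if I kill the process and restart the script. I suspect the workers that vanish are the 3 GetDataFromSQS threads. Because the GetDataFromSQS threads die, the other 5 workers, although still running, are always asleep, since there is no data in the Python queue. I am not sure what I am doing wrong, as I am very new to Python and followed this tutorial to build the queueing logic and threads: http://www.ibm.com/developerworks/aix/library/au-threadingpython/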

Thanks in advance for your help. I hope I have explained my problem clearly.
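One detail in the posted code can end a polling thread on its own: the except handlers build their message with "..." + e, and concatenating a string with an exception object raises TypeError inside the handler, which nothing catches. The sketch below illustrates that effect under simplified assumptions; fragile_worker and robust_worker are hypothetical stand-ins, the SQS failure is simulated, and it is written for Python 3 so it can be run as is. It is not claimed to be the cause of the hang described here, only a hazard worth ruling out.

```python
import threading


def fragile_worker():
    """Same handler pattern as the question: str + exception object."""
    while True:
        try:
            raise IOError("simulated SQS failure")
        except Exception as e:
            # str + exception raises TypeError inside the handler; nothing
            # catches it, so it escapes the loop and the thread ends
            # (Python prints the traceback to stderr).
            print("Exception in worker :: " + e)


def robust_worker(stop_event, seen):
    """Formats the exception instead, so the loop keeps running."""
    while True:
        try:
            raise IOError("simulated SQS failure")
        except Exception as e:
            seen.append("Exception in worker :: %s" % e)
        if stop_event.wait(0.01):  # True once the main thread asks us to stop
            break


stop = threading.Event()
seen = []
fragile = threading.Thread(target=fragile_worker, name="fragile")
robust = threading.Thread(target=robust_worker, args=(stop, seen), name="robust")
fragile.start()
robust.start()

fragile.join(1.0)  # returns as soon as the fragile thread has died
fragile_alive = fragile.is_alive()
robust_alive = robust.is_alive()

stop.set()
robust.join()
```

After the run, fragile_alive is False while robust_alive is True: the first thread exited on its first error, and the second kept looping until it was asked to stop.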

Answer

4

The thread hang was related to obtaining a handle to the SQS queue. I use IAM to manage credentials and the boto SDK to connect to SQS.

The root cause was that the boto package reads authentication metadata from AWS, and that read occasionally fails.

The fix is to edit the boto config file and increase the number of attempts boto makes for the authentication call to AWS.

[Boto]
metadata_service_num_attempts = 5

https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E
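The config setting above makes boto retry its own metadata lookup. The same idea can be applied at the application level around any call that fails intermittently, for example whatever code obtains the queue handle. The following is a minimal sketch under stated assumptions, not boto's internal logic: call_with_retries and flaky_fetch are hypothetical names, and the flaky lookup is simulated rather than a real AWS call. It is written for Python 3.

```python
import time


def call_with_retries(func, attempts=5, delay=0.0, backoff=2.0):
    """Call func() up to `attempts` times and re-raise the last error."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the failure
            time.sleep(delay)  # wait before the next try
            delay *= backoff   # grow the wait each time


# Hypothetical stand-in for a lookup that fails intermittently.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("simulated metadata timeout")
    return "credentials"


result = call_with_retries(flaky_fetch, attempts=5)
```

Here the simulated lookup fails twice and succeeds on the third call, so five attempts are enough. With a real service, a non-zero delay would normally be passed so that retries do not hit it back to back.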