Threads that poll SQS and add messages to a Python queue for processing die

Problem description:

I have a piece of multi-threaded code: 3 threads poll SQS for data and add it to a Python queue. 5 threads take messages from the Python queue, process them and send them to a backend system.

Here is the code:

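import Queue      # Python 2 standard library (renamed to "queue" in Python 3)
import threading
import time

from urllib3 import PoolManager

# sqs_queue (the boto SQS queue handle) and backend_endpoint are assumed to be defined elsewhere
python_queue = Queue.Queue()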

class GetDataFromSQS(threading.Thread): 
    """Threaded Url Grab""" 
    def __init__(self, python_queue): 
     threading.Thread.__init__(self) 
     self.python_queue = python_queue 

    def run(self): 
     while True: 
      time.sleep(0.5)  # sleep for half a second before querying again
      try: 
       msgs = sqs_queue.get_messages(10) 
       if not msgs:  # covers both None and an empty result
        print "sqs is empty now"
        continue
       for msg in msgs: 
        #place each message block from sqs into python queue for processing 
        self.python_queue.put(msg) 
        print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize() 
        #delete from sqs 
        sqs_queue.delete_message(msg) 
      except Exception as e: 
       # format with %s: "str" + e raises TypeError inside the handler and would end the thread
       print "Exception in GetDataFromSQS :: %s" % e


class ProcessSQSMsgs(threading.Thread): 
    def __init__(self, python_queue): 
     threading.Thread.__init__(self) 
     self.python_queue = python_queue 
     self.pool_manager = PoolManager(num_pools=6) 

    def run(self): 
     while True: 
      #grabs the message to be parsed from the python queue
      python_queue_msg = self.python_queue.get() 
      try: 
       processMsgAndSendToBackend(python_queue_msg, self.pool_manager) 
      except Exception as e: 
       print "Error parsing:: %s" % e
      finally: 
       self.python_queue.task_done() 

def processMsgAndSendToBackend(msg, pool_manager): 
    if msg != "": 
     ###### All the code related to processing the msg 
     for individualValue in processedMsg: 
      try: 
       response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue) 
       if response is None:
        print "Error" 
       else: 
        response.release_conn() 
      except Exception as e: 
       print "Exception! Post data to backend: %s" % e


def startMyPython(): 
    #spawn a pool of threads, and pass them queue instance 
    for i in range(3): 
     sqsThread = GetDataFromSQS(python_queue) 
     sqsThread.start() 

    for j in range(5): 
     parseThread = ProcessSQSMsgs(python_queue) 
     #parseThread.setDaemon(True) 
     parseThread.start() 

    #wait on the queue until everything has been processed 
    python_queue.join() 
    # python_queue.close() -- should i do this? 

startMyPython() 
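
A pitfall to watch for in except handlers in threaded code like this: in Python, concatenating a string and an exception object with + raises TypeError. When that happens inside the handler, the new exception escapes run() and the thread ends. The following is a minimal sketch in Python 3 syntax; fragile_handler, safe_handler and survives are hypothetical names used only for illustration, not part of the original script.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("poller")


def fragile_handler():
    """Mimics the risky pattern: str + exception raises TypeError."""
    try:
        raise ValueError("boom")
    except Exception as e:
        return "Exception :: " + e  # the handler itself fails here


def safe_handler():
    """Formats the exception instead of concatenating it."""
    try:
        raise ValueError("boom")
    except Exception as e:
        log.exception("Exception in poller")  # logs the message plus traceback
        return "Exception :: %s" % e


def survives(handler):
    """Return True if the handler completes without raising TypeError."""
    try:
        handler()
        return True
    except TypeError:
        return False
```

Calling survives(fragile_handler) shows the handler failing, while survives(safe_handler) completes. In a polling loop, only the second form keeps the while True loop running after an error.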

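Problem: 3 of the Python worker threads die at random (monitored with top -p -H) once every few days, and everything is fine again if I kill the process and restart the script. I suspect the workers that vanish are the 3 GetDataFromSQS threads. Because GetDataFromSQS has died, the other 5 workers, although still running, are always sleeping since there is no data in the Python queue. I am not sure what I am doing wrong, as I am very new to Python and followed this tutorial to create the queueing logic and the threads: http://www.ibm.com/developerworks/aix/library/au-threadingpython/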
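
To confirm from inside the process which threads have gone away, rather than relying only on top, the thread objects can be kept in a list and checked with is_alive(). This is a rough sketch in Python 3 syntax under stated assumptions: dead_threads, crashing_poller, healthy_poller and demo are hypothetical names, and the two pollers only simulate the behaviour described above.

```python
import threading


def dead_threads(threads):
    """Return the names of threads that are no longer running."""
    return [t.name for t in threads if not t.is_alive()]


def crashing_poller():
    # Stands in for a poller whose run() ends with an unhandled exception.
    raise RuntimeError("simulated poller crash")


def healthy_poller(stop_event):
    # Stands in for a poller that keeps running until told to stop.
    stop_event.wait()


def demo():
    stop = threading.Event()
    threads = [
        threading.Thread(target=crashing_poller, name="poller-0"),
        threading.Thread(target=healthy_poller, args=(stop,), name="poller-1"),
    ]
    for t in threads:
        t.daemon = True
        t.start()
    threads[0].join(timeout=5)  # give the crashing thread time to exit
    dead = dead_threads(threads)
    stop.set()  # let the healthy poller finish
    return dead
```

A main thread could call dead_threads() periodically and log or restart whatever it reports. The traceback of the simulated crash is printed to stderr by the threading module.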

Thanks in advance for your help. I hope I have explained my problem clearly.

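The hanging threads turned out to be related to getting the handle of the SQS queue. I use IAM to manage credentials and the boto SDK to connect to SQS.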

The root cause was that the boto package reads the authentication metadata from AWS, and that read fails once in a while.

The fix is to edit the boto config file and increase the number of attempts made for the authentication call to AWS.

[Boto]
metadata_service_num_attempts = 5

https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E
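
The same idea, retrying a call that fails intermittently, can also be applied at the application level around the code that obtains the queue handle. The sketch below is in Python 3 syntax and is not boto's own retry mechanism; call_with_retries and its parameters are hypothetical, and the function passed in stands for whatever call fetches the SQS queue.

```python
import time


def call_with_retries(func, attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call func(); on an exception, retry up to `attempts` calls in total."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < attempts - 1:
                # exponential backoff: base_delay, 2x, 4x, ...
                sleep(base_delay * (2 ** attempt))
    raise last_error
```

With the default of 5 attempts, a call that fails twice and then succeeds returns normally on the third try. If every attempt fails, the last exception is raised so the caller can log it.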