2013-11-25 28 views
0

我開始拼湊一個簡單的multiprocessing作業管理系統,並且在使用隊列(隊列communications,如下所示)的進程之間發送用戶定義的對象(類Message,如下所示)時遇到了一些困難。你能指出我在正確的方向上獲取使用隊列在進程之間發送的對象(或類似的東西)嗎?隨意提供其他關於代碼中顯示的任何方法的評論。在Python多處理中,如何在使用隊列的進程之間傳遞用戶定義的對象?

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Queue 
import os 
import signal 
import time 

class Message: 
    """ 
    This class acts as a container for process information. The conventions for 
    the various member variables are as follows: 

    - messageType (specification of message type) 
     - "status" 
     - "result" 
     - "error" 
    - sender (name of sender process) 
     - str(os.getpid()) 
     - "janus" 
    - recipient (name of recipient process) 
     - str(os.getpid()) 
     - "all" 
     - "janus" 
    - senderStatus (specification of sender status) 
     - "running" 
     - "halt" 
     - "complete" 
     - "waiting" 
    - controlMessage (control message for recipient process) 
     - "start" 
     - "terminate" 
     - "report" 
     - "pause" 
    - resultObject (some object containing results) 
     - example: tuple 
    - naturalLanguageMessage (natural language message, e.g. human-readable) 
     - human readable string 
    - timeStamp (message timestamp) 
     - time.time() 
    """ 
    messageType="status" 
    sender="unknown" 
    recipient="all" 
    senderStatus="running" 
    controlMessage="null" 
    resultObject="null" 
    naturalLanguageMessage="This is natural language text." 
    timeStamp=time.time() 
    def set(
     self, 
     messageType, 
     sender, 
     recipient, 
     senderStatus, 
     controlMessage, 
     resultObject, 
     naturalLanguageMessage, 
     timeStamp=time.time() 
     ): 
     """ 
     This function sets the values of the member variables all at once. 
     """ 
     self.messageType=messageType 
     self.sender=sender 
     self.recipient=recipient 
     self.senderStatus=senderStatus 
     self.controlMessage=controlMessage 
     self.resultObject=resultObject 
     self.naturalLanguageMessage=naturalLanguageMessage 
     def timeStamp(self): 
      # Update the timestamp in the timestamp member variable. 
      self.timeStamp=time.time() 
     def printout(self): 
      # Print a dictionary of all member variables. 
      #print('-'*80) 
      #print("message content:") 
      printHR(vars(self)) 
      #print('-'*80) 
def printHR(object): 
    """ 
    This function prints a specified object in a human readable way. 
    """ 
    # dictionary 
    if isinstance(object, dict): 
     for key, value in sorted(object.items()): 
      print u'{0}: {1}'.format(key, value) 
    # list or tuple 
    elif isinstance(object, list) or isinstance(object, tuple): 
     for element in object: 
      print element 
    # other 
    else: 
     print object 

def initialise_process(): 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 

def work1(): 
    processID=os.getpid() 
    time.sleep(3) 
    print " work function: work run by process number %d" % (os.getpid()) 
    # prepare message 
    message=Message() 
    message.set(
     "status", 
     str(processID), 
     "janus", 
     "running", 
     "null", 
     "null", 
     "work running" 
    ) 
    # send message 
    communications.put(message) 

def workFunctionTest(testString): 
    processID=os.getpid() 
    print("test string:") 
    print(testString) 
    # prepare message 
    message=Message() 
    message.set(
      "status", 
      str(processID), 
      "janus", 
      "running", 
      "null", 
      "null", 
      "work running") 
    # send message 
    communications.put(message) 
    # do work 
    time.sleep(3) 

def janus(
    workFunction=workFunctionTest, 
    numberOfJobs=1, 
    numberOfProcesses=4 
    ): 
    # printout of multiprocessing specifications 
    print("\nJANUS MULTIPROCESSING JOB SYSTEM\n") 
    print(" multiprocessing specifications:") 
    print(" number of jobs: %s" % (str(numberOfJobs))) 
    print(" number of processes: %s" % (str(numberOfProcesses))) 
    print(" work to complete: %s" % (str(workFunction))) 
    #print(" arguments for work function: " %s (str(workFunctionArguments))) 

    # create process pool 
    print(" initialising process pool...") 
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_process) 
    print(" pool created: %s" % (str(pool1))) 

    # create message queue for interprocess communications 
    print(" initialising interprocess communications queue...") 
    communications=Queue() 
    print(" queue created: %s" % (str(communications))) 

    # send work to pool 
    print(" applying work to pool...") 
    print(" applying each of %s jobs," % (str(numberOfJobs))) 
    for jobIndex in range(numberOfJobs): 
     print("  applying work function %s as job %s..." 
      % (str(workFunction), jobIndex)) 
     pool1.apply_async(workFunction) 

    # monitor processes 

    # check messages 
    while True: 
     time.sleep(3) 
     if communications.empty() == True: 
      print(" checking for messages... no messages") 
     elif communications.empty() == False: 
      buffer=communications.get() 
     print('-'*80) 
      print("new message:") 
      print buffer 
     print('-'*80) 
      break 
     else: 
      print(" fail") 
    # monitor 
    try: 
     print " jobs running..." 
     time.sleep(10) 
    except KeyboardInterrupt: 
     print " termination command received\nterminating processes..." 
     pool1.terminate() 
     pool1.join() 
    else: 
     print " jobs complete\nterminating..." 
     pool1.close() 
     pool1.join() 

def main(): 
    print('-'*80) 
    janus(work1, 5) 
    print '-'*80 

if __name__ == "__main__": 
    main() 
+0

「難度」是什麼意思?太模糊。它是否引發異常?創建一個損壞的對象?還有別的嗎? –

+0

函數'''janus'''中有一個用來檢查消息的地方;即它檢查是否該隊列「通信」不是空的。儘管試圖通過工作函數'''work1'''將信息保存到隊列中,它似乎總是發現隊列一直是空的。我不知道爲什麼會發生這種情況。 – d3pd

回答

0

退房蟒蛇celery project

芹菜是基於分佈式消息傳遞異步任務隊列/作業隊列。

相關問題