2011-04-20 65 views
2

我通過python的子進程(特別是在AWS EC2上)在Linux機器上啓動進程,該進程會生成大量文件。我需要「tail -f」這些文件,並將每個結果的jsonified輸出發送到它們各自的AWS SQS隊列。我將如何去做這樣的任務?將文件拖入消息隊列

編輯

正如所建議的這個答案,asyncproc,並PEP3145,我可以用下面這樣做:

from asyncproc import Process 
import Queue 
import os 
import time 

# Substitute AWS SQS for Queue 
sta_queue = Queue.Queue() 
msg_queue = Queue.Queue() 
running_procs = {'status':(Process(['/usr/bin/tail', '--retry', '-f','test.sta']),sta_queue),'message':(Process(['/usr/bin/tail', '--retry', '-f', 'test.msg' ]),msg_queue)} 

def handle_proc(p,q): 
    latest = p.read() 
    if latest: 
     # If nothing new, latest will be an empty string 
     q.put(latest) 
    retcode = p.wait(flags=os.WNOHANG) 
    return retcode 

while len(running_procs): 
    proc_names = running_procs.keys() 
    for proc_name in proc_names: 
     proc, q = running_procs[proc_name] 
     retcode = handle_proc(proc, q) 
     if retcode is not None: # Process finished. 
      del running_procs[proc_name] 
    time.sleep(1.0) 
print("Status queue") 
while not sta_queue.empty(): 
    print(sta_queue.get()) 
print("Message queue") 
while not msg_queue.empty(): 
    print(msg_queue.get()) 

這應該是足夠了,我想,除非其他人可以提供更好的答案。

多個編輯

我的得太多問題。雖然上述方法很好,但我認爲最簡單的解決方案是: - 檢查是否存在文件 - 如果文件存在,請將它們複製到AWS S3上的存儲桶並通過AWS SQS發送文件已複製的消息。重複每60秒 - 消費者應用程序投票SQS,並最終收到文件已被複制的消息 - 消費者應用程序從S3下載文件並用最新的內容替換以前的內容。重複,直到工作完成

儘管子進程中的異步IO的整個問題仍然是一個問題。

+0

您確定不想使用FIFO或FD直接加入進程嗎? – 2011-04-20 01:04:26

回答

0

您可以使用subprocess.Popen類運行尾部並讀取其輸出。

try: 
    process = subprocess.Popen(['tail', '-f', filename], stdout=PIPE) 
except (OSError, ValueError): 
    pass # TODO: handle errors 
output = process.stdout.read() 

subprocess.check_output函數以單行提供此功能。這是Python 2.7版本中的新功能。

try: 
    output = subprocess.check_output(['tail', '-f', filename], stdout=PIPE) 
except CalledProcessError: 
    pass # TODO: handle errors 

對於非阻塞I/O,請參見this question

+0

會不會阻止? – user90855 2011-04-20 13:23:53

+0

是的,這個例子會阻塞。但是,它可以在不使用subprocess.Popen阻塞的情況下完成。例如,Popen.poll()讓你知道進程是否已經終止而沒有阻塞。如果在進程終止後執行讀取操作,則不會阻塞。 – 2011-04-20 13:55:34

+0

確實如此,但是整個觀點是「tail -f」部分,它在主進程完成(幾天後)完成後纔會終止,直到實例關閉。 – user90855 2011-04-20 17:20:20