我通過python的子進程(特別是在AWS EC2上)在Linux機器上啓動進程,該進程會生成大量文件。我需要「tail -f」這些文件,並將每個結果的jsonified輸出發送到它們各自的AWS SQS隊列。我將如何去做這樣的任務?將文件拖入消息隊列
編輯
正如所建議的這個答案,asyncproc,並PEP3145,我可以用下面這樣做:
from asyncproc import Process
import Queue
import os
import time
# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()
running_procs = {'status':(Process(['/usr/bin/tail', '--retry', '-f','test.sta']),sta_queue),'message':(Process(['/usr/bin/tail', '--retry', '-f', 'test.msg' ]),msg_queue)}
def handle_proc(p,q):
latest = p.read()
if latest:
# If nothing new, latest will be an empty string
q.put(latest)
retcode = p.wait(flags=os.WNOHANG)
return retcode
while len(running_procs):
proc_names = running_procs.keys()
for proc_name in proc_names:
proc, q = running_procs[proc_name]
retcode = handle_proc(proc, q)
if retcode is not None: # Process finished.
del running_procs[proc_name]
time.sleep(1.0)
print("Status queue")
while not sta_queue.empty():
print(sta_queue.get())
print("Message queue")
while not msg_queue.empty():
print(msg_queue.get())
這應該是足夠了,我想,除非其他人可以提供更好的答案。
多個編輯
我的得太多問題。雖然上述方法很好,但我認爲最簡單的解決方案是: - 檢查是否存在文件 - 如果文件存在,請將它們複製到AWS S3上的存儲桶並通過AWS SQS發送文件已複製的消息。重複每60秒 - 消費者應用程序投票SQS,並最終收到文件已被複制的消息 - 消費者應用程序從S3下載文件並用最新的內容替換以前的內容。重複,直到工作完成
儘管子進程中的異步IO的整個問題仍然是一個問題。
您確定不想使用FIFO或FD直接加入進程嗎? – 2011-04-20 01:04:26