2017-05-25 46 views
1

我有以下代碼:收集期貨從APScheduler

import pandas as pd 
from concurrent.futures import ThreadPoolExecutor, as_completed 
from datetime import datetime 
from apscheduler.schedulers.blocking import BlockingScheduler 


class FutureScheduler(object): 

    def __init__(): 
     self.futures = [] 
     self.scheduler = BlockingScheduler() 
     self.pool = ThreadPoolExecutor(5) 
     self.full_frame = pd.DataFrame() 

    def start(self): 
     job = self.scheduler.add_job(self.add_future, 'cron', day_of_week='mon-fri', hour='8-15', minute='*') 
     self.scheduler.start() 
     self.flush_csvs() 

    def add_future(self): 
     self.futures.append(self.pool.submit(self.long_running_task)) 

    def flush_csvs(self): 
     for future in as_completed(self.futures): 
      results = future.result() 
      self.full_frame = pd.concat((self.full_frame, results)) 
      self.full_frame.to_csv('results.csv') 
      print "flushed... Queue size: %s" % len(self.futures) 

    def long_running_task(self): 
     #takes a while may or may not return before the next one is kicked off 

所以我的問題是,flush_csvs循環的內部代碼永遠不會運行。我是否必須在as_completed被調用之前將所有期貨添加到列表中?有沒有辦法讓BlockingScheduler迴歸未來?我看到它返回一個Job,但在這種情況下,我希望它更像未來。

回答

1

這不起作用,因爲調度程序阻止主線程繼續。這可以防止flush_csvs被執行。

self.scheduler.start() 
self.flush_csvs() 

但是,這可能不是你想要的。 APScheduler在內部使用線程池,因此回調(self.long_running_task)已經在單獨的線程中執行。

如果您需要多個內核(使用ProcessPoolExecutor而不是ThreadPoolExecutor)等,您可以通過APScheduler更改此線程池的配置,具體取決於您需要的工作器數量。您可能還可以將每個作業配置爲做你想做的事。例如,配置每分鐘運行一次的作業的策略以合併(僅運行一個),而不是在延遲情況下背靠背運行多次。

http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s