2016-11-11 15 views
3

我正在閱讀來自Twitter Streaming API的推文。連接到API後,我得到一個生成器。如何在某個特定時間退出發電機?

我正在循環播放收到的每條推文,但是我想在18PM退出迭代器。收到每條推文後,我會檢查它是否晚於指定的時間戳並停止。

問題是我沒有經常收到推文。所以,我可以在17:50收到一個,下一個在19PM。那時候我會發現時間已過,我需要停下來。

有沒有辦法在18PM時強制停止?

這裏是我的代碼的高級視圖:

def getStream(tweet_iter): 
    for tweet in tweet_iter: 
     #do stuff 
     if time_has_passed(): 
      return 

tweet_iter = ConnectAndGetStream() 
getStream(tweet_iter) 
+2

注意:遵循PEP 8(getStream應該是get_stream,正式推薦)是個好主意。 – EOL

+0

爲什麼你的腳本不能在六點時停止運行? – jonrsharpe

+2

我猜從tweet生成器中獲得yieldvalue的時間是動態的,所以您必須在某種超時時間內包裝下一個() - 調用以騰出空間來檢查它是什麼時間。請參閱http://stackoverflow.com/questions/492519/timeout-on-a-function-call – Moberg

回答

1

你的問題可以通過分割設計的功能被分解爲兩個獨立的過程:

  1. Twitter的過程中充當封裝到Twitter API和
  2. 一個監視器進程,當到達退出時間時能夠終止twitter進程。

下面的一段代碼原型以上使用Python的多處理模塊所描述的功能:

import multiprocessing as mp 
import time 

EXIT_TIME = '12:21' #'18:00' 

def twitter(): 

    while True: 
     print 'Twittttttttttt.....' 
     time.sleep(5) 

def get_time(): 

    return time.ctime().split()[3][:5] 

if __name__ == '__main__': 

    # Execute the function as a process 
    p = mp.Process(target=twitter, args=()) 
    p.start() 

    # Monitoring the process p 
    while True: 
     print 'Checking the hour...' 
     if get_time() == EXIT_TIME: 
      p.terminate() 
      print 'Current time:', time.ctime() 
      print 'twitter process has benn terminated...' 
      break 
     time.sleep(5) 

當然,你可以使用p.join(TIMEOUT),而不是使用以所呈現的,而真正的循環我的示例如here所示。

+0

謝謝。你的原型似乎工作完美,但我面臨一些問題。我將一些參數傳遞給'twitter'函數。其中,我傳遞一個記錄器對象,我得到這個錯誤消息「TypeError:不能pickle thread.lock對象」。你知道這件事嗎? – Stergios

+0

請查看以下帖子:http://stackoverflow.com/a/7865512/2194843 包含您遇到的錯誤類型的解決方法。 – funk

1

這裏是與線程和調度器蟒一個例子:

import threading 
import time 
import os 
import schedule 

def theKillingJob(): 
    print("Kenny and Cartman die!") 
    os._exit(1) 

schedule.every().day.at("18:00").do(theKillingJob,'It is 18:00') 

def getStream(tweet_iter): 
    for tweet in tweet_iter: 
     #do stuff 

def kenny(): 
    while True: 
     print("Kenny alive..") 
     schedule.run_pending() 
     time.sleep(1) 

def cartman(): 
    while True: 
     print("Cartman alive..") 

     tweet_iter = ConnectAndGetStream() 
     getStream(tweet_iter) 

     # You can change whenever you want to check for tweets by changing sleep time here 
     time.sleep(1) 

if __name__ == '__main__': 
    daemon_kenny = threading.Thread(name='kenny', target=kenny) 
    daemon_cartman = threading.Thread(name='cartman', target=cartman) 
    daemon_kenny.setDaemon(True) 
    daemon_cartman.setDaemon(True) 

    daemon_kenny.start() 
    daemon_cartman.start() 
    daemon_kenny.join() 
    daemon_cartman.join() 
1

創建用於生產一個單獨的線程,並使用Queue進行通信。我還必須使用threading.Event來阻止製片人。

import itertools, queue, threading, time 

END_TIME = time.time() + 5 # run for ~5 seconds 

def time_left(): 
    return END_TIME - time.time() 

def ConnectAndGetStream():    # stub for the real thing 
    for i in itertools.count(): 
     time.sleep(1) 
     yield "tweet {}".format(i) 

def producer(tweets_queue, the_end): # producer 
    it = ConnectAndGetStream() 
    while not the_end.is_set(): 
     tweets_queue.put(next(it)) 

def getStream(tweets_queue, the_end): # consumer 
    try: 
     while True: 
      tweet = tweets_queue.get(timeout=time_left()) 
      print('Got', tweet) 
    except queue.Empty: 
     print('THE END') 
     the_end.set() 

tweets_queue = queue.Queue() # you might wanna use the maxsize parameter 
the_end = threading.Event() 
producer_thread = threading.Thread(target=producer, 
            args=(tweets_queue, the_end)) 
producer_thread.start() 
getStream(tweets_queue, the_end) 
producer_thread.join() 
相關問題