2010-07-05 22 views
2

我正在以極快的速度從Twitter API Streaming Server的長時間連接中獲得推文。我繼續做一些沉重的文本處理並將推文保存在我的數據庫中。使用扭曲或使用線程處理大容量流數據,使用Python中的隊列

我正在使用PyCurl的連接和回調函數,關心文本處理和保存在數據庫中。看看下面我的方法誰不能正常工作。

我不熟悉網絡編程,所以想知道: 如何使用線程,隊列或扭曲框架來解決這個問題?

def process_tweet(): 
    # do some heaving text processing 


def open_stream_connection(): 
    connect = pycurl.Curl() 
    connect.setopt(pycurl.URL, STREAMURL) 
    connect.setopt(pycurl.WRITEFUNCTION, process_tweet) 
    connect.setopt(pycurl.USERPWD, "%s:%s" % (TWITTER_USER, TWITTER_PASS)) 
    connect.perform() 
+1

定義每秒消息中「非常快」,並詳細說明「重處理」。 – MattH 2010-07-05 15:24:48

+0

也定義「不能正常工作」 – nosklo 2010-07-05 16:45:10

+0

Upvoted爲用戶名。對不起,忍不住:P – 2010-07-05 18:56:14

回答

0

這是簡單的設置,如果你可以使用單臺機器。

1個線程接受連接。接受連接後,它將接受的連接傳遞給另一個線程進行處理。

當然,您可以使用進程(例如使用multiprocessing)而不是線程,但我不熟悉multiprocessing以提供建議。設置將是相同的:1進程接受連接,然後將它們傳遞給子進程。

如果你需要在多臺機器上分割處理,那麼簡單的事情就是將消息填充到數據庫中,然後通知工作人員新記錄(這需要某種協調/鎖定工人)。如果你想避免碰到數據庫,那麼你將不得不從你的網絡過程向工作者傳遞消息(並且我在低級聯網方面不夠精通,告訴你如何做到這一點:))

0

我建議這個組織:

  • 一個進程讀的Twitter,填料鳴叫到數據庫
  • 一個或多個進程讀取數據庫,每個處理,插入到新的數據庫。原始推文被刪除或標記爲已處理。

也就是說,你有兩個更多的進程/線程。推特數據庫可以被看作是一個工作隊列。多個工作進程將作業(微博)從隊列中取出,並在第二個數據庫中創建數據。

+2

數據庫看起來像是過度殺傷性的臨時容器。 – Oddthinking 2010-07-18 15:30:53

+0

同意。可能更好地使用@Oddthinking建議的MT隊列架構 – Sid 2016-03-21 16:06:06

1

你應該有多個線程在收到消息時收到這些消息。如果你使用的是pycurl,那麼這個數字應該是1,但是如果你使用的是httplib,那麼這個數字應該會更高 - 這個想法是你希望能夠有更多的一次在Twitter API上進行一次查詢,因此需要處理大量的工作。

當每個鳴叫到達時,它被推送到Queue.Queue。隊列確保通信中存在線程安全 - 每條推文只能由一個工作線程處理。

工作線程池負責從隊列讀取並處理Tweet。只有有趣的推文應該被添加到數據庫中。

由於數據庫可能是瓶頸,池中線程的數量有限制,值得添加 - 更多線程不會使其處理速度更快,這只是意味着更多的線程正在等待隊列來訪問數據庫。

這是一個相當常見的Python成語。這種體系結構只能在一定程度上擴展 - 即一臺機器可以處理的內容。